From b03f823db05a9f89d7ba901b0237f0fd41031257 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Mon, 27 Mar 2017 12:00:36 +0200 Subject: [PATCH] added lookupKey --- arangod/MMFiles/MMFilesCollection.cpp | 2 +- arangod/MMFiles/MMFilesCollection.h | 2 +- arangod/RocksDBEngine/RocksDBCollection.cpp | 132 +++++++++++++++++- arangod/RocksDBEngine/RocksDBCollection.h | 9 ++ .../RocksDBEngine/RocksDBPrimaryMockIndex.cpp | 18 +-- .../RocksDBEngine/RocksDBPrimaryMockIndex.h | 5 +- arangod/RocksDBEngine/RocksDBToken.h | 48 +++++++ 7 files changed, 199 insertions(+), 17 deletions(-) create mode 100644 arangod/RocksDBEngine/RocksDBToken.h diff --git a/arangod/MMFiles/MMFilesCollection.cpp b/arangod/MMFiles/MMFilesCollection.cpp index a8ebd5607d..8a498b2b19 100644 --- a/arangod/MMFiles/MMFilesCollection.cpp +++ b/arangod/MMFiles/MMFilesCollection.cpp @@ -3530,7 +3530,7 @@ int MMFilesCollection::removeFastPath(arangodb::transaction::Methods* trx, /// the caller must make sure the read lock on the collection is held /// the key must be a string slice, no revision check is performed int MMFilesCollection::lookupDocument(transaction::Methods* trx, - VPackSlice const key, + VPackSlice key, ManagedDocumentResult& result) { if (!key.isString()) { return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD; diff --git a/arangod/MMFiles/MMFilesCollection.h b/arangod/MMFiles/MMFilesCollection.h index 3e7e67c1ec..a591b6a8f4 100644 --- a/arangod/MMFiles/MMFilesCollection.h +++ b/arangod/MMFiles/MMFilesCollection.h @@ -490,7 +490,7 @@ class MMFilesCollection final : public PhysicalCollection { int deleteSecondaryIndexes(transaction::Methods*, TRI_voc_rid_t revisionId, velocypack::Slice const&, bool isRollback); - int lookupDocument(transaction::Methods*, velocypack::Slice const, + int lookupDocument(transaction::Methods*, velocypack::Slice, ManagedDocumentResult& result); int updateDocument(transaction::Methods*, TRI_voc_rid_t oldRevisionId, diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 01f0650ff1..8f1fb43c9e 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -32,6 +32,7 @@ #include "RocksDBEngine/RocksDBEngine.h" #include "RocksDBEngine/RocksDBEntry.h" #include "RocksDBEngine/RocksDBPrimaryMockIndex.h" +#include "RocksDBEngine/RocksDBToken.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "StorageEngine/TransactionState.h" @@ -331,6 +332,10 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx, arangodb::ManagedDocumentResult& result, OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool /*lock*/) { + // store the tick that was used for writing the document + // note that we don't need it for this engine + resultMarkerTick = 0; + VPackSlice fromSlice; VPackSlice toSlice; @@ -384,14 +389,13 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx, res = insertDocument(trx, revisionId, newSlice, options.waitForSync); if (res == TRI_ERROR_NO_ERROR) { + // TODO: handle returning of result value! + // uint8_t const* vpack = lookupRevisionVPack(revisionId); // if (vpack != nullptr) { // result.addExisting(vpack, revisionId); // } - // store the tick that was used for writing the document - // note that we don't need it for this engine - resultMarkerTick = 0; } return res; } @@ -424,11 +428,50 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx, arangodb::velocypack::Slice const slice, arangodb::ManagedDocumentResult& previous, OperationOptions& options, - TRI_voc_tick_t& resultMarkerTick, bool lock, + TRI_voc_tick_t& resultMarkerTick, bool /*lock*/, TRI_voc_rid_t const& revisionId, TRI_voc_rid_t& prevRev) { - THROW_ARANGO_NOT_YET_IMPLEMENTED(); - return 0; + // store the tick that was used for writing the document + // note that we don't need it for this engine + resultMarkerTick = 0; + prevRev = 0; + + transaction::BuilderLeaser builder(trx); + newObjectForRemove(trx, slice, TRI_RidToString(revisionId), *builder.get()); + + VPackSlice key; + if (slice.isString()) { + key = slice; + } else { + key = slice.get(StaticStrings::KeyString); + } + TRI_ASSERT(!key.isNone()); + + // get the previous revision + int res = lookupDocument(trx, key, previous); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + + uint8_t const* vpack = previous.vpack(); + VPackSlice oldDoc(vpack); + TRI_voc_rid_t oldRevisionId = arangodb::transaction::helpers::extractRevFromDocument(oldDoc); + prevRev = oldRevisionId; + + // Check old revision: + if (!options.ignoreRevs && slice.isObject()) { + TRI_voc_rid_t expectedRevisionId = TRI_ExtractRevisionId(slice); + int res = checkRevision(trx, expectedRevisionId, oldRevisionId); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + } + + res = removeDocument(trx, oldRevisionId, oldDoc, options.waitForSync); + + return res; } void RocksDBCollection::deferDropCollection( @@ -596,3 +639,80 @@ int RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx, return result; } + +int RocksDBCollection::removeDocument(arangodb::transaction::Methods* trx, + TRI_voc_rid_t revisionId, + VPackSlice const& doc, + bool& waitForSync) { + // Coordinator doesn't know index internals + TRI_ASSERT(!ServerState::instance()->isCoordinator()); + + RocksDBEntry entry(RocksDBEntry::Document(_objectId, revisionId, basics::VelocyPackHelper::EmptyObjectValue())); + + rocksdb::WriteBatch writeBatch; + writeBatch.Delete(entry.key()); + + auto indexes = _indexes; + size_t const n = indexes.size(); + + int result = TRI_ERROR_NO_ERROR; + + for (size_t i = 0; i < n; ++i) { + auto idx = indexes[i]; + + int res = idx->remove(trx, revisionId, doc, false); + + // in case of no-memory, return immediately + if (res == TRI_ERROR_OUT_OF_MEMORY) { + return res; + } + } + + if (result != TRI_ERROR_NO_ERROR) { + rocksdb::WriteOptions writeOptions; + + if (_logicalCollection->waitForSync()) { + waitForSync = true; + } + + if (waitForSync) { + trx->state()->waitForSync(true); + + // handle waitForSync for single operations here + if (trx->state()->isSingleOperation()) { + writeOptions.sync = true; + } + } + + StorageEngine* engine = EngineSelectorFeature::ENGINE; + rocksdb::TransactionDB* db = static_cast(engine)->db(); + db->Write(writeOptions, &writeBatch); + } + + return result; +} + +/// @brief looks up a document by key, low level worker +/// the key must be a string slice, no revision check is performed +int RocksDBCollection::lookupDocument(transaction::Methods* trx, + VPackSlice key, + ManagedDocumentResult& result) { + if (!key.isString()) { + return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD; + } + + RocksDBToken token = primaryIndex()->lookupKey(trx, key, result); + TRI_voc_rid_t revisionId = token.revisionId(); + + if (revisionId > 0) { + // TODO: add result handling! +/* uint8_t const* vpack = lookupRevisionVPack(revisionId); + if (vpack != nullptr) { + result.addExisting(vpack, revisionId); + } +*/ + return TRI_ERROR_NO_ERROR; + } + + return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; +} diff --git a/arangod/RocksDBEngine/RocksDBCollection.h b/arangod/RocksDBEngine/RocksDBCollection.h index c61cf640e4..c332efd31e 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.h +++ b/arangod/RocksDBEngine/RocksDBCollection.h @@ -186,6 +186,15 @@ class RocksDBCollection final : public PhysicalCollection { arangodb::velocypack::Slice const& doc, bool& waitForSync); + int removeDocument(arangodb::transaction::Methods* trx, + TRI_voc_rid_t revisionId, + arangodb::velocypack::Slice const& doc, + bool& waitForSync); + + int lookupDocument(transaction::Methods* trx, + arangodb::velocypack::Slice key, + ManagedDocumentResult& result); + private: uint64_t _objectId; // rocksdb-specific object id for collection }; diff --git a/arangod/RocksDBEngine/RocksDBPrimaryMockIndex.cpp b/arangod/RocksDBEngine/RocksDBPrimaryMockIndex.cpp index 800850bc43..59f61bbc04 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryMockIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBPrimaryMockIndex.cpp @@ -22,12 +22,12 @@ //////////////////////////////////////////////////////////////////////////////// #include "RocksDBPrimaryMockIndex.h" -#include "RocksDBEntry.h" #include "Aql/AstNode.h" #include "Basics/Exceptions.h" #include "Basics/StaticStrings.h" #include "Basics/VelocyPackHelper.h" #include "Indexes/SimpleAttributeEqualityMatcher.h" +#include "RocksDBEngine/RocksDBEntry.h" #include "Transaction/Helpers.h" #include "Transaction/Methods.h" #include "Transaction/Context.h" @@ -140,11 +140,16 @@ void RocksDBPrimaryMockIndex::toVelocyPackFigures(VPackBuilder& builder) const { // TODO: implement } +RocksDBToken RocksDBPrimaryMockIndex::lookupKey(transaction::Methods* trx, VPackSlice key, ManagedDocumentResult& result) { + std::lock_guard lock(_keyRevMutex); + auto it = _keyRevMap.find(key.copyString()); + if (it == _keyRevMap.end()) { + return RocksDBToken(); + } + return RocksDBToken((*it).second); +} + int RocksDBPrimaryMockIndex::insert(transaction::Methods*, TRI_voc_rid_t revisionId, VPackSlice const& slice, bool) { -#ifdef ARANGODB_ENABLE_MAINTAINER_MODE - LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "insert() called for primary index"; -#endif -// // THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "insert() called for primary index"); auto value = RocksDBEntry::IndexValue(objectId(), revisionId, slice); std::lock_guard lock(_keyRevMutex); auto result = _keyRevMap.emplace(value.key(),value.revisionId()); @@ -155,9 +160,6 @@ int RocksDBPrimaryMockIndex::insert(transaction::Methods*, TRI_voc_rid_t revisio } int RocksDBPrimaryMockIndex::remove(transaction::Methods*, TRI_voc_rid_t revisionId, VPackSlice const& slice, bool) { -#ifdef ARANGODB_ENABLE_MAINTAINER_MODE - LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "remove() called for primary index"; -#endif auto value = RocksDBEntry::IndexValue(objectId(), revisionId, slice); std::lock_guard lock(_keyRevMutex); auto result = _keyRevMap.erase(value.key()); //result number of deleted elements diff --git a/arangod/RocksDBEngine/RocksDBPrimaryMockIndex.h b/arangod/RocksDBEngine/RocksDBPrimaryMockIndex.h index c131ca20d6..f12b5cb463 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryMockIndex.h +++ b/arangod/RocksDBEngine/RocksDBPrimaryMockIndex.h @@ -27,6 +27,7 @@ #include "Basics/Common.h" #include "Indexes/Index.h" #include "Indexes/IndexIterator.h" +#include "RocksDBEngine/RocksDBToken.h" #include "VocBase/vocbase.h" #include "VocBase/voc-types.h" @@ -38,7 +39,7 @@ #include namespace arangodb { - +class ManagedDocumentResult; class RocksDBPrimaryMockIndex; namespace transaction { class Methods; @@ -136,6 +137,8 @@ class RocksDBPrimaryMockIndex final : public Index { void toVelocyPack(VPackBuilder&, bool) const override; void toVelocyPackFigures(VPackBuilder&) const override; + RocksDBToken lookupKey(transaction::Methods* trx, arangodb::velocypack::Slice key, ManagedDocumentResult& result); + int insert(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override; int remove(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override; diff --git a/arangod/RocksDBEngine/RocksDBToken.h b/arangod/RocksDBEngine/RocksDBToken.h new file mode 100644 index 0000000000..c00d33dd36 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBToken.h @@ -0,0 +1,48 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Michael Hackstein +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_ROCKSDB_ENGINE_ROCKSDB_TOKEN_H +#define ARANGOD_ROCKSDB_ENGINE_ROCKSDB_TOKEN_H 1 + +#include "StorageEngine/DocumentIdentifierToken.h" + +namespace arangodb { + +struct RocksDBToken : public DocumentIdentifierToken { + public: + RocksDBToken() : DocumentIdentifierToken() {} + explicit RocksDBToken(TRI_voc_rid_t revisionId) + : DocumentIdentifierToken(revisionId) {} + RocksDBToken(RocksDBToken const& other) + : DocumentIdentifierToken(other._data) {} + + inline TRI_voc_rid_t revisionId() const { + return static_cast(_data); + } +}; + +static_assert(sizeof(RocksDBToken) == sizeof(uint64_t), "invalid RocksDBToken size"); + +} + +#endif