From 4ec2a40cd52f148727d1b039e5aa5eab163a9e87 Mon Sep 17 00:00:00 2001 From: Dan Larkin Date: Thu, 20 Apr 2017 11:10:08 -0400 Subject: [PATCH] Split replication context/status/tailing. --- arangod/RocksDBEngine/CMakeLists.txt | 2 + .../RocksDBReplicationCommon.cpp | 32 +++ .../RocksDBEngine/RocksDBReplicationCommon.h | 43 ++++ .../RocksDBReplicationContext.cpp | 189 +--------------- .../RocksDBEngine/RocksDBReplicationContext.h | 18 +- .../RocksDBReplicationTailing.cpp | 213 ++++++++++++++++++ .../RocksDBEngine/RocksDBReplicationTailing.h | 46 ++++ 7 files changed, 342 insertions(+), 201 deletions(-) create mode 100644 arangod/RocksDBEngine/RocksDBReplicationCommon.cpp create mode 100644 arangod/RocksDBEngine/RocksDBReplicationCommon.h create mode 100644 arangod/RocksDBEngine/RocksDBReplicationTailing.cpp create mode 100644 arangod/RocksDBEngine/RocksDBReplicationTailing.h diff --git a/arangod/RocksDBEngine/CMakeLists.txt b/arangod/RocksDBEngine/CMakeLists.txt index e38a47f26f..b4b23c77ea 100644 --- a/arangod/RocksDBEngine/CMakeLists.txt +++ b/arangod/RocksDBEngine/CMakeLists.txt @@ -16,8 +16,10 @@ set(ROCKSDB_SOURCES RocksDBEngine/RocksDBKey.cpp RocksDBEngine/RocksDBKeyBounds.cpp RocksDBEngine/RocksDBPrimaryIndex.cpp + RocksDBEngine/RocksDBReplicationCommon.cpp RocksDBEngine/RocksDBReplicationContext.cpp RocksDBEngine/RocksDBReplicationManager.cpp + RocksDBEngine/RocksDBReplicationTailing.cpp RocksDBEngine/RocksDBRestExportHandler.cpp RocksDBEngine/RocksDBRestHandlers.cpp RocksDBEngine/RocksDBRestReplicationHandler.cpp diff --git a/arangod/RocksDBEngine/RocksDBReplicationCommon.cpp b/arangod/RocksDBEngine/RocksDBReplicationCommon.cpp new file mode 100644 index 0000000000..d6cb9588d1 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBReplicationCommon.cpp @@ -0,0 +1,32 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +//////////////////////////////////////////////////////////////////////////////// + +#include "RocksDBEngine/RocksDBReplicationCommon.h" + +using namespace arangodb; + +RocksDBReplicationResult::RocksDBReplicationResult(int errorNumber, + uint64_t maxTick) + : Result(errorNumber), _maxTick(maxTick) {} + +uint64_t RocksDBReplicationResult::maxTick() const { return _maxTick; } diff --git a/arangod/RocksDBEngine/RocksDBReplicationCommon.h b/arangod/RocksDBEngine/RocksDBReplicationCommon.h new file mode 100644 index 0000000000..ddc413ae8a --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBReplicationCommon.h @@ -0,0 +1,43 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGO_ROCKSDB_ROCKSDB_REPLICATION_COMMON_H +#define ARANGO_ROCKSDB_ROCKSDB_REPLICATION_COMMON_H 1 + +#include "Basics/Common.h" +#include "Basics/Result.h" + +namespace arangodb { + +class RocksDBReplicationResult : public Result { + public: + RocksDBReplicationResult(int, uint64_t); + uint64_t maxTick() const; + + private: + uint64_t _maxTick; +}; + +} // namespace arangodb + +#endif diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index 48cbd44226..0936b11d70 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -30,159 +30,10 @@ #include "VocBase/replication-common.h" #include "VocBase/ticks.h" -#include -#include -#include - using namespace arangodb; using namespace arangodb::rocksutils; using namespace arangodb::velocypack; -/// WAL parser, no locking required here, because we have been locked from the -/// outside -class WBReader : public rocksdb::WriteBatch::Handler { - public: - explicit WBReader(TRI_vocbase_t* vocbase, uint64_t from, size_t& limit, - bool includeSystem, VPackBuilder& builder) - : _vocbase(vocbase), - _from(from), - _limit(limit), - _includeSystem(includeSystem), - _builder(builder) {} - - void Put(rocksdb::Slice const& key, rocksdb::Slice const& value) override { - if (shouldHandleKey(key)) { - int res = TRI_ERROR_NO_ERROR; - _builder.openObject(); - switch (RocksDBKey::type(key)) { - case RocksDBEntryType::Collection: { - _builder.add( - "type", - VPackValue(static_cast(REPLICATION_COLLECTION_CREATE))); - } - case RocksDBEntryType::Document: { - _builder.add( - "type", - VPackValue(static_cast(REPLICATION_MARKER_DOCUMENT))); - // TODO: add transaction id? - break; - } - default: - break; // shouldn't get here? - } - - auto containers = getContainerIds(key); - _builder.add("database", VPackValue(containers.first)); - _builder.add("cid", VPackValue(containers.second)); - _builder.add("data", RocksDBValue::data(value)); - - _builder.close(); - - if (res == TRI_ERROR_NO_ERROR) { - _limit--; - } - } - } - - void Delete(rocksdb::Slice const& key) override { handleDeletion(key); } - - void SingleDelete(rocksdb::Slice const& key) override { handleDeletion(key); } - - private: - bool shouldHandleKey(rocksdb::Slice const& key) { - if (_limit == 0) { - return false; - } - - switch (RocksDBKey::type(key)) { - case RocksDBEntryType::Collection: - case RocksDBEntryType::Document: { - return fromEligibleCollection(key); - } - case RocksDBEntryType::View: // should handle these eventually? - default: - return false; - } - } - - void handleDeletion(rocksdb::Slice const& key) { - if (shouldHandleKey(key)) { - int res = TRI_ERROR_NO_ERROR; - _builder.openObject(); - switch (RocksDBKey::type(key)) { - case RocksDBEntryType::Collection: { - _builder.add( - "type", - VPackValue(static_cast(REPLICATION_COLLECTION_DROP))); - auto containers = getContainerIds(key); - _builder.add("database", VPackValue(containers.first)); - _builder.add("cid", VPackValue(containers.second)); - break; - } - case RocksDBEntryType::Document: { - uint64_t revisionId = RocksDBKey::revisionId(key); - _builder.add( - "type", - VPackValue(static_cast(REPLICATION_MARKER_REMOVE))); - // TODO: add transaction id? - auto containers = getContainerIds(key); - _builder.add("database", VPackValue(containers.first)); - _builder.add("cid", VPackValue(containers.second)); - _builder.add("data", VPackValue(VPackValueType::Object)); - _builder.add(StaticStrings::RevString, - VPackValue(std::to_string(revisionId))); - _builder.close(); - break; - } - default: - break; // shouldn't get here? - } - _builder.close(); - if (res == TRI_ERROR_NO_ERROR) { - _limit--; - } - } - } - - std::pair getContainerIds( - rocksdb::Slice const& key) { - uint64_t objectId = RocksDBKey::collectionId(key); - return mapObjectToCollection(objectId); - } - - bool fromEligibleCollection(rocksdb::Slice const& key) { - auto mapping = getContainerIds(key); - if (mapping.first == _vocbase->id()) { - std::string const collectionName = - _vocbase->collectionName(mapping.second); - - if (collectionName.size() == 0) { - return false; - } - - if (!_includeSystem && collectionName[0] == '_') { - return false; - } - - return true; - } - return false; - } - - private: - TRI_vocbase_t* _vocbase; - uint64_t _from; - size_t& _limit; - bool _includeSystem; - VPackBuilder& _builder; -}; - -RocksDBReplicationResult::RocksDBReplicationResult(int errorNumber, - uint64_t maxTick) - : Result(errorNumber), _maxTick(maxTick) {} - -uint64_t RocksDBReplicationResult::maxTick() const { return _maxTick; } - RocksDBReplicationContext::RocksDBReplicationContext() : _id(TRI_NewTickServer()), _lastTick(0), @@ -190,6 +41,10 @@ RocksDBReplicationContext::RocksDBReplicationContext() _collection(nullptr), _iter() {} +RocksDBReplicationContext::~RocksDBReplicationContext() { + releaseDumpingResources(); +} + TRI_voc_tick_t RocksDBReplicationContext::id() const { return _id; } uint64_t RocksDBReplicationContext::lastTick() const { return _lastTick; } @@ -242,42 +97,6 @@ std::pair RocksDBReplicationContext::dump( hasMore); } -// iterates over WAL starting at 'from' and returns up to 'limit' documents -// from the corresponding database -RocksDBReplicationResult RocksDBReplicationContext::tail( - TRI_vocbase_t* vocbase, uint64_t from, size_t limit, bool includeSystem, - VPackBuilder& builder) { - releaseDumpingResources(); - std::unique_ptr handler( - new WBReader(vocbase, from, limit, includeSystem, builder)); - std::unique_ptr iterator; // reader(); - rocksdb::Status s = static_cast(globalRocksDB()) - ->GetUpdatesSince(from, &iterator); - if (!s.ok()) { // TODO do something? - auto converted = convertStatus(s); - return {converted.errorNumber(), _lastTick}; - } - - while (iterator->Valid() && limit > 0) { - s = iterator->status(); - if (s.ok()) { - rocksdb::BatchResult batch = iterator->GetBatch(); - _lastTick = batch.sequence; - s = batch.writeBatchPtr->Iterate(handler.get()); - } - if (!s.ok()) { - LOG_TOPIC(ERR, Logger::ENGINES) << "Error during WAL scan"; - LOG_TOPIC(ERR, Logger::ENGINES) << iterator->status().getState(); - auto converted = convertStatus(s); - return {converted.errorNumber(), _lastTick}; - } - - iterator->Next(); - } - - return {TRI_ERROR_NO_ERROR, _lastTick}; -} - double RocksDBReplicationContext::expires() const { return _expires; } bool RocksDBReplicationContext::isDeleted() const { return _isDeleted; } diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index 16eaf3e6cc..134774dd7b 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -25,8 +25,8 @@ #define ARANGO_ROCKSDB_ROCKSDB_REPLICATION_CONTEXT_H 1 #include "Basics/Common.h" -#include "Basics/Result.h" #include "Indexes/IndexIterator.h" +#include "RocksDBEngine/RocksDBReplicationCommon.h" #include "RocksDBEngine/RocksDBToken.h" #include "Transaction/Methods.h" #include "VocBase/vocbase.h" @@ -36,15 +36,6 @@ namespace arangodb { -class RocksDBReplicationResult : public Result { - public: - RocksDBReplicationResult(int, uint64_t); - uint64_t maxTick() const; - - private: - uint64_t _maxTick; -}; - class RocksDBReplicationContext { private: typedef std::function @@ -52,6 +43,7 @@ class RocksDBReplicationContext { public: RocksDBReplicationContext(); + ~RocksDBReplicationContext(); TRI_voc_tick_t id() const; uint64_t lastTick() const; @@ -66,12 +58,6 @@ class RocksDBReplicationContext { TRI_vocbase_t* vocbase, std::string const& collectionName, TokenCallback cb, size_t limit); - // iterates over WAL starting at 'from' and returns up to 'limit' documents - // from the corresponding database; releases dumping resources - RocksDBReplicationResult tail(TRI_vocbase_t* vocbase, uint64_t from, - size_t limit, bool includeSystem, - VPackBuilder& builder); - double expires() const; bool isDeleted() const; void deleted(); diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp new file mode 100644 index 0000000000..54e42915bb --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -0,0 +1,213 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +//////////////////////////////////////////////////////////////////////////////// + +#include "RocksDBEngine/RocksDBReplicationTailing.h" +#include "Basics/StaticStrings.h" +#include "Logger/Logger.h" +#include "RocksDBEngine/RocksDBCommon.h" +#include "VocBase/replication-common.h" +#include "VocBase/ticks.h" + +#include +#include +#include + +using namespace arangodb; +using namespace arangodb::rocksutils; +using namespace arangodb::velocypack; + +/// WAL parser, no locking required here, because we have been locked from the +/// outside +class WBReader : public rocksdb::WriteBatch::Handler { + public: + explicit WBReader(TRI_vocbase_t* vocbase, uint64_t from, size_t& limit, + bool includeSystem, VPackBuilder& builder) + : _vocbase(vocbase), + _from(from), + _limit(limit), + _includeSystem(includeSystem), + _builder(builder) {} + + void Put(rocksdb::Slice const& key, rocksdb::Slice const& value) override { + if (shouldHandleKey(key)) { + int res = TRI_ERROR_NO_ERROR; + _builder.openObject(); + switch (RocksDBKey::type(key)) { + case RocksDBEntryType::Collection: { + _builder.add( + "type", + VPackValue(static_cast(REPLICATION_COLLECTION_CREATE))); + } + case RocksDBEntryType::Document: { + _builder.add( + "type", + VPackValue(static_cast(REPLICATION_MARKER_DOCUMENT))); + // TODO: add transaction id? + break; + } + default: + break; // shouldn't get here? + } + + auto containers = getContainerIds(key); + _builder.add("database", VPackValue(containers.first)); + _builder.add("cid", VPackValue(containers.second)); + _builder.add("data", RocksDBValue::data(value)); + + _builder.close(); + + if (res == TRI_ERROR_NO_ERROR) { + _limit--; + } + } + } + + void Delete(rocksdb::Slice const& key) override { handleDeletion(key); } + + void SingleDelete(rocksdb::Slice const& key) override { handleDeletion(key); } + + private: + bool shouldHandleKey(rocksdb::Slice const& key) { + if (_limit == 0) { + return false; + } + + switch (RocksDBKey::type(key)) { + case RocksDBEntryType::Collection: + case RocksDBEntryType::Document: { + return fromEligibleCollection(key); + } + case RocksDBEntryType::View: // should handle these eventually? + default: + return false; + } + } + + void handleDeletion(rocksdb::Slice const& key) { + if (shouldHandleKey(key)) { + int res = TRI_ERROR_NO_ERROR; + _builder.openObject(); + switch (RocksDBKey::type(key)) { + case RocksDBEntryType::Collection: { + _builder.add( + "type", + VPackValue(static_cast(REPLICATION_COLLECTION_DROP))); + auto containers = getContainerIds(key); + _builder.add("database", VPackValue(containers.first)); + _builder.add("cid", VPackValue(containers.second)); + break; + } + case RocksDBEntryType::Document: { + uint64_t revisionId = RocksDBKey::revisionId(key); + _builder.add( + "type", + VPackValue(static_cast(REPLICATION_MARKER_REMOVE))); + // TODO: add transaction id? + auto containers = getContainerIds(key); + _builder.add("database", VPackValue(containers.first)); + _builder.add("cid", VPackValue(containers.second)); + _builder.add("data", VPackValue(VPackValueType::Object)); + _builder.add(StaticStrings::RevString, + VPackValue(std::to_string(revisionId))); + _builder.close(); + break; + } + default: + break; // shouldn't get here? + } + _builder.close(); + if (res == TRI_ERROR_NO_ERROR) { + _limit--; + } + } + } + + std::pair getContainerIds( + rocksdb::Slice const& key) { + uint64_t objectId = RocksDBKey::collectionId(key); + return mapObjectToCollection(objectId); + } + + bool fromEligibleCollection(rocksdb::Slice const& key) { + auto mapping = getContainerIds(key); + if (mapping.first == _vocbase->id()) { + std::string const collectionName = + _vocbase->collectionName(mapping.second); + + if (collectionName.size() == 0) { + return false; + } + + if (!_includeSystem && collectionName[0] == '_') { + return false; + } + + return true; + } + return false; + } + + private: + TRI_vocbase_t* _vocbase; + uint64_t _from; + size_t& _limit; + bool _includeSystem; + VPackBuilder& _builder; +}; + +// iterates over WAL starting at 'from' and returns up to 'limit' documents +// from the corresponding database +RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase, + uint64_t from, size_t limit, + bool includeSystem, + VPackBuilder& builder) { + uint64_t lastTick = from; + std::unique_ptr handler( + new WBReader(vocbase, from, limit, includeSystem, builder)); + std::unique_ptr iterator; // reader(); + rocksdb::Status s = static_cast(globalRocksDB()) + ->GetUpdatesSince(from, &iterator); + if (!s.ok()) { // TODO do something? + auto converted = convertStatus(s); + return {converted.errorNumber(), lastTick}; + } + + while (iterator->Valid() && limit > 0) { + s = iterator->status(); + if (s.ok()) { + rocksdb::BatchResult batch = iterator->GetBatch(); + lastTick = batch.sequence; + s = batch.writeBatchPtr->Iterate(handler.get()); + } + if (!s.ok()) { + LOG_TOPIC(ERR, Logger::ENGINES) << "Error during WAL scan"; + LOG_TOPIC(ERR, Logger::ENGINES) << iterator->status().getState(); + auto converted = convertStatus(s); + return {converted.errorNumber(), lastTick}; + } + + iterator->Next(); + } + + return {TRI_ERROR_NO_ERROR, lastTick}; +} diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.h b/arangod/RocksDBEngine/RocksDBReplicationTailing.h new file mode 100644 index 0000000000..8e39527a58 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.h @@ -0,0 +1,46 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Daniel H. Larkin +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGO_ROCKSDB_ROCKSDB_REPLICATION_TAILING_H +#define ARANGO_ROCKSDB_ROCKSDB_REPLICATION_TAILING_H 1 + +#include "Basics/Common.h" +#include "RocksDBEngine/RocksDBReplicationCommon.h" +#include "VocBase/vocbase.h" + +#include +#include + +namespace arangodb { +namespace rocksutils { + +// iterates over WAL starting at 'from' and returns up to 'limit' documents +// from the corresponding database; releases dumping resources +RocksDBReplicationResult tailWal(TRI_vocbase_t* vocbase, uint64_t from, + size_t limit, bool includeSystem, + VPackBuilder& builder); + +} // namespace rocksutils +} // namespace arangodb + +#endif