From 4104296fd3c1b09c0dc3c3825cb8c79e9e2aa5b2 Mon Sep 17 00:00:00 2001 From: Jan Date: Thu, 7 Dec 2017 09:24:20 +0100 Subject: [PATCH] fix revision id vs local document id usage in incremental sync (#3916) --- arangod/Replication/DatabaseInitialSyncer.cpp | 15 ++- arangod/RocksDBEngine/RocksDBIterators.cpp | 41 +----- arangod/RocksDBEngine/RocksDBIterators.h | 7 - .../RocksDBReplicationContext.cpp | 33 ++--- .../tests/replication/replication-sync.js | 121 +++++++++++++++--- 5 files changed, 130 insertions(+), 87 deletions(-) diff --git a/arangod/Replication/DatabaseInitialSyncer.cpp b/arangod/Replication/DatabaseInitialSyncer.cpp index 38eedc685a..d5af91327e 100644 --- a/arangod/Replication/DatabaseInitialSyncer.cpp +++ b/arangod/Replication/DatabaseInitialSyncer.cpp @@ -712,12 +712,19 @@ int64_t DatabaseInitialSyncer::getSize(arangodb::LogicalCollection* col) { Result res = trx.begin(); - if (!res.ok()) { + if (res.fail()) { return -1; } - auto document = trx.documentCollection(); - return static_cast(document->numberDocuments(&trx)); + OperationResult result = trx.count(col->name(), false); + if (result.result.fail()) { + return -1; + } + VPackSlice s = result.slice(); + if (!s.isNumber()) { + return -1; + } + return s.getNumber(); } /// @brief handle the information about a collection @@ -899,7 +906,7 @@ Result DatabaseInitialSyncer::handleCollection(VPackSlice const& parameters, } Result res; - + if (incremental && getSize(col) > 0) { res = handleCollectionSync(col, StringUtils::itoa(masterCid), masterName, _masterInfo._lastLogTick); } else { diff --git a/arangod/RocksDBEngine/RocksDBIterators.cpp b/arangod/RocksDBEngine/RocksDBIterators.cpp index 6bf8173fa0..9b4694c067 100644 --- a/arangod/RocksDBEngine/RocksDBIterators.cpp +++ b/arangod/RocksDBEngine/RocksDBIterators.cpp @@ -128,8 +128,8 @@ bool RocksDBAllIndexIterator::nextDocument( } while (limit > 0) { - TRI_voc_rid_t revisionId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key()); - cb(LocalDocumentId(revisionId), VPackSlice(_iterator->value().data())); + TRI_voc_rid_t documentId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key()); + cb(LocalDocumentId(documentId), VPackSlice(_iterator->value().data())); --limit; if (_reverse) { @@ -253,8 +253,8 @@ bool RocksDBAnyIndexIterator::nextDocument( } while (limit > 0) { - TRI_voc_rid_t revisionId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key()); - cb(LocalDocumentId(revisionId), VPackSlice(_iterator->value().data())); + TRI_voc_rid_t documentId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key()); + cb(LocalDocumentId(documentId), VPackSlice(_iterator->value().data())); --limit; _returned++; _iterator->Next(); @@ -317,8 +317,8 @@ bool RocksDBSortedAllIterator::next(LocalDocumentIdCallback const& cb, size_t li } while (limit > 0) { - LocalDocumentId token(RocksDBValue::revisionId(_iterator->value())); - cb(token); + LocalDocumentId documentId(RocksDBValue::revisionId(_iterator->value())); + cb(documentId); --limit; @@ -331,35 +331,6 @@ bool RocksDBSortedAllIterator::next(LocalDocumentIdCallback const& cb, size_t li return true; } -/// special method to expose the document key for incremental replication -bool RocksDBSortedAllIterator::nextWithKey(TokenKeyCallback const& cb, - size_t limit) { - TRI_ASSERT(_trx->state()->isRunning()); - - if (limit == 0 || !_iterator->Valid() || outOfRange()) { - // No limit no data, or we are actually done. The last call should have - // returned false - TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken - return false; - } - - while (limit > 0) { -#ifdef ARANGODB_ENABLE_MAINTAINER_MODE - TRI_ASSERT(_index->objectId() == RocksDBKey::objectId(_iterator->key())); -#endif - LocalDocumentId token(RocksDBValue::revisionId(_iterator->value())); - StringRef key = RocksDBKey::primaryKey(_iterator->key()); - cb(token, key); - --limit; - - _iterator->Next(); - if (!_iterator->Valid() || outOfRange()) { - return false; - } - } - return true; -} - void RocksDBSortedAllIterator::seek(StringRef const& key) { TRI_ASSERT(_trx->state()->isRunning()); // don't want to get the index pointer just for this diff --git a/arangod/RocksDBEngine/RocksDBIterators.h b/arangod/RocksDBEngine/RocksDBIterators.h index 8c447bfae2..51c4c33815 100644 --- a/arangod/RocksDBEngine/RocksDBIterators.h +++ b/arangod/RocksDBEngine/RocksDBIterators.h @@ -45,9 +45,6 @@ class RocksDBPrimaryIndex; /// basically sorted after revision ID class RocksDBAllIndexIterator final : public IndexIterator { public: - typedef std::function - TokenKeyCallback; RocksDBAllIndexIterator(LogicalCollection* collection, transaction::Methods* trx, ManagedDocumentResult* mmdr, @@ -104,9 +101,6 @@ class RocksDBAnyIndexIterator final : public IndexIterator { /// into the document store. E.g. used for incremental sync class RocksDBSortedAllIterator final : public IndexIterator { public: - typedef std::function - TokenKeyCallback; RocksDBSortedAllIterator(LogicalCollection* collection, transaction::Methods* trx, ManagedDocumentResult* mmdr, @@ -120,7 +114,6 @@ class RocksDBSortedAllIterator final : public IndexIterator { void reset() override; // engine specific optimizations - bool nextWithKey(TokenKeyCallback const& cb, size_t limit); void seek(StringRef const& key); private: diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index 5b7aae20be..649fc64971 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -374,36 +374,23 @@ arangodb::Result RocksDBReplicationContext::dumpKeys( } } - auto cb = [&](LocalDocumentId const& documentId, StringRef const& key) { + auto cb = [&](LocalDocumentId const& documentId, VPackSlice slice) { + TRI_voc_rid_t revisionId = 0; + VPackSlice key; + transaction::helpers::extractKeyAndRevFromDocument(slice, key, revisionId); + + TRI_ASSERT(key.isString()); + b.openArray(); - b.add(VPackValuePair(key.data(), key.size(), VPackValueType::String)); - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - b.add(VPackValue(std::to_string(documentId.id()))); // TODO: must return the revision here - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////////////// + b.add(key); + b.add(VPackValue(TRI_RidToString(revisionId))); b.close(); }; b.openArray(); // chunkSize is going to be ignored here try { - _hasMore = primary->nextWithKey(cb, chunkSize); + _hasMore = primary->nextDocument(cb, chunkSize); _lastIteratorOffset++; } catch (std::exception const&) { return rv.reset(TRI_ERROR_INTERNAL); diff --git a/js/server/tests/replication/replication-sync.js b/js/server/tests/replication/replication-sync.js index 13e9fd3c6a..393f0db4e2 100644 --- a/js/server/tests/replication/replication-sync.js +++ b/js/server/tests/replication/replication-sync.js @@ -78,7 +78,7 @@ const compare = function(masterFunc, slaveInitFunc, slaveCompareFunc, incrementa connectToSlave(); slaveInitFunc(state); - internal.wait(1, false); + internal.wait(0.1, false); var syncResult = replication.syncCollection(cn, { endpoint: masterEndpoint, @@ -313,7 +313,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({ @@ -372,7 +377,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._createEdgeCollection(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 100; ++i) { c.save(cn + "/test" + i, cn + "/test" + (i % 10), { @@ -427,7 +437,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._createEdgeCollection(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 200; ++i) { c.save( @@ -545,7 +560,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it }, function(state) { assertEqual(state.count, collectionCount(cn)); @@ -579,7 +599,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it }, function(state) { assertEqual(state.count, collectionCount(cn)); @@ -607,7 +632,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({ @@ -641,7 +671,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({ @@ -681,7 +716,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 500; ++i) { c.save({ @@ -721,7 +761,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 500; ++i) { c.save({ @@ -761,7 +806,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 6000; ++i) { c.save({ @@ -801,7 +851,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 6000; ++i) { c.save({ @@ -842,7 +897,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({ @@ -884,7 +944,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({ @@ -925,7 +990,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({ @@ -965,7 +1035,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({ @@ -1007,7 +1082,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({ @@ -1051,7 +1131,12 @@ function BaseTestConfig() { }, function(state) { // already create the collection on the slave - var c = db._create(cn); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + incremental: false + }); + var c = db._collection(cn); + c.truncate(); // but empty it for (var i = 0; i < 5000; ++i) { c.save({