1
0
Fork 0

fix revision id vs local document id usage in incremental sync (#3916)

This commit is contained in:
Jan 2017-12-07 09:24:20 +01:00 committed by Frank Celler
parent 13ae255d5d
commit 4104296fd3
5 changed files with 130 additions and 87 deletions

View File

@ -712,12 +712,19 @@ int64_t DatabaseInitialSyncer::getSize(arangodb::LogicalCollection* col) {
Result res = trx.begin(); Result res = trx.begin();
if (!res.ok()) { if (res.fail()) {
return -1; return -1;
} }
auto document = trx.documentCollection(); OperationResult result = trx.count(col->name(), false);
return static_cast<int64_t>(document->numberDocuments(&trx)); if (result.result.fail()) {
return -1;
}
VPackSlice s = result.slice();
if (!s.isNumber()) {
return -1;
}
return s.getNumber<int64_t>();
} }
/// @brief handle the information about a collection /// @brief handle the information about a collection
@ -899,7 +906,7 @@ Result DatabaseInitialSyncer::handleCollection(VPackSlice const& parameters,
} }
Result res; Result res;
if (incremental && getSize(col) > 0) { if (incremental && getSize(col) > 0) {
res = handleCollectionSync(col, StringUtils::itoa(masterCid), masterName, _masterInfo._lastLogTick); res = handleCollectionSync(col, StringUtils::itoa(masterCid), masterName, _masterInfo._lastLogTick);
} else { } else {

View File

@ -128,8 +128,8 @@ bool RocksDBAllIndexIterator::nextDocument(
} }
while (limit > 0) { while (limit > 0) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key()); TRI_voc_rid_t documentId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key());
cb(LocalDocumentId(revisionId), VPackSlice(_iterator->value().data())); cb(LocalDocumentId(documentId), VPackSlice(_iterator->value().data()));
--limit; --limit;
if (_reverse) { if (_reverse) {
@ -253,8 +253,8 @@ bool RocksDBAnyIndexIterator::nextDocument(
} }
while (limit > 0) { while (limit > 0) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key()); TRI_voc_rid_t documentId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key());
cb(LocalDocumentId(revisionId), VPackSlice(_iterator->value().data())); cb(LocalDocumentId(documentId), VPackSlice(_iterator->value().data()));
--limit; --limit;
_returned++; _returned++;
_iterator->Next(); _iterator->Next();
@ -317,8 +317,8 @@ bool RocksDBSortedAllIterator::next(LocalDocumentIdCallback const& cb, size_t li
} }
while (limit > 0) { while (limit > 0) {
LocalDocumentId token(RocksDBValue::revisionId(_iterator->value())); LocalDocumentId documentId(RocksDBValue::revisionId(_iterator->value()));
cb(token); cb(documentId);
--limit; --limit;
@ -331,35 +331,6 @@ bool RocksDBSortedAllIterator::next(LocalDocumentIdCallback const& cb, size_t li
return true; return true;
} }
/// special method to expose the document key for incremental replication
bool RocksDBSortedAllIterator::nextWithKey(TokenKeyCallback const& cb,
size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
return false;
}
while (limit > 0) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(_index->objectId() == RocksDBKey::objectId(_iterator->key()));
#endif
LocalDocumentId token(RocksDBValue::revisionId(_iterator->value()));
StringRef key = RocksDBKey::primaryKey(_iterator->key());
cb(token, key);
--limit;
_iterator->Next();
if (!_iterator->Valid() || outOfRange()) {
return false;
}
}
return true;
}
void RocksDBSortedAllIterator::seek(StringRef const& key) { void RocksDBSortedAllIterator::seek(StringRef const& key) {
TRI_ASSERT(_trx->state()->isRunning()); TRI_ASSERT(_trx->state()->isRunning());
// don't want to get the index pointer just for this // don't want to get the index pointer just for this

View File

@ -45,9 +45,6 @@ class RocksDBPrimaryIndex;
/// basically sorted after revision ID /// basically sorted after revision ID
class RocksDBAllIndexIterator final : public IndexIterator { class RocksDBAllIndexIterator final : public IndexIterator {
public: public:
typedef std::function<void(LocalDocumentId const& token,
StringRef const& key)>
TokenKeyCallback;
RocksDBAllIndexIterator(LogicalCollection* collection, RocksDBAllIndexIterator(LogicalCollection* collection,
transaction::Methods* trx, transaction::Methods* trx,
ManagedDocumentResult* mmdr, ManagedDocumentResult* mmdr,
@ -104,9 +101,6 @@ class RocksDBAnyIndexIterator final : public IndexIterator {
/// into the document store. E.g. used for incremental sync /// into the document store. E.g. used for incremental sync
class RocksDBSortedAllIterator final : public IndexIterator { class RocksDBSortedAllIterator final : public IndexIterator {
public: public:
typedef std::function<void(LocalDocumentId const& token,
StringRef const& key)>
TokenKeyCallback;
RocksDBSortedAllIterator(LogicalCollection* collection, RocksDBSortedAllIterator(LogicalCollection* collection,
transaction::Methods* trx, transaction::Methods* trx,
ManagedDocumentResult* mmdr, ManagedDocumentResult* mmdr,
@ -120,7 +114,6 @@ class RocksDBSortedAllIterator final : public IndexIterator {
void reset() override; void reset() override;
// engine specific optimizations // engine specific optimizations
bool nextWithKey(TokenKeyCallback const& cb, size_t limit);
void seek(StringRef const& key); void seek(StringRef const& key);
private: private:

View File

@ -374,36 +374,23 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(
} }
} }
auto cb = [&](LocalDocumentId const& documentId, StringRef const& key) { auto cb = [&](LocalDocumentId const& documentId, VPackSlice slice) {
TRI_voc_rid_t revisionId = 0;
VPackSlice key;
transaction::helpers::extractKeyAndRevFromDocument(slice, key, revisionId);
TRI_ASSERT(key.isString());
b.openArray(); b.openArray();
b.add(VPackValuePair(key.data(), key.size(), VPackValueType::String)); b.add(key);
///////////////////////////////////////////////////////////////////////////////////////////////////// b.add(VPackValue(TRI_RidToString(revisionId)));
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
b.add(VPackValue(std::to_string(documentId.id()))); // TODO: must return the revision here
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
b.close(); b.close();
}; };
b.openArray(); b.openArray();
// chunkSize is going to be ignored here // chunkSize is going to be ignored here
try { try {
_hasMore = primary->nextWithKey(cb, chunkSize); _hasMore = primary->nextDocument(cb, chunkSize);
_lastIteratorOffset++; _lastIteratorOffset++;
} catch (std::exception const&) { } catch (std::exception const&) {
return rv.reset(TRI_ERROR_INTERNAL); return rv.reset(TRI_ERROR_INTERNAL);

View File

@ -78,7 +78,7 @@ const compare = function(masterFunc, slaveInitFunc, slaveCompareFunc, incrementa
connectToSlave(); connectToSlave();
slaveInitFunc(state); slaveInitFunc(state);
internal.wait(1, false); internal.wait(0.1, false);
var syncResult = replication.syncCollection(cn, { var syncResult = replication.syncCollection(cn, {
endpoint: masterEndpoint, endpoint: masterEndpoint,
@ -313,7 +313,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({
@ -372,7 +377,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._createEdgeCollection(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 100; ++i) { for (var i = 0; i < 100; ++i) {
c.save(cn + "/test" + i, cn + "/test" + (i % 10), { c.save(cn + "/test" + i, cn + "/test" + (i % 10), {
@ -427,7 +437,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._createEdgeCollection(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 200; ++i) { for (var i = 0; i < 200; ++i) {
c.save( c.save(
@ -545,7 +560,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
}, },
function(state) { function(state) {
assertEqual(state.count, collectionCount(cn)); assertEqual(state.count, collectionCount(cn));
@ -579,7 +599,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
}, },
function(state) { function(state) {
assertEqual(state.count, collectionCount(cn)); assertEqual(state.count, collectionCount(cn));
@ -607,7 +632,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({
@ -641,7 +671,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({
@ -681,7 +716,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 500; ++i) { for (var i = 0; i < 500; ++i) {
c.save({ c.save({
@ -721,7 +761,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 500; ++i) { for (var i = 0; i < 500; ++i) {
c.save({ c.save({
@ -761,7 +806,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 6000; ++i) { for (var i = 0; i < 6000; ++i) {
c.save({ c.save({
@ -801,7 +851,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 6000; ++i) { for (var i = 0; i < 6000; ++i) {
c.save({ c.save({
@ -842,7 +897,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({
@ -884,7 +944,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({
@ -925,7 +990,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({
@ -965,7 +1035,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({
@ -1007,7 +1082,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({
@ -1051,7 +1131,12 @@ function BaseTestConfig() {
}, },
function(state) { function(state) {
// already create the collection on the slave // already create the collection on the slave
var c = db._create(cn); replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) { for (var i = 0; i < 5000; ++i) {
c.save({ c.save({