1
0
Fork 0

fix revision id vs local document id usage in incremental sync (#3887)

This commit is contained in:
Jan 2017-12-06 16:33:57 +01:00 committed by GitHub
parent 7dd439f203
commit 73b3c65153
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 130 additions and 87 deletions

View File

@ -712,12 +712,19 @@ int64_t DatabaseInitialSyncer::getSize(arangodb::LogicalCollection* col) {
Result res = trx.begin();
if (!res.ok()) {
if (res.fail()) {
return -1;
}
auto document = trx.documentCollection();
return static_cast<int64_t>(document->numberDocuments(&trx));
OperationResult result = trx.count(col->name(), false);
if (result.result.fail()) {
return -1;
}
VPackSlice s = result.slice();
if (!s.isNumber()) {
return -1;
}
return s.getNumber<int64_t>();
}
/// @brief handle the information about a collection
@ -899,7 +906,7 @@ Result DatabaseInitialSyncer::handleCollection(VPackSlice const& parameters,
}
Result res;
if (incremental && getSize(col) > 0) {
res = handleCollectionSync(col, StringUtils::itoa(masterCid), masterName, _masterInfo._lastLogTick);
} else {

View File

@ -128,8 +128,8 @@ bool RocksDBAllIndexIterator::nextDocument(
}
while (limit > 0) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key());
cb(LocalDocumentId(revisionId), VPackSlice(_iterator->value().data()));
TRI_voc_rid_t documentId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key());
cb(LocalDocumentId(documentId), VPackSlice(_iterator->value().data()));
--limit;
if (_reverse) {
@ -253,8 +253,8 @@ bool RocksDBAnyIndexIterator::nextDocument(
}
while (limit > 0) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key());
cb(LocalDocumentId(revisionId), VPackSlice(_iterator->value().data()));
TRI_voc_rid_t documentId = RocksDBKey::revisionId(RocksDBEntryType::Document, _iterator->key());
cb(LocalDocumentId(documentId), VPackSlice(_iterator->value().data()));
--limit;
_returned++;
_iterator->Next();
@ -317,8 +317,8 @@ bool RocksDBSortedAllIterator::next(LocalDocumentIdCallback const& cb, size_t li
}
while (limit > 0) {
LocalDocumentId token(RocksDBValue::revisionId(_iterator->value()));
cb(token);
LocalDocumentId documentId(RocksDBValue::revisionId(_iterator->value()));
cb(documentId);
--limit;
@ -331,35 +331,6 @@ bool RocksDBSortedAllIterator::next(LocalDocumentIdCallback const& cb, size_t li
return true;
}
/// special method to expose the document key for incremental replication
bool RocksDBSortedAllIterator::nextWithKey(TokenKeyCallback const& cb,
size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
return false;
}
while (limit > 0) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(_index->objectId() == RocksDBKey::objectId(_iterator->key()));
#endif
LocalDocumentId token(RocksDBValue::revisionId(_iterator->value()));
StringRef key = RocksDBKey::primaryKey(_iterator->key());
cb(token, key);
--limit;
_iterator->Next();
if (!_iterator->Valid() || outOfRange()) {
return false;
}
}
return true;
}
void RocksDBSortedAllIterator::seek(StringRef const& key) {
TRI_ASSERT(_trx->state()->isRunning());
// don't want to get the index pointer just for this

View File

@ -45,9 +45,6 @@ class RocksDBPrimaryIndex;
/// basically sorted after revision ID
class RocksDBAllIndexIterator final : public IndexIterator {
public:
typedef std::function<void(LocalDocumentId const& token,
StringRef const& key)>
TokenKeyCallback;
RocksDBAllIndexIterator(LogicalCollection* collection,
transaction::Methods* trx,
ManagedDocumentResult* mmdr,
@ -104,9 +101,6 @@ class RocksDBAnyIndexIterator final : public IndexIterator {
/// into the document store. E.g. used for incremental sync
class RocksDBSortedAllIterator final : public IndexIterator {
public:
typedef std::function<void(LocalDocumentId const& token,
StringRef const& key)>
TokenKeyCallback;
RocksDBSortedAllIterator(LogicalCollection* collection,
transaction::Methods* trx,
ManagedDocumentResult* mmdr,
@ -120,7 +114,6 @@ class RocksDBSortedAllIterator final : public IndexIterator {
void reset() override;
// engine specific optimizations
bool nextWithKey(TokenKeyCallback const& cb, size_t limit);
void seek(StringRef const& key);
private:

View File

@ -374,36 +374,23 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(
}
}
auto cb = [&](LocalDocumentId const& documentId, StringRef const& key) {
auto cb = [&](LocalDocumentId const& documentId, VPackSlice slice) {
TRI_voc_rid_t revisionId = 0;
VPackSlice key;
transaction::helpers::extractKeyAndRevFromDocument(slice, key, revisionId);
TRI_ASSERT(key.isString());
b.openArray();
b.add(VPackValuePair(key.data(), key.size(), VPackValueType::String));
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
b.add(VPackValue(std::to_string(documentId.id()))); // TODO: must return the revision here
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
b.add(key);
b.add(VPackValue(TRI_RidToString(revisionId)));
b.close();
};
b.openArray();
// chunkSize is going to be ignored here
try {
_hasMore = primary->nextWithKey(cb, chunkSize);
_hasMore = primary->nextDocument(cb, chunkSize);
_lastIteratorOffset++;
} catch (std::exception const&) {
return rv.reset(TRI_ERROR_INTERNAL);

View File

@ -78,7 +78,7 @@ const compare = function(masterFunc, slaveInitFunc, slaveCompareFunc, incrementa
connectToSlave();
slaveInitFunc(state);
internal.wait(1, false);
internal.wait(0.1, false);
var syncResult = replication.syncCollection(cn, {
endpoint: masterEndpoint,
@ -313,7 +313,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({
@ -372,7 +377,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._createEdgeCollection(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 100; ++i) {
c.save(cn + "/test" + i, cn + "/test" + (i % 10), {
@ -427,7 +437,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._createEdgeCollection(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 200; ++i) {
c.save(
@ -545,7 +560,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
},
function(state) {
assertEqual(state.count, collectionCount(cn));
@ -579,7 +599,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
},
function(state) {
assertEqual(state.count, collectionCount(cn));
@ -607,7 +632,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({
@ -641,7 +671,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({
@ -681,7 +716,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 500; ++i) {
c.save({
@ -721,7 +761,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 500; ++i) {
c.save({
@ -761,7 +806,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 6000; ++i) {
c.save({
@ -801,7 +851,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 6000; ++i) {
c.save({
@ -842,7 +897,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({
@ -884,7 +944,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({
@ -925,7 +990,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({
@ -965,7 +1035,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({
@ -1007,7 +1082,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({
@ -1051,7 +1131,12 @@ function BaseTestConfig() {
},
function(state) {
// already create the collection on the slave
var c = db._create(cn);
replication.syncCollection(cn, {
endpoint: masterEndpoint,
incremental: false
});
var c = db._collection(cn);
c.truncate(); // but empty it
for (var i = 0; i < 5000; ++i) {
c.save({