1
0
Fork 0

Use physical collection directly (#6526)

This commit is contained in:
Simon 2018-09-18 14:55:58 +02:00 committed by Jan
parent f8aae2811d
commit 611e4a7010
1 changed file with 88 additions and 47 deletions

View File

@@ -48,7 +48,7 @@ namespace arangodb {
// remove all keys that are below first remote key or beyond last remote key
Result removeKeysOutsideRange(VPackSlice chunkSlice,
LogicalCollection* col,
LogicalCollection* coll,
OperationOptions& options,
InitialSyncerIncrementalSyncStats& stats) {
size_t const numChunks = chunkSlice.length();
@@ -59,8 +59,8 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
}
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(col->vocbase()),
*col,
transaction::StandaloneContext::Create(coll->vocbase()),
*coll,
AccessMode::Type::EXCLUSIVE
);
@@ -69,6 +69,8 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
// turn on intermediate commits as the number of keys to delete can be huge here
trx.addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
RocksDBCollection *physical = static_cast<RocksDBCollection*>(coll->getPhysical());
Result res = trx.begin();
if (!res.ok()) {
@@ -92,11 +94,13 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
TRI_ASSERT(highSlice.isString());
StringRef highRef(highSlice);
LogicalCollection* coll = trx.documentCollection();
auto iterator = createPrimaryIndexIterator(&trx, coll);
VPackBuilder builder;
ManagedDocumentResult mdr;
TRI_voc_tick_t tick;
TRI_voc_rid_t prevRev, revisionId;
// remove everything from the beginning of the key range until the lowest
// remote key
iterator.next(
@@ -106,7 +110,12 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
builder.clear();
builder.add(velocypack::ValuePair(docKey.data(), docKey.size(),
velocypack::ValueType::String));
trx.remove(col->name(), builder.slice(), options);
Result r = physical->remove(&trx, builder.slice(), mdr, options,
tick, false, prevRev, revisionId);
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
// ignore not found, we remove conflicting docs ahead of time
THROW_ARANGO_EXCEPTION(r);
}
++stats.numDocsRemoved;
// continue iteration
return true;
@@ -118,7 +127,7 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
std::numeric_limits<std::uint64_t>::max());
// remove everything from the highest remote key until the end of the key range
auto index = col->lookupIndex(0); //RocksDBCollection->primaryIndex() is private
auto index = coll->lookupIndex(0); //RocksDBCollection->primaryIndex() is private
TRI_ASSERT(index->type() == Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX);
auto primaryIndex = static_cast<RocksDBPrimaryIndex*>(index.get());
@@ -132,8 +141,13 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
if (docKey.compare(highRef) > 0) {
builder.clear();
builder.add(velocypack::ValuePair(docKey.data(), docKey.size(),
velocypack::ValueType::String));
trx.remove(col->name(), builder.slice(), options);
velocypack::ValueType::String));
Result r = physical->remove(&trx, builder.slice(), mdr, options,
tick, false, prevRev, revisionId);
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
// ignore not found, we remove conflicting docs ahead of time
THROW_ARANGO_EXCEPTION(r);
}
++stats.numDocsRemoved;
}
@@ -161,8 +175,9 @@ Result syncChunkRocksDB(
std::string const baseUrl = replutils::ReplicationUrl + "/keys";
TRI_voc_tick_t const chunkSize = 5000;
std::string const& collectionName = trx->documentCollection()->name();
RocksDBCollection* physical = static_cast<RocksDBCollection*>(trx->documentCollection()->getPhysical());
LogicalCollection* coll = trx->documentCollection();
std::string const& collectionName = coll->name();
RocksDBCollection* physical = static_cast<RocksDBCollection*>(coll->getPhysical());
OperationOptions options;
options.silent = true;
options.ignoreRevs = true;
@@ -239,6 +254,11 @@ Result syncChunkRocksDB(
collectionName + " Chunk: " + std::to_string(chunkId));
}
TRI_ASSERT(numKeys > 0);
// state for RocksDBCollection insert/replace/remove
ManagedDocumentResult mdr, previous;
TRI_voc_tick_t resultTick;
TRI_voc_rid_t prevRev, revisionId;
transaction::BuilderLeaser keyBuilder(trx);
std::vector<size_t> toFetch;
@@ -281,7 +301,14 @@ Result syncChunkRocksDB(
// we have a local key that is not present remotely
keyBuilder->clear();
keyBuilder->add(VPackValue(localKey));
trx->remove(collectionName, keyBuilder->slice(), options);
Result r = physical->remove(trx, keyBuilder->slice(), mdr, options,
resultTick, false, prevRev, revisionId);
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
// ignore not found, we remove conflicting docs ahead of time
return r;
}
++stats.numDocsRemoved;
++nextStart;
@@ -325,7 +352,13 @@ Result syncChunkRocksDB(
// we have a local key that is not present remotely
keyBuilder->clear();
keyBuilder->add(VPackValue(localKey));
trx->remove(collectionName, keyBuilder->slice(), options);
Result r = physical->remove(trx, keyBuilder->slice(), mdr, options,
resultTick, false, prevRev, revisionId);
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
// ignore not found, we remove conflicting docs ahead of time
return r;
}
++stats.numDocsRemoved;
}
++nextStart;
@@ -449,19 +482,12 @@ Result syncChunkRocksDB(
auto removeConflict =
[&](std::string const& conflictingKey) -> Result {
Result res(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
keyBuilder->clear();
keyBuilder->add(VPackValue(conflictingKey));
LocalDocumentId conflictId = physical->lookupKey(trx, keyBuilder->slice());
if (conflictId.isSet()) {
physical->readDocumentWithCallback(trx, conflictId, [&](LocalDocumentId const&, VPackSlice doc) {
res = trx->remove(collectionName, doc, options).result;
});
}
return res;
return physical->remove(trx, keyBuilder->slice(), mdr, options,
resultTick, false, prevRev, revisionId);
};
LocalDocumentId const documentId = physical->lookupKey(trx, keySlice);
@@ -469,44 +495,49 @@ Result syncChunkRocksDB(
// INSERT
TRI_ASSERT(options.indexOperationMode == Index::OperationMode::internal);
OperationResult opRes = trx->insert(collectionName, it, options);
if (opRes.fail()) {
if (opRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) &&
opRes.errorMessage() > keySlice.copyString()) {
Result res = physical->insert(trx, it, mdr, options,
resultTick, false, revisionId);
if (res.fail()) {
if (res.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) &&
res.errorMessage() > keySlice.copyString()) { // WTF ?!!
// remove conflict and retry
auto inner = removeConflict(opRes.errorMessage());
auto inner = removeConflict(res.errorMessage());
if (inner.fail()) {
return opRes.result;
return res;
}
opRes = trx->insert(collectionName, it, options);
if (opRes.fail()) {
return opRes.result;
res = physical->insert(trx, it, mdr, options,
resultTick, false, revisionId);
if (res.fail()) {
return res;
}
// fall-through
} else {
return opRes.result;
return res;
}
}
} else {
// REPLACE
TRI_ASSERT(options.indexOperationMode == Index::OperationMode::internal);
OperationResult opRes = trx->replace(collectionName, it, options);
if (opRes.fail()) {
if (opRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) &&
opRes.errorMessage() > keySlice.copyString()) {
Result res = physical->replace(trx, it, mdr, options, resultTick,
false, prevRev, previous);
if (res.fail()) {
if (res.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) &&
res.errorMessage() > keySlice.copyString()) { // WTF!?
// remove conflict and retry
auto inner = removeConflict(opRes.errorMessage());
auto inner = removeConflict(res.errorMessage());
if (inner.fail()) {
return opRes.result;
return res;
}
opRes = trx->replace(collectionName, it, options);
if (opRes.fail()) {
return opRes.result;
res = physical->replace(trx, it, mdr, options, resultTick,
false, prevRev, previous);
if (res.fail()) {
return res;
}
// fall-through
} else {
return opRes.result;
return res;
}
}
}
@@ -630,7 +661,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
trx.addHint(transaction::Hints::Hint::NO_INDEXING);
// turn on intermediate commits as the number of operations can be huge here
trx.addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
Result res = trx.begin();
if (!res.ok()) {
@@ -643,6 +674,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
// The LogicalCollection is protected by trx.
// Neither it nor its indexes can be invalidated
RocksDBCollection *physical = static_cast<RocksDBCollection*>(col->getPhysical());
size_t currentChunkId = 0;
std::string lowKey;
@@ -700,7 +732,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
VPackBuilder tempBuilder;
std::function<void(std::string, std::uint64_t)> compareChunk =
[&trx, &col, &options, &foundLowKey, &markers, &localHash, &hashString,
[&trx, &physical, &options, &foundLowKey, &markers, &localHash, &hashString,
&syncer, &currentChunkId, &numChunks, &keysId, &resetChunk,
&compareChunk, &lowKey, &highKey,
&tempBuilder, &stats](std::string const& docKey, std::uint64_t docRev) {
@@ -711,7 +743,17 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
// smaller values than lowKey mean they don't exist remotely
tempBuilder.clear();
tempBuilder.add(VPackValue(docKey));
trx.remove(col->name(), tempBuilder.slice(), options);
ManagedDocumentResult previous;
TRI_voc_rid_t resultMarkerTick;
TRI_voc_rid_t prevRev, revisionId;
Result r = physical->remove(&trx, tempBuilder.slice(), previous, options,
resultMarkerTick, false, prevRev, revisionId);
if (r.fail() && r.isNot(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND)) {
// ignore not found, we remove conflicting docs ahead of time
THROW_ARANGO_EXCEPTION(r);
}
++stats.numDocsRemoved;
return;
}
@@ -775,8 +817,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
}
}; // compare chunk - end
LogicalCollection* coll = trx.documentCollection();
auto iterator = createPrimaryIndexIterator(&trx, coll);
auto iterator = createPrimaryIndexIterator(&trx, col);
iterator.next(
[&](rocksdb::Slice const& rocksKey, rocksdb::Slice const& rocksValue) {
std::string docKey = RocksDBKey::primaryKey(rocksKey).toString();