
fixed incremental replication

Simon Grätzer 2017-05-31 18:08:05 +02:00
parent 36d440767b
commit 72661d85c3
11 changed files with 290 additions and 168 deletions


@@ -2332,13 +2332,13 @@ int MMFilesCollection::cleanupIndexes() {
}
std::unique_ptr<IndexIterator> MMFilesCollection::getAllIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) {
transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) const {
return std::unique_ptr<IndexIterator>(
primaryIndex()->allIterator(trx, mdr, reverse));
}
std::unique_ptr<IndexIterator> MMFilesCollection::getAnyIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr) {
transaction::Methods* trx, ManagedDocumentResult* mdr) const {
return std::unique_ptr<IndexIterator>(primaryIndex()->anyIterator(trx, mdr));
}
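
The two MMFilesCollection hunks above only add a const qualifier to the iterator factories so that they match the pure-virtual declarations in PhysicalCollection at the end of this diff. A minimal, self-contained sketch of why the qualifier must match exactly on both sides (all names below are stand-ins, not ArangoDB types):

#include <memory>

struct Iterator {};

struct PhysicalBase {
  // const-qualified pure virtual: implementations promise not to mutate state
  virtual std::unique_ptr<Iterator> getAllIterator(bool reverse) const = 0;
  virtual ~PhysicalBase() = default;
};

struct PhysicalImpl final : PhysicalBase {
  // 'override' compiles only because the const qualifier matches the base;
  // without 'const' this would declare a new, unrelated overload instead
  std::unique_ptr<Iterator> getAllIterator(bool /*reverse*/) const override {
    return std::make_unique<Iterator>();
  }
};

int main() {
  PhysicalImpl impl;
  PhysicalBase const& base = impl;  // callable through a const reference now
  auto it = base.getAllIterator(false);
  return it != nullptr ? 0 : 1;
}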


@@ -279,9 +279,9 @@ class MMFilesCollection final : public PhysicalCollection {
std::unique_ptr<IndexIterator> getAllIterator(transaction::Methods* trx,
ManagedDocumentResult* mdr,
bool reverse) override;
bool reverse) const override;
std::unique_ptr<IndexIterator> getAnyIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr) override;
transaction::Methods* trx, ManagedDocumentResult* mdr) const override;
void invokeOnAllElements(
transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) override;


@@ -396,7 +396,8 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
transaction::Methods* trx, arangodb::velocypack::Slice const& info,
bool& created) {
// prevent concurrent dropping
bool isLocked = trx->isLocked(_logicalCollection, AccessMode::Type::EXCLUSIVE);
bool isLocked =
trx->isLocked(_logicalCollection, AccessMode::Type::EXCLUSIVE);
CONDITIONAL_WRITE_LOCKER(guard, _exclusiveLock, !isLocked);
std::shared_ptr<Index> idx;
{
@@ -606,22 +607,29 @@ bool RocksDBCollection::dropIndex(TRI_idx_iid_t iid) {
}
std::unique_ptr<IndexIterator> RocksDBCollection::getAllIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) {
return std::unique_ptr<IndexIterator>(
new RocksDBAllIndexIterator(_logicalCollection, trx, mdr, primaryIndex(), reverse));
transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) const {
return std::unique_ptr<IndexIterator>(new RocksDBAllIndexIterator(
_logicalCollection, trx, mdr, primaryIndex(), reverse));
}
std::unique_ptr<IndexIterator> RocksDBCollection::getAnyIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr) {
return std::unique_ptr<IndexIterator>(
new RocksDBAnyIndexIterator(_logicalCollection, trx, mdr, primaryIndex()));
transaction::Methods* trx, ManagedDocumentResult* mdr) const {
return std::unique_ptr<IndexIterator>(new RocksDBAnyIndexIterator(
_logicalCollection, trx, mdr, primaryIndex()));
}
std::unique_ptr<IndexIterator> RocksDBCollection::getSortedAllIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr) const {
return std::unique_ptr<RocksDBSortedAllIterator>(new RocksDBSortedAllIterator(
_logicalCollection, trx, mdr, primaryIndex()));
}
void RocksDBCollection::invokeOnAllElements(
transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) {
ManagedDocumentResult mmdr;
std::unique_ptr<IndexIterator> cursor(this->getAllIterator(trx, &mmdr, false));
std::unique_ptr<IndexIterator> cursor(
this->getAllIterator(trx, &mmdr, false));
bool cnt = true;
auto cb = [&](DocumentIdentifierToken token) {
if (cnt) {
@@ -645,21 +653,23 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
// delete documents
RocksDBMethods* mthd;
mthd = state->rocksdbMethods();
RocksDBKeyBounds documentBounds =
RocksDBKeyBounds::CollectionDocuments(this->objectId());
rocksdb::Comparator const * cmp = RocksDBColumnFamily::documents()->GetComparator();
rocksdb::Comparator const* cmp =
RocksDBColumnFamily::documents()->GetComparator();
rocksdb::ReadOptions ro = mthd->readOptions();
rocksdb::Slice const end = documentBounds.end();
ro.iterate_upper_bound = &end;
std::unique_ptr<rocksdb::Iterator> iter = mthd->NewIterator(ro, RocksDBColumnFamily::documents());
std::unique_ptr<rocksdb::Iterator> iter =
mthd->NewIterator(ro, RocksDBColumnFamily::documents());
iter->Seek(documentBounds.start());
while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) {
TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key()));
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
VPackSlice key =
VPackSlice(iter->value().data()).get(StaticStrings::KeyString);
@@ -734,9 +744,9 @@ bool RocksDBCollection::readDocument(transaction::Methods* trx,
}
// read using a token, bypassing the cache
bool RocksDBCollection::readDocumentNoCache(transaction::Methods* trx,
DocumentIdentifierToken const& token,
ManagedDocumentResult& result) {
bool RocksDBCollection::readDocumentNoCache(
transaction::Methods* trx, DocumentIdentifierToken const& token,
ManagedDocumentResult& result) {
// TODO: why do we have read(), readDocument() and lookupKey()?
auto tkn = static_cast<RocksDBToken const*>(&token);
TRI_voc_rid_t revisionId = tkn->revisionId();
@@ -899,7 +909,7 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
return TRI_ERROR_CLUSTER_MUST_NOT_CHANGE_SHARDING_ATTRIBUTES;
}
}
VPackSlice const newDoc(builder->slice());
RocksDBSavePoint guard(rocksutils::toRocksMethods(trx),
@@ -917,9 +927,9 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
TRI_ASSERT(!mdr.empty());
// report document and key size
RocksDBOperationResult result = state->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_UPDATE,
newDoc.byteSize(), res.keySize());
RocksDBOperationResult result = state->addOperation(
_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_UPDATE, newDoc.byteSize(), res.keySize());
// transaction size limit reached -- fail
if (result.fail()) {
@@ -1000,21 +1010,20 @@ int RocksDBCollection::replace(
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), revisionId, StringRef(),
TRI_VOC_DOCUMENT_OPERATION_REPLACE);
VPackSlice const newDoc(builder->slice());
RocksDBOperationResult opResult =
updateDocument(trx, oldRevisionId, oldDoc, revisionId,
newDoc, options.waitForSync);
RocksDBOperationResult opResult = updateDocument(
trx, oldRevisionId, oldDoc, revisionId, newDoc, options.waitForSync);
if (opResult.ok()) {
mdr.setManaged(newDoc.begin(), revisionId);
TRI_ASSERT(!mdr.empty());
// report document and key size
RocksDBOperationResult result = state->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REPLACE,
newDoc.byteSize(),
opResult.keySize());
RocksDBOperationResult result =
state->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REPLACE,
newDoc.byteSize(), opResult.keySize());
// transaction size limit reached -- fail
if (result.fail()) {
@@ -1211,9 +1220,9 @@ arangodb::Result RocksDBCollection::fillIndexes(
ManagedDocumentResult mmdr;
RocksDBIndex* ridx = static_cast<RocksDBIndex*>(added.get());
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
std::unique_ptr<IndexIterator> it(
new RocksDBAllIndexIterator(_logicalCollection, trx, &mmdr, primaryIndex(), false));
std::unique_ptr<IndexIterator> it(new RocksDBAllIndexIterator(
_logicalCollection, trx, &mmdr, primaryIndex(), false));
rocksdb::TransactionDB* db = globalRocksDB();
uint64_t numDocsWritten = 0;
// write batch will be reset each 5000 documents
@@ -1333,11 +1342,11 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
res.keySize(key.string().size());
return res;
}
/*LOG_TOPIC(ERR, Logger::FIXME) << "PUT rev: " << revisionId << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId
<< " name: " << _logicalCollection->name();*/
/*LOG_TOPIC(ERR, Logger::FIXME)
<< "PUT rev: " << revisionId << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId << " name: " << _logicalCollection->name();*/
RocksDBOperationResult innerRes;
READ_LOCKER(guard, _indexesLock);
@@ -1390,17 +1399,18 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
// Simon: actually we do, because otherwise the counter recovery is broken
// if (!isUpdate) {
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
RocksDBOperationResult res = mthd->Delete(RocksDBColumnFamily::documents(), key);
RocksDBOperationResult res =
mthd->Delete(RocksDBColumnFamily::documents(), key);
if (!res.ok()) {
return res;
}
//}
/*LOG_TOPIC(ERR, Logger::FIXME) << "Delete rev: " << revisionId << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId
<< " name: " << _logicalCollection->name();*/
/*LOG_TOPIC(ERR, Logger::FIXME)
<< "Delete rev: " << revisionId << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId << " name: " << _logicalCollection->name();*/
RocksDBOperationResult resInner;
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index> const& idx : _indexes) {
@@ -1488,8 +1498,7 @@ Result RocksDBCollection::lookupDocumentToken(transaction::Methods* trx,
arangodb::Result RocksDBCollection::lookupRevisionVPack(
TRI_voc_rid_t revisionId, transaction::Methods* trx,
arangodb::ManagedDocumentResult& mdr,
bool withCache) const {
arangodb::ManagedDocumentResult& mdr, bool withCache) const {
TRI_ASSERT(trx->state()->isRunning());
TRI_ASSERT(_objectId != 0);
@@ -1527,10 +1536,10 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack(
mdr.setManaged(std::move(value), revisionId);
} else {
LOG_TOPIC(ERR, Logger::FIXME) << "NOT FOUND rev: " << revisionId << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId
<< " name: " << _logicalCollection->name();
LOG_TOPIC(ERR, Logger::FIXME)
<< "NOT FOUND rev: " << revisionId << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId << " name: " << _logicalCollection->name();
mdr.reset();
}
return res;
@@ -1660,7 +1669,7 @@ uint64_t RocksDBCollection::recalculateCounts() {
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
rocksdb::ReadOptions readOptions;
readOptions.fill_cache = false;
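
The functional core of the RocksDBCollection changes is the new getSortedAllIterator(): the documents column family is keyed by object id and revision id, so getAllIterator() yields documents in revision order, while the replication protocol needs them sorted by document key. The new iterator (implemented further down) therefore walks the primary-index column family, which RocksDB keeps sorted by key. A standalone sketch of the underlying pattern, bounded in-order iteration over one key prefix; the path and key layout are assumptions for illustration, not ArangoDB's actual encoding:

#include <rocksdb/db.h>
#include <iostream>
#include <memory>

int main() {
  rocksdb::DB* raw = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/sorted-iter-demo", &raw);
  if (!s.ok()) { std::cerr << s.ToString() << "\n"; return 1; }
  std::unique_ptr<rocksdb::DB> db(raw);

  // toy "primary index": prefix + _key -> revision id
  db->Put(rocksdb::WriteOptions(), "idx/abc", "rev-3");
  db->Put(rocksdb::WriteOptions(), "idx/aaa", "rev-1");
  db->Put(rocksdb::WriteOptions(), "idx/zzz", "rev-2");

  rocksdb::ReadOptions ro;
  rocksdb::Slice upper("idx0");   // '0' sorts after '/', so this bounds the prefix
  ro.iterate_upper_bound = &upper;

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  // RocksDB iterates in comparator (byte-wise) order: aaa, abc, zzz
  for (it->Seek("idx/"); it->Valid(); it->Next()) {
    std::cout << it->key().ToString() << " -> " << it->value().ToString() << "\n";
  }
  return it->status().ok() ? 0 : 1;
}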


@@ -38,7 +38,7 @@ class Transaction;
namespace arangodb {
namespace cache {
class Cache;
class Cache;
}
class LogicalCollection;
class ManagedDocumentResult;
@@ -106,9 +106,12 @@ class RocksDBCollection final : public PhysicalCollection {
bool dropIndex(TRI_idx_iid_t iid) override;
std::unique_ptr<IndexIterator> getAllIterator(transaction::Methods* trx,
ManagedDocumentResult* mdr,
bool reverse) override;
bool reverse) const override;
std::unique_ptr<IndexIterator> getAnyIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr) override;
transaction::Methods* trx, ManagedDocumentResult* mdr) const override;
std::unique_ptr<IndexIterator> getSortedAllIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr) const;
void invokeOnAllElements(
transaction::Methods* trx,
@@ -134,10 +137,10 @@ class RocksDBCollection final : public PhysicalCollection {
bool readDocument(transaction::Methods* trx,
DocumentIdentifierToken const& token,
ManagedDocumentResult& result) override;
bool readDocumentNoCache(transaction::Methods* trx,
DocumentIdentifierToken const& token,
ManagedDocumentResult& result);
DocumentIdentifierToken const& token,
ManagedDocumentResult& result);
int insert(arangodb::transaction::Methods* trx,
arangodb::velocypack::Slice const newSlice,
@@ -219,7 +222,8 @@ class RocksDBCollection final : public PhysicalCollection {
arangodb::RocksDBOperationResult removeDocument(
arangodb::transaction::Methods* trx, TRI_voc_rid_t revisionId,
arangodb::velocypack::Slice const& doc, bool isUpdate, bool& waitForSync) const;
arangodb::velocypack::Slice const& doc, bool isUpdate,
bool& waitForSync) const;
arangodb::RocksDBOperationResult lookupDocument(
transaction::Methods* trx, arangodb::velocypack::Slice key,


@@ -129,38 +129,36 @@ void RocksDBExportCursor::dump(VPackBuilder& builder) {
builder.add("result", VPackValue(VPackValueType::Array));
size_t const n = batchSize();
auto cb = [&, this](DocumentIdentifierToken const& token) {
auto cb = [&, this](ManagedDocumentResult const& mdr) {
if (_position == _size) {
return false;
}
if (_collection->readDocument(_trx.get(), token, _mdr)) {
builder.openObject();
VPackSlice const slice(_mdr.vpack());
// Copy over shaped values
for (auto const& entry : VPackObjectIterator(slice)) {
std::string key(entry.key.copyString());
builder.openObject();
VPackSlice const slice(mdr.vpack());
// Copy over shaped values
for (auto const& entry : VPackObjectIterator(slice)) {
std::string key(entry.key.copyString());
if (!CollectionExport::IncludeAttribute(restrictionType,
_restrictions.fields, key)) {
// Ignore everything that should be excluded or not included
continue;
}
// If we get here we need this entry in the final result
if (entry.value.isCustom()) {
builder.add(key,
VPackValue(builder.options->customTypeHandler->toString(
entry.value, builder.options, slice)));
} else {
builder.add(key, entry.value);
}
if (!CollectionExport::IncludeAttribute(restrictionType,
_restrictions.fields, key)) {
// Ignore everything that should be excluded or not included
continue;
}
// If we get here we need this entry in the final result
if (entry.value.isCustom()) {
builder.add(key,
VPackValue(builder.options->customTypeHandler->toString(
entry.value, builder.options, slice)));
} else {
builder.add(key, entry.value);
}
builder.close();
_position++;
}
builder.close();
_position++;
return true;
};
_iter->next(cb, n);
_iter->nextDocument(cb, n);
builder.close(); // close Array
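
The export-cursor hunk above is a behavioral fix, not just reformatting: the callback now receives the ManagedDocumentResult directly and the cursor drives the iterator through nextDocument(), so each exported document is loaded exactly once instead of being re-fetched via readDocument() from a token. A dependency-free sketch of that callback reshaping (the type names are stand-ins):

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <string>

using Token = std::uint64_t;   // stand-in for DocumentIdentifierToken
using Document = std::string;  // stand-in for a materialized document

std::map<Token, Document> store = {{1, "{\"_key\":\"a\"}"}, {2, "{\"_key\":\"b\"}"}};

// old shape: hand out tokens, forcing the caller to look each document up again
void next(std::function<bool(Token)> const& cb) {
  for (auto const& entry : store) {
    if (!cb(entry.first)) return;
  }
}

// new shape: hand out the already-loaded document, no second lookup
void nextDocument(std::function<bool(Document const&)> const& cb) {
  for (auto const& entry : store) {
    if (!cb(entry.second)) return;
  }
}

int main() {
  next([](Token t) {
    std::cout << store.at(t) << "\n";  // extra lookup per document
    return true;
  });
  nextDocument([](Document const& d) {
    std::cout << d << "\n";  // document arrives ready to serialize
    return true;
  });
}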


@@ -25,10 +25,10 @@
#define ARANGOD_ROCKSDB_ROCKSDB_EXPORT_CURSOR_H 1
#include "Basics/Common.h"
#include "Utils/Cursor.h"
#include "Utils/CollectionExport.h"
#include "Utils/CollectionGuard.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/Cursor.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/voc-types.h"
#include "VocBase/vocbase.h"
@@ -38,7 +38,7 @@ namespace velocypack {
class Slice;
}
namespace transaction {
class Methods;
class Methods;
}
class IndexIterator;
@@ -46,8 +46,8 @@ class IndexIterator;
class RocksDBExportCursor final : public Cursor {
public:
RocksDBExportCursor(TRI_vocbase_t*, std::string const&,
CollectionExport::Restrictions const&, CursorId, size_t, size_t,
double, bool);
CollectionExport::Restrictions const&, CursorId, size_t,
size_t, double, bool);
~RocksDBExportCursor();


@@ -5,11 +5,11 @@
#include "Indexes/IndexIterator.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "StorageEngine/PhysicalCollection.h"
#include "Transaction/Helpers.h"
#include "Utils/OperationOptions.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include "StorageEngine/PhysicalCollection.h"
#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
@@ -46,8 +46,9 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
syncer._client->retryRequest(rest::RequestType::PUT, url, nullptr, 0));
if (response == nullptr || !response->isComplete()) {
errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint +
": " + syncer._client->getErrorMessage();
errorMsg = "could not connect to master at " +
syncer._masterInfo._endpoint + ": " +
syncer._client->getErrorMessage();
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
@@ -55,9 +56,10 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
": HTTP " + basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " +
response->getHttpReturnMessage();
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
@@ -66,16 +68,16 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
int res = syncer.parseResponse(builder, response.get());
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
": response is no array";
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
VPackSlice const responseBody = builder->slice();
if (!responseBody.isArray()) {
errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
": response is no array";
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
@@ -104,7 +106,8 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
size_t const numKeys = static_cast<size_t>(responseBody.length());
if (numKeys == 0) {
errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint +
": response contains an empty chunk. Collection: " +
collectionName + " Chunk: " + std::to_string(chunkId);
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
@@ -204,7 +207,7 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" +
std::to_string(chunkId) + "&chunkSize=" +
std::to_string(chunkSize) + "&low=" + lowString ;
std::to_string(chunkSize) + "&low=" + lowString;
progress = "fetching documents chunk " + std::to_string(chunkId) +
" for collection '" + collectionName + "' from " + url;
@@ -214,11 +217,13 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._client->retryRequest(rest::RequestType::PUT, url,
keyJsonString.c_str(), keyJsonString.size()));
keyJsonString.c_str(),
keyJsonString.size()));
if (response == nullptr || !response->isComplete()) {
errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint +
": " + syncer._client->getErrorMessage();
errorMsg = "could not connect to master at " +
syncer._masterInfo._endpoint + ": " +
syncer._client->getErrorMessage();
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
@@ -228,8 +233,8 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " +
response->getHttpReturnMessage();
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
@@ -239,7 +244,8 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "got invalid response from master at " +
std::string(syncer._masterInfo._endpoint) + ": response is no array";
std::string(syncer._masterInfo._endpoint) +
": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
@@ -273,7 +279,8 @@ int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
if (!revSlice.isString()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": document revision is invalid";
syncer._masterInfo._endpoint +
": document revision is invalid";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
@@ -407,8 +414,9 @@ int handleSyncKeysRocksDB(InitialSyncer& syncer,
std::string const highKey(highSlice.copyString());
LogicalCollection* coll = trx.documentCollection();
auto ph = static_cast<RocksDBCollection*>(coll->getPhysical());
std::unique_ptr<IndexIterator> iterator =
coll->getAllIterator(&trx, &mmdr, false);
ph->getSortedAllIterator(&trx, &mmdr);
iterator->next(
[&](DocumentIdentifierToken const& token) {
if (coll->readDocument(&trx, token, mmdr) == false) {
@@ -541,8 +549,8 @@ int handleSyncKeysRocksDB(InitialSyncer& syncer,
TRI_ASSERT(!rangeUnequal || nextChunk); // A => B
if (nextChunk) { // we are out of range, see next chunk
if (rangeUnequal && currentChunkId < numChunks) {
int res = syncChunkRocksDB(syncer, &trx, keysId, currentChunkId, lowKey,
highKey, markers, errorMsg);
int res = syncChunkRocksDB(syncer, &trx, keysId, currentChunkId,
lowKey, highKey, markers, errorMsg);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
@@ -558,8 +566,9 @@ int handleSyncKeysRocksDB(InitialSyncer& syncer,
}
};
auto ph = static_cast<RocksDBCollection*>(col->getPhysical());
std::unique_ptr<IndexIterator> iterator =
col->getAllIterator(&trx, &mmdr, false);
ph->getSortedAllIterator(&trx, &mmdr);
iterator->next(
[&](DocumentIdentifierToken const& token) {
if (col->readDocument(&trx, token, mmdr) == false) {
@@ -573,8 +582,8 @@ int handleSyncKeysRocksDB(InitialSyncer& syncer,
// we might have missed chunks, if the keys don't exist at all locally
while (currentChunkId < numChunks) {
int res = syncChunkRocksDB(syncer, &trx, keysId, currentChunkId, lowKey, highKey,
markers, errorMsg);
int res = syncChunkRocksDB(syncer, &trx, keysId, currentChunkId, lowKey,
highKey, markers, errorMsg);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
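
The control flow above is what makes the sync incremental: handleSyncKeysRocksDB walks the local collection in key order, compares each of the master's (low, high, hash) chunk descriptors against the corresponding local range, and calls syncChunkRocksDB only for chunks that differ. A simplified, self-contained model of that comparison; the real hash covers each key and its revision, and the FNV-1a scheme below is only an assumption for illustration:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct Chunk { std::string low, high; std::uint64_t hash; };

// toy stand-in for the per-chunk hash over keys and revisions
std::uint64_t fnv1a(std::string const& s, std::uint64_t h) {
  for (unsigned char c : s) { h ^= c; h *= 1099511628211ULL; }
  return h;
}

std::vector<Chunk> buildChunks(std::map<std::string, std::uint64_t> const& docs,
                               std::size_t chunkSize) {
  std::vector<Chunk> out;
  Chunk cur{"", "", 14695981039346656037ULL};
  std::size_t n = 0;
  for (auto const& kv : docs) {  // std::map iterates in key order
    if (n == 0) cur.low = kv.first;
    cur.hash = fnv1a(kv.first + std::to_string(kv.second), cur.hash);
    cur.high = kv.first;
    if (++n == chunkSize) {
      out.push_back(cur);
      cur = {"", "", 14695981039346656037ULL};
      n = 0;
    }
  }
  if (n > 0) out.push_back(cur);
  return out;
}

int main() {
  std::map<std::string, std::uint64_t> master = {{"a",1},{"b",2},{"c",3},{"d",4}};
  std::map<std::string, std::uint64_t> local  = {{"a",1},{"b",2},{"c",9},{"d",4}};
  auto m = buildChunks(master, 2), l = buildChunks(local, 2);
  for (std::size_t i = 0; i < m.size(); ++i) {
    bool equal = i < l.size() && m[i].low == l[i].low &&
                 m[i].high == l[i].high && m[i].hash == l[i].hash;
    // only chunk 1 differs here, so only it would be fetched from the master
    std::cout << "chunk " << i << (equal ? ": in sync\n" : ": fetch from master\n");
  }
}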


@@ -40,8 +40,7 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
_bounds(RocksDBKeyBounds::CollectionDocuments(
static_cast<RocksDBCollection*>(col->getPhysical())->objectId())),
_iterator(),
_cmp(RocksDBColumnFamily::documents()->GetComparator()),
_index(index) {
_cmp(RocksDBColumnFamily::documents()->GetComparator()) {
// acquire rocksdb transaction
RocksDBMethods* mthds = rocksutils::toRocksMethods(trx);
rocksdb::ColumnFamilyHandle* cf = RocksDBColumnFamily::documents();
@@ -140,25 +139,6 @@ bool RocksDBAllIndexIterator::nextDocument(
return true;
}
void RocksDBAllIndexIterator::seek(StringRef const& key) {
TRI_ASSERT(_trx->state()->isRunning());
RocksDBToken token = _index->lookupKey(_trx, key);
if (!token.revisionId()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
}
// don't want to get the index pointer just for this
uint64_t objectId = _bounds.objectId();
RocksDBKey val = RocksDBKey::Document(objectId, token.revisionId());
if (_reverse) {
_iterator->SeekForPrev(val.string());
} else {
_iterator->Seek(val.string());
TRI_ASSERT(_iterator->Valid());
TRI_ASSERT(RocksDBKey::revisionId(_iterator->key()) == token.revisionId());
}
}
void RocksDBAllIndexIterator::reset() {
TRI_ASSERT(_trx->state()->isRunning());
@@ -274,3 +254,105 @@ void RocksDBAnyIndexIterator::reset() { _iterator->Seek(_bounds.start()); }
bool RocksDBAnyIndexIterator::outOfRange() const {
return _cmp->Compare(_iterator->key(), _bounds.end()) > 0;
}
// ================ Sorted All Iterator ==================
RocksDBSortedAllIterator::RocksDBSortedAllIterator(
LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index)
: IndexIterator(collection, trx, mmdr, index),
_bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())),
_iterator(),
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
_index(index),
#endif
_cmp(index->comparator()) {
// acquire rocksdb transaction
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
TRI_ASSERT(state != nullptr);
RocksDBMethods* mthds = rocksutils::toRocksMethods(trx);
// intentional copy of the read options
auto options = mthds->readOptions();
TRI_ASSERT(options.snapshot != nullptr);
TRI_ASSERT(options.prefix_same_as_start);
options.verify_checksums = false;
_iterator = mthds->NewIterator(options, index->columnFamily());
_iterator->Seek(_bounds.start());
}
bool RocksDBSortedAllIterator::outOfRange() const {
TRI_ASSERT(_trx->state()->isRunning());
return _cmp->Compare(_iterator->key(), _bounds.end()) > 0;
}
bool RocksDBSortedAllIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
return false;
}
while (limit > 0) {
RocksDBToken token(RocksDBValue::revisionId(_iterator->value()));
cb(token);
--limit;
_iterator->Next();
if (!_iterator->Valid() || outOfRange()) {
return false;
}
}
return true;
}
/// special method to expose the document key for incremental replication
bool RocksDBSortedAllIterator::nextWithKey(TokenKeyCallback const& cb,
size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
return false;
}
while (limit > 0) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(_index->objectId() == RocksDBKey::objectId(_iterator->key()));
#endif
RocksDBToken token(RocksDBValue::revisionId(_iterator->value()));
StringRef key = RocksDBKey::primaryKey(_iterator->key());
cb(token, key);
--limit;
_iterator->Next();
if (!_iterator->Valid() || outOfRange()) {
return false;
}
}
return true;
}
void RocksDBSortedAllIterator::seek(StringRef const& key) {
TRI_ASSERT(_trx->state()->isRunning());
// don't want to get the index pointer just for this
uint64_t objectId = _bounds.objectId();
RocksDBKey val = RocksDBKey::PrimaryIndexValue(objectId, key);
_iterator->Seek(val.string());
TRI_ASSERT(_iterator->Valid());
}
void RocksDBSortedAllIterator::reset() {
TRI_ASSERT(_trx->state()->isRunning());
_iterator->Seek(_bounds.start());
}
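
seek() above is what lets the replication context re-position a partially consumed iterator at a chunk's low key instead of rescanning from the start, and nextWithKey() exposes the key and revision straight from the primary-index entry. A minimal stand-in built on std::map that mirrors those two calls; the class is illustrative only, not the ArangoDB type:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// toy sorted "primary index": _key -> revision id
class SortedAllIterator {
 public:
  explicit SortedAllIterator(std::map<std::string, std::uint64_t> const& index)
      : _index(index), _pos(_index.begin()) {}

  // reposition at the first key >= 'key', mirroring seek(StringRef const&)
  void seek(std::string const& key) { _pos = _index.lower_bound(key); }
  void reset() { _pos = _index.begin(); }

  // hand out (revision, key) pairs like nextWithKey(); false means exhausted
  template <typename Cb>
  bool nextWithKey(Cb const& cb, std::size_t limit) {
    while (limit > 0 && _pos != _index.end()) {
      cb(_pos->second, _pos->first);
      ++_pos;
      --limit;
    }
    return _pos != _index.end();
  }

 private:
  std::map<std::string, std::uint64_t> const& _index;
  std::map<std::string, std::uint64_t>::const_iterator _pos;
};

int main() {
  std::map<std::string, std::uint64_t> index = {{"a",1},{"b",2},{"c",3},{"d",4}};
  SortedAllIterator it(index);
  it.seek("c");  // resume a dump at the chunk boundary "c"
  it.nextWithKey([](std::uint64_t rev, std::string const& key) {
    std::cout << key << " @ rev " << rev << "\n";
  }, 10);
}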


@@ -61,7 +61,6 @@ class RocksDBAllIndexIterator final : public IndexIterator {
bool nextDocument(DocumentCallback const& cb, size_t limit) override;
void reset() override;
void seek(StringRef const& key);
private:
bool outOfRange() const;
@@ -70,7 +69,6 @@ class RocksDBAllIndexIterator final : public IndexIterator {
RocksDBKeyBounds const _bounds;
std::unique_ptr<rocksdb::Iterator> _iterator;
rocksdb::Comparator const* _cmp;
RocksDBPrimaryIndex const* _index;
};
class RocksDBAnyIndexIterator final : public IndexIterator {
@@ -100,6 +98,38 @@ class RocksDBAnyIndexIterator final : public IndexIterator {
uint64_t _total;
uint64_t _returned;
};
class RocksDBSortedAllIterator final : public IndexIterator {
public:
typedef std::function<void(DocumentIdentifierToken const& token,
StringRef const& key)>
TokenKeyCallback;
RocksDBSortedAllIterator(LogicalCollection* collection,
transaction::Methods* trx,
ManagedDocumentResult* mmdr,
RocksDBPrimaryIndex const* index);
~RocksDBSortedAllIterator() {}
char const* typeName() const override { return "sorted-all-index-iterator"; }
bool next(TokenCallback const& cb, size_t limit) override;
void reset() override;
// engine specific optimizations
bool nextWithKey(TokenKeyCallback const& cb, size_t limit);
void seek(StringRef const& key);
private:
bool outOfRange() const;
RocksDBKeyBounds const _bounds;
std::unique_ptr<rocksdb::Iterator> _iterator;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
RocksDBPrimaryIndex const* _index;
#endif
rocksdb::Comparator const* _cmp;
};
}
#endif


@@ -98,8 +98,9 @@ int RocksDBReplicationContext::bindCollection(
}
_trx->addCollectionAtRuntime(collectionName);
_iter = _collection->getAllIterator(_trx.get(), &_mdr,
false); //_mdr is not used nor updated
_iter = static_cast<RocksDBCollection*>(_collection->getPhysical())
->getSortedAllIterator(_trx.get(),
&_mdr); //_mdr is not used nor updated
_currentTick = 1;
_hasMore = true;
}
@@ -205,13 +206,9 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
uint64_t chunkSize) {
TRI_ASSERT(_trx);
TRI_ASSERT(_iter);
RocksDBAllIndexIterator* primary =
static_cast<RocksDBAllIndexIterator*>(_iter.get());
std::string lowKey;
VPackSlice highKey; // FIXME: no good keeping this
uint64_t hash = 0x012345678;
auto cb = [&](DocumentIdentifierToken const& token) {
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
@@ -219,7 +216,7 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
// TODO: do something here?
return;
}
VPackSlice doc(_mdr.vpack());
highKey = doc.get(StaticStrings::KeyString);
// set type
@@ -236,14 +233,14 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
b.openArray();
while (_hasMore) {
try {
_hasMore = primary->next(cb, chunkSize);
_hasMore = _iter->next(cb, chunkSize);
b.add(VPackValue(VPackValueType::Object));
b.add("low", VPackValue(lowKey));
b.add("high", VPackValue(highKey.copyString()));
b.add("hash", VPackValue(std::to_string(hash)));
b.close();
lowKey.clear();// reset string
lowKey.clear(); // reset string
} catch (std::exception const&) {
return Result(TRI_ERROR_INTERNAL);
}
@@ -262,8 +259,8 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(
std::string const& lowKey) {
TRI_ASSERT(_trx);
TRI_ASSERT(_iter);
RocksDBAllIndexIterator* primary =
static_cast<RocksDBAllIndexIterator*>(_iter.get());
RocksDBSortedAllIterator* primary =
static_cast<RocksDBSortedAllIterator*>(_iter.get());
// Position the iterator correctly
size_t from = chunk * chunkSize;
@@ -287,26 +284,19 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(
}
}
auto cb = [&](DocumentIdentifierToken const& token) {
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
if (!ok) {
// TODO: do something here?
return;
}
VPackSlice doc(_mdr.vpack());
VPackSlice key = doc.get(StaticStrings::KeyString);
auto cb = [&](DocumentIdentifierToken const& token, StringRef const& key) {
RocksDBToken const& rt = static_cast<RocksDBToken const&>(token);
b.openArray();
b.add(key);
b.add(VPackValue(std::to_string(_mdr.lastRevisionId())));
b.add(VPackValuePair(key.data(), key.size(), VPackValueType::String));
b.add(VPackValue(std::to_string(rt.revisionId())));
b.close();
};
b.openArray();
// chunkSize is going to be ignored here
try {
_hasMore = primary->next(cb, chunkSize);
_hasMore = primary->nextWithKey(cb, chunkSize);
_lastIteratorOffset++;
} catch (std::exception const&) {
return Result(TRI_ERROR_INTERNAL);
@@ -322,16 +312,16 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments(
VPackSlice const& ids) {
TRI_ASSERT(_trx);
TRI_ASSERT(_iter);
RocksDBAllIndexIterator* primary =
static_cast<RocksDBAllIndexIterator*>(_iter.get());
RocksDBSortedAllIterator* primary =
static_cast<RocksDBSortedAllIterator*>(_iter.get());
// Position the iterator must be reset to the beginning
// after calls to dumpKeys moved it forwards
size_t from = chunk * chunkSize;
if (from != _lastIteratorOffset) {
if (!lowKey.empty()) {
primary->seek(StringRef(lowKey));
_lastIteratorOffset = from;
primary->seek(StringRef(lowKey));
_lastIteratorOffset = from;
} else { // no low key supplied, we can not use seek
if (from == 0 || !_hasMore || from < _lastIteratorOffset) {
_iter->reset();
@@ -433,8 +423,8 @@ RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
std::shared_ptr<transaction::StandaloneContext> ctx =
transaction::StandaloneContext::Create(vocbase);
std::unique_ptr<transaction::Methods> trx(new transaction::UserTransaction(
ctx, {}, {}, {}, transactionOptions));
std::unique_ptr<transaction::Methods> trx(
new transaction::UserTransaction(ctx, {}, {}, {}, transactionOptions));
Result res = trx->begin();
if (!res.ok()) {
_guard.reset();
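
With these changes dumpKeys no longer reads full documents: the old callback fetched every document via readDocument() just to extract _key and the revision, while the new nextWithKey() callback gets both from the primary-index entry, so the documents column family is never touched. A short sketch of how the resulting [key, revision] pairs could be assembled, assuming ArangoDB's bundled velocypack library:

#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
#include <velocypack/Value.h>
#include <velocypack/velocypack-aliases.h>
#include <cstdint>
#include <iostream>
#include <string>

int main() {
  VPackBuilder b;
  b.openArray();
  auto addPair = [&b](std::string const& key, std::uint64_t rev) {
    b.openArray();  // one [key, revision] entry per document, as in dumpKeys
    b.add(VPackValuePair(key.data(), key.size(), VPackValueType::String));
    b.add(VPackValue(std::to_string(rev)));
    b.close();
  };
  addPair("a", 1);
  addPair("b", 2);
  b.close();
  std::cout << b.slice().toJson() << "\n";  // [["a","1"],["b","2"]]
}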


@@ -116,9 +116,9 @@ class PhysicalCollection {
virtual bool dropIndex(TRI_idx_iid_t iid) = 0;
virtual std::unique_ptr<IndexIterator> getAllIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) = 0;
transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) const = 0;
virtual std::unique_ptr<IndexIterator> getAnyIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr) = 0;
transaction::Methods* trx, ManagedDocumentResult* mdr) const = 0;
virtual void invokeOnAllElements(
transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) = 0;