diff --git a/arangod/VocBase/replication-dump.cpp b/arangod/VocBase/replication-dump.cpp index 345097a7e3..40e3c62196 100644 --- a/arangod/VocBase/replication-dump.cpp +++ b/arangod/VocBase/replication-dump.cpp @@ -37,8 +37,8 @@ #include "Wal/Marker.h" #include -#include #include +#include #include using namespace arangodb; @@ -167,8 +167,8 @@ static TRI_replication_operation_e TranslateType( static int StringifyMarker(TRI_replication_dump_t* dump, TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, - TRI_df_marker_t const* marker, - bool isDump, bool withTicks, bool isEdgeCollection) { + TRI_df_marker_t const* marker, bool isDump, + bool withTicks, bool isEdgeCollection) { TRI_ASSERT(MustReplicateWalMarkerType(marker)); TRI_df_marker_type_t const type = marker->getType(); @@ -179,13 +179,16 @@ static int StringifyMarker(TRI_replication_dump_t* dump, // for debugging use the following // Append(dump, "\",\"typeName\":\""); // Append(dump, TRI_NameMarkerDatafile(marker)); - - if (dump->_compat28 && - (type == TRI_DF_MARKER_VPACK_DOCUMENT || type == TRI_DF_MARKER_VPACK_REMOVE)) { + + if (dump->_compat28 && (type == TRI_DF_MARKER_VPACK_DOCUMENT || + type == TRI_DF_MARKER_VPACK_REMOVE)) { // 2.8-compatible format - VPackSlice slice(reinterpret_cast(marker) + DatafileHelper::VPackOffset(type)); - arangodb::basics::VPackStringBufferAdapter adapter(dump->_buffer); - VPackDumper dumper(&adapter, &dump->_vpackOptions); // note: we need the CustomTypeHandler here + VPackSlice slice(reinterpret_cast(marker) + + DatafileHelper::VPackOffset(type)); + arangodb::basics::VPackStringBufferAdapter adapter(dump->_buffer); + VPackDumper dumper( + &adapter, + &dump->_vpackOptions); // note: we need the CustomTypeHandler here // additionally dump "key" and "rev" attributes on the top-level Append(dump, "\",\"key\":"); @@ -206,7 +209,7 @@ static int StringifyMarker(TRI_replication_dump_t* dump, Append(dump, "\",\"type\":"); Append(dump, static_cast(TranslateType(marker))); } - + if (type == TRI_DF_MARKER_VPACK_DOCUMENT || type == TRI_DF_MARKER_VPACK_REMOVE || type == TRI_DF_MARKER_VPACK_BEGIN_TRANSACTION || @@ -237,8 +240,7 @@ static int StringifyMarker(TRI_replication_dump_t* dump, } } } - } - else { + } else { // collection dump if (withTicks) { Append(dump, "{\"tick\":\""); @@ -248,12 +250,15 @@ static int StringifyMarker(TRI_replication_dump_t* dump, Append(dump, "{"); } - if (dump->_compat28 && - (type == TRI_DF_MARKER_VPACK_DOCUMENT || type == TRI_DF_MARKER_VPACK_REMOVE)) { + if (dump->_compat28 && (type == TRI_DF_MARKER_VPACK_DOCUMENT || + type == TRI_DF_MARKER_VPACK_REMOVE)) { // 2.8-compatible format - VPackSlice slice(reinterpret_cast(marker) + DatafileHelper::VPackOffset(type)); - arangodb::basics::VPackStringBufferAdapter adapter(dump->_buffer); - VPackDumper dumper(&adapter, &dump->_vpackOptions); // note: we need the CustomTypeHandler here + VPackSlice slice(reinterpret_cast(marker) + + DatafileHelper::VPackOffset(type)); + arangodb::basics::VPackStringBufferAdapter adapter(dump->_buffer); + VPackDumper dumper( + &adapter, + &dump->_vpackOptions); // note: we need the CustomTypeHandler here // additionally dump "key" and "rev" attributes on the top-level Append(dump, "\"key\":"); @@ -269,29 +274,31 @@ static int StringifyMarker(TRI_replication_dump_t* dump, } else { Append(dump, static_cast(TranslateType(marker))); } - } - else { + } else { Append(dump, "\"type\":"); Append(dump, static_cast(TranslateType(marker))); } } switch (type) { - case TRI_DF_MARKER_VPACK_DOCUMENT: - case TRI_DF_MARKER_VPACK_REMOVE: - case TRI_DF_MARKER_VPACK_CREATE_DATABASE: - case TRI_DF_MARKER_VPACK_CREATE_COLLECTION: + case TRI_DF_MARKER_VPACK_DOCUMENT: + case TRI_DF_MARKER_VPACK_REMOVE: + case TRI_DF_MARKER_VPACK_CREATE_DATABASE: + case TRI_DF_MARKER_VPACK_CREATE_COLLECTION: case TRI_DF_MARKER_VPACK_CREATE_INDEX: - case TRI_DF_MARKER_VPACK_RENAME_COLLECTION: - case TRI_DF_MARKER_VPACK_CHANGE_COLLECTION: - case TRI_DF_MARKER_VPACK_DROP_DATABASE: - case TRI_DF_MARKER_VPACK_DROP_COLLECTION: + case TRI_DF_MARKER_VPACK_RENAME_COLLECTION: + case TRI_DF_MARKER_VPACK_CHANGE_COLLECTION: + case TRI_DF_MARKER_VPACK_DROP_DATABASE: + case TRI_DF_MARKER_VPACK_DROP_COLLECTION: case TRI_DF_MARKER_VPACK_DROP_INDEX: { Append(dump, ",\"data\":"); - VPackSlice slice(reinterpret_cast(marker) + DatafileHelper::VPackOffset(type)); - arangodb::basics::VPackStringBufferAdapter adapter(dump->_buffer); - VPackDumper dumper(&adapter, &dump->_vpackOptions); // note: we need the CustomTypeHandler here + VPackSlice slice(reinterpret_cast(marker) + + DatafileHelper::VPackOffset(type)); + arangodb::basics::VPackStringBufferAdapter adapter(dump->_buffer); + VPackDumper dumper( + &adapter, + &dump->_vpackOptions); // note: we need the CustomTypeHandler here dumper.dump(slice); break; } @@ -350,7 +357,7 @@ static bool MustReplicateWalMarker( if (dump->_vocbase->id() != databaseId) { return false; } - + // finally check if the marker is for a collection that we want to ignore TRI_voc_cid_t cid = collectionId; @@ -362,14 +369,14 @@ static bool MustReplicateWalMarker( return false; } } - - if (dump->_restrictCollection > 0 && - (cid != dump->_restrictCollection && ! IsTransactionWalMarker(dump, marker))) { + + if (dump->_restrictCollection > 0 && + (cid != dump->_restrictCollection && + !IsTransactionWalMarker(dump, marker))) { // restrict output to a single collection, but a different one return false; } - if (marker->getTick() >= firstRegularTick) { return true; } @@ -393,7 +400,8 @@ static int DumpCollection(TRI_replication_dump_t* dump, TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax, bool withTicks) { - LOG(TRACE) << "dumping collection " << collection->cid() << ", tick range " << dataMin << " - " << dataMax; + LOG(TRACE) << "dumping collection " << collection->cid() << ", tick range " + << dataMin << " - " << dataMax; bool const isEdgeCollection = (collection->type() == TRI_COL_TYPE_EDGE); @@ -401,28 +409,32 @@ static int DumpCollection(TRI_replication_dump_t* dump, TRI_voc_tick_t lastFoundTick = 0; bool bufferFull = false; - auto callback = [&dump, &lastFoundTick, &databaseId, &collectionId, &withTicks, &isEdgeCollection, &bufferFull](TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker) { + auto callback = [&dump, &lastFoundTick, &databaseId, &collectionId, + &withTicks, &isEdgeCollection, &bufferFull]( + TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker) { // note the last tick we processed lastFoundTick = foundTick; - int res = StringifyMarker(dump, databaseId, collectionId, marker, true, withTicks, isEdgeCollection); + int res = StringifyMarker(dump, databaseId, collectionId, marker, true, + withTicks, isEdgeCollection); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } - - if (static_cast(TRI_LengthStringBuffer(dump->_buffer)) > dump->_chunkSize) { + + if (static_cast(TRI_LengthStringBuffer(dump->_buffer)) > + dump->_chunkSize) { // abort the iteration bufferFull = true; - return false; // stop iterating + return false; // stop iterating } - return true; // continue iterating + return true; // continue iterating }; try { bool hasMore = collection->applyForTickRange(dataMin, dataMax, callback); - + if (lastFoundTick > 0) { // data available for requested range dump->_lastFoundTick = lastFoundTick; @@ -452,7 +464,7 @@ int TRI_DumpCollectionReplication(TRI_replication_dump_t* dump, TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax, bool withTicks) { TRI_ASSERT(collection != nullptr); - + // get a custom type handler auto customTypeHandler = dump->_transactionContext->orderCustomTypeHandler(); dump->_vpackOptions.customTypeHandler = customTypeHandler.get(); @@ -470,7 +482,8 @@ int TRI_DumpCollectionReplication(TRI_replication_dump_t* dump, CompactionPreventer compactionPreventer(collection); try { - res = DumpCollection(dump, collection, collection->vocbase()->id(), collection->cid(), dataMin, dataMax, withTicks); + res = DumpCollection(dump, collection, collection->vocbase()->id(), + collection->cid(), dataMin, dataMax, withTicks); } catch (...) { res = TRI_ERROR_INTERNAL; } @@ -535,31 +548,31 @@ int TRI_DumpLogReplication( // end of datafile break; } - + TRI_df_marker_type_t type = marker->getType(); if (type <= TRI_DF_MARKER_MIN || type >= TRI_DF_MARKER_MAX) { break; } - + // handle special markers if (type == TRI_DF_MARKER_PROLOGUE) { lastDatabaseId = DatafileHelper::DatabaseId(marker); lastCollectionId = DatafileHelper::CollectionId(marker); - } - else if (type == TRI_DF_MARKER_HEADER || type == TRI_DF_MARKER_FOOTER) { + } else if (type == TRI_DF_MARKER_HEADER || + type == TRI_DF_MARKER_FOOTER) { lastDatabaseId = 0; lastCollectionId = 0; - } - else if (type == TRI_DF_MARKER_VPACK_CREATE_COLLECTION) { + } else if (type == TRI_DF_MARKER_VPACK_CREATE_COLLECTION) { // fill collection name cache TRI_voc_tick_t databaseId = DatafileHelper::DatabaseId(marker); TRI_ASSERT(databaseId != 0); TRI_voc_cid_t collectionId = DatafileHelper::CollectionId(marker); TRI_ASSERT(collectionId != 0); - + if (dump->_vocbase->id() == databaseId) { - VPackSlice slice(reinterpret_cast(marker) + DatafileHelper::VPackOffset(type)); + VPackSlice slice(reinterpret_cast(marker) + + DatafileHelper::VPackOffset(type)); VPackSlice name = slice.get("name"); if (name.isString()) { dump->_collectionNames[collectionId] = name.copyString(); @@ -585,21 +598,21 @@ int TRI_DumpLogReplication( break; } } - + TRI_voc_tick_t databaseId; TRI_voc_cid_t collectionId; - - if (type == TRI_DF_MARKER_VPACK_DOCUMENT || type == TRI_DF_MARKER_VPACK_REMOVE) { + + if (type == TRI_DF_MARKER_VPACK_DOCUMENT || + type == TRI_DF_MARKER_VPACK_REMOVE) { databaseId = lastDatabaseId; collectionId = lastCollectionId; - } - else { + } else { databaseId = DatafileHelper::DatabaseId(marker); collectionId = DatafileHelper::CollectionId(marker); } - - if (!MustReplicateWalMarker(dump, marker, databaseId, collectionId, firstRegularTick, - transactionIds)) { + + if (!MustReplicateWalMarker(dump, marker, databaseId, collectionId, + firstRegularTick, transactionIds)) { continue; } @@ -614,7 +627,8 @@ int TRI_DumpLogReplication( } } - res = StringifyMarker(dump, databaseId, collectionId, marker, false, true, false); + res = StringifyMarker(dump, databaseId, collectionId, marker, false, + true, false); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); @@ -632,7 +646,7 @@ int TRI_DumpLogReplication( break; } } - + if (outputAsArray) { Append(dump, "]"); } @@ -671,7 +685,8 @@ int TRI_DumpLogReplication( int TRI_DetermineOpenTransactionsReplication(TRI_replication_dump_t* dump, TRI_voc_tick_t tickMin, TRI_voc_tick_t tickMax) { - LOG(TRACE) << "determining transactions, tick range " << tickMin << " - " << tickMax; + LOG(TRACE) << "determining transactions, tick range " << tickMin << " - " + << tickMax; std::unordered_map transactions; @@ -706,7 +721,7 @@ int TRI_DetermineOpenTransactionsReplication(TRI_replication_dump_t* dump, // end of datafile break; } - + TRI_df_marker_type_t const type = marker->getType(); if (type <= TRI_DF_MARKER_MIN || type >= TRI_DF_MARKER_MAX) { @@ -747,7 +762,8 @@ int TRI_DetermineOpenTransactionsReplication(TRI_replication_dump_t* dump, type == TRI_DF_MARKER_VPACK_ABORT_TRANSACTION) { transactions.erase(tid); } else { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "found invalid marker type"); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + "found invalid marker type"); } } } diff --git a/arangod/VocBase/replication-dump.h b/arangod/VocBase/replication-dump.h index bde298014a..7cb8f3fd05 100644 --- a/arangod/VocBase/replication-dump.h +++ b/arangod/VocBase/replication-dump.h @@ -39,9 +39,10 @@ //////////////////////////////////////////////////////////////////////////////// struct TRI_replication_dump_t { - TRI_replication_dump_t(std::shared_ptr transactionContext, - size_t chunkSize, - bool includeSystem, TRI_voc_cid_t restrictCollection) + TRI_replication_dump_t(std::shared_ptr + transactionContext, + size_t chunkSize, bool includeSystem, + TRI_voc_cid_t restrictCollection, bool useVpp = false) : _transactionContext(transactionContext), _vocbase(transactionContext->vocbase()), _buffer(nullptr), @@ -95,8 +96,9 @@ struct TRI_replication_dump_t { /// @brief dump data from a single collection //////////////////////////////////////////////////////////////////////////////// -int TRI_DumpCollectionReplication(TRI_replication_dump_t*, arangodb::LogicalCollection*, - TRI_voc_tick_t, TRI_voc_tick_t, bool); +int TRI_DumpCollectionReplication(TRI_replication_dump_t*, + arangodb::LogicalCollection*, TRI_voc_tick_t, + TRI_voc_tick_t, bool); //////////////////////////////////////////////////////////////////////////////// /// @brief dump data from the replication log