diff --git a/UnitTests/HttpInterface/api-replication-rocksdb-spec.rb b/UnitTests/HttpInterface/api-replication-rocksdb-spec.rb index 6061cd9ea3..0cbc81f0b2 100644 --- a/UnitTests/HttpInterface/api-replication-rocksdb-spec.rb +++ b/UnitTests/HttpInterface/api-replication-rocksdb-spec.rb @@ -985,32 +985,11 @@ describe ArangoDB do cmd = api + "/dump?collection=UnitTestsReplication&batchId=#{@batchId}" doc = ArangoDB.log_get("#{prefix}-deleted", cmd, :body => "", :format => :plain) - doc.code.should eq(200) + doc.code.should eq(204) doc.headers["x-arango-replication-checkmore"].should eq("false") - doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/) - doc.headers["x-arango-replication-lastincluded"].should_not eq("0") + doc.headers["x-arango-replication-lastincluded"].should eq("0") doc.headers["content-type"].should eq("application/x-arango-dump; charset=utf-8") - - body = doc.response.body - i = 0 - while 1 - position = body.index("\n") - - break if position == nil - - part = body.slice(0, position) - - document = JSON.parse(part) - - document['type'].should eq(2302) - document['data']['_key'].should match(/^test[0-9]+$/) - - body = body.slice(position + 1, body.length) - i = i + 1 - end - - i.should eq(10) end it "checks the dump for a truncated collection" do @@ -1037,34 +1016,11 @@ describe ArangoDB do cmd = api + "/dump?collection=UnitTestsReplication&batchId=#{@batchId}" doc = ArangoDB.log_get("#{prefix}-truncated", cmd, :body => "", :format => :plain) - doc.code.should eq(200) + doc.code.should eq(204) doc.headers["x-arango-replication-checkmore"].should eq("false") - doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/) - doc.headers["x-arango-replication-lastincluded"].should_not eq("0") + doc.headers["x-arango-replication-lastincluded"].should eq("0") doc.headers["content-type"].should eq("application/x-arango-dump; charset=utf-8") - - body = doc.response.body - i = 0 - while 1 - position = body.index("\n") - - break if position == nil - - part = body.slice(0, position) - - document = JSON.parse(part) - - document['type'].should eq(2302) - # truncate order is undefined - document['data']['_key'].should match(/^test\d+$/) - document['data']['_rev'].should match(/^[a-zA-Z0-9_\-]+$/) - - body = body.slice(position + 1, body.length) - i = i + 1 - end - - i.should eq(10) end it "checks the dump for a non-empty collection, 3.0 mode" do diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index 08f5117257..34cb0c821f 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -26,7 +26,6 @@ #include "Basics/StringBuffer.h" #include "Basics/StringRef.h" #include "Basics/VPackStringBufferAdapter.h" -#include "VocBase/replication-common.h" #include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBPrimaryIndex.h" @@ -130,7 +129,7 @@ RocksDBReplicationContext::getInventory(TRI_vocbase_t* vocbase, // creating a new iterator if one does not exist for this collection RocksDBReplicationResult RocksDBReplicationContext::dump( TRI_vocbase_t* vocbase, std::string const& collectionName, - basics::StringBuffer& buff, uint64_t chunkSize) { + basics::StringBuffer& buff, uint64_t chunkSize, bool compat28) { TRI_ASSERT(vocbase != nullptr); if (_trx.get() == nullptr) { return RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick); @@ -142,12 +141,15 @@ RocksDBReplicationResult RocksDBReplicationContext::dump( // set type int type = REPLICATION_MARKER_DOCUMENT; // documents + if (compat28 && (_collection->type() == TRI_COL_TYPE_EDGE)) { + type = 2301; // 2.8 compatibility edges + } arangodb::basics::VPackStringBufferAdapter adapter(buff.stringBuffer()); VPackBuilder builder(&_vpackOptions); - auto cb = [this, &type, &buff, &adapter, + auto cb = [this, &type, &buff, &adapter, &compat28, &builder](DocumentIdentifierToken const& token) { builder.clear(); @@ -167,7 +169,9 @@ RocksDBReplicationResult RocksDBReplicationContext::dump( builder.add(VPackValue("data")); auto key = VPackSlice(_mdr.vpack()).get(StaticStrings::KeyString); _mdr.addToBuilder(builder, false); - builder.add("key", key); + if (compat28) { + builder.add("key", key); + } builder.close(); VPackDumper dumper( @@ -180,7 +184,7 @@ RocksDBReplicationResult RocksDBReplicationContext::dump( while (_hasMore && buff.length() < chunkSize) { try { - _hasMore = _iter->next(cb, 10); // TODO: adjust limit? + _hasMore = _iter->next(cb, 1); // TODO: adjust limit? } catch (std::exception const& ex) { _hasMore = false; return RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick); diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index 6f8b6f8d62..368002ee2e 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -54,7 +54,7 @@ class RocksDBReplicationContext { TRI_voc_tick_t id() const; uint64_t lastTick() const; uint64_t count() const; - + TRI_vocbase_t* vocbase() const { if (_trx == nullptr) { return nullptr; @@ -74,7 +74,8 @@ class RocksDBReplicationContext { // creating a new iterator if one does not exist for this collection RocksDBReplicationResult dump(TRI_vocbase_t* vocbase, std::string const& collectionName, - basics::StringBuffer&, uint64_t chunkSize); + basics::StringBuffer&, uint64_t chunkSize, + bool compat28); // iterates over all documents in a collection, previously bound with // bindCollection. Generates array of objects with minKey, maxKey and hash diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index 969ef83840..0808cb7216 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -80,17 +80,17 @@ class WALParser : public rocksdb::WriteBatch::Handler { public: explicit WALParser(TRI_vocbase_t* vocbase, uint64_t from, size_t& limit, bool includeSystem, VPackBuilder& builder) - : _vocbase(vocbase), - _from(from), - _limit(limit), - _includeSystem(includeSystem), - _builder(builder) {} + : _vocbase(vocbase), + _from(from), + _limit(limit), + _includeSystem(includeSystem), + _builder(builder) {} void LogData(rocksdb::Slice const& blob) override { if (_currentSequence < _from) { return; } - + RocksDBLogType type = RocksDBLogValue::type(blob); TRI_DEFER(_lastLogType = type); switch (type) { @@ -139,9 +139,8 @@ class WALParser : public rocksdb::WriteBatch::Handler { _currentCollectionId = RocksDBLogValue::collectionId(blob); TRI_idx_iid_t iid = RocksDBLogValue::indexId(blob); _builder.openObject(); - _builder.add( - "type", - VPackValue(static_cast(REPLICATION_INDEX_DROP))); + _builder.add("type", + VPackValue(static_cast(REPLICATION_INDEX_DROP))); _builder.add("database", VPackValue(std::to_string(_currentDbId))); _builder.add("cid", VPackValue(std::to_string(_currentCollectionId))); _builder.add("data", VPackValue(VPackValueType::Object)); @@ -221,6 +220,7 @@ class WALParser : public rocksdb::WriteBatch::Handler { _builder.add("type", VPackValue(convertLogType(_lastLogType))); _builder.add("database", VPackValue(std::to_string(_currentDbId))); _builder.add("cid", VPackValue(std::to_string(_currentCollectionId))); + _builder.add("cname", RocksDBValue::data(value).get("name")); if (_lastLogType == RocksDBLogType::CollectionRename) { VPackSlice collectionData(value.data()); @@ -273,7 +273,7 @@ class WALParser : public rocksdb::WriteBatch::Handler { if (_currentSequence < _from) { return; } - + switch (RocksDBKey::type(key)) { case RocksDBEntryType::Collection: { TRI_ASSERT(_lastLogType == RocksDBLogType::CollectionDrop); @@ -302,13 +302,13 @@ class WALParser : public rocksdb::WriteBatch::Handler { TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0); TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0); TRI_ASSERT(!_removeDocumentKey.empty()); - + uint64_t revisionId = RocksDBKey::revisionId(key); _builder.openObject(); _builder.add("tick", VPackValue(std::to_string(_currentSequence))); _builder.add( - "type", - VPackValue(static_cast(REPLICATION_MARKER_REMOVE))); + "type", + VPackValue(static_cast(REPLICATION_MARKER_REMOVE))); _builder.add("database", VPackValue(std::to_string(_currentDbId))); _builder.add("cid", VPackValue(std::to_string(_currentCollectionId))); if (_singleOpTransaction) { // single op is defined to 0 @@ -327,7 +327,7 @@ class WALParser : public rocksdb::WriteBatch::Handler { default: break; // shouldn't get here? } - + if (_limit > 0) { _limit--; } @@ -425,7 +425,6 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase, uint64_t from, size_t limit, bool includeSystem, VPackBuilder& builder) { - uint64_t lastTick = from; std::unique_ptr handler( new WALParser(vocbase, from, limit, includeSystem, builder)); diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index 2cf1bc9dd6..65790d41d5 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -26,8 +26,8 @@ #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/ConditionLocker.h" #include "Basics/ReadLocker.h" -#include "Basics/VelocyPackHelper.h" #include "Basics/VPackStringBufferAdapter.h" +#include "Basics/VelocyPackHelper.h" #include "Basics/conversions.h" #include "Basics/files.h" #include "Cluster/ClusterComm.h" @@ -580,86 +580,87 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { if (_request->transportType() == Endpoint::TransportType::VPP) { useVpp = true; } - + // determine start and end tick TRI_voc_tick_t tickStart = 0; TRI_voc_tick_t tickEnd = UINT64_MAX; - + bool found; std::string const& value1 = _request->value("from", found); - + if (found) { tickStart = static_cast(StringUtils::uint64(value1)); } - + // determine end tick for dump std::string const& value2 = _request->value("to", found); - + if (found) { tickEnd = static_cast(StringUtils::uint64(value2)); } - + if (found && (tickStart > tickEnd || tickEnd == 0)) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid from/to values"); return; } - + bool includeSystem = true; std::string const& value4 = _request->value("includeSystem", found); - + if (found) { includeSystem = StringUtils::boolean(value4); } - + size_t limit = 10000; // TODO: determine good default value? std::string const& value5 = _request->value("chunkSize", found); - + if (found) { limit = static_cast(StringUtils::uint64(value5)); } - + std::shared_ptr transactionContext = - transaction::StandaloneContext::Create(_vocbase); - + transaction::StandaloneContext::Create(_vocbase); + VPackBuilder builder(transactionContext->getVPackOptions()); builder.openArray(); auto result = tailWal(_vocbase, tickStart, limit, includeSystem, builder); builder.close(); auto data = builder.slice(); - + if (result.fail()) { - generateError(rest::ResponseCode::SERVER_ERROR, result.errorNumber(), result.errorMessage()); + generateError(rest::ResponseCode::SERVER_ERROR, result.errorNumber(), + result.errorMessage()); return; } - + bool const checkMore = - (result.maxTick() > 0 && result.maxTick() < latestSequenceNumber()); - + (result.maxTick() > 0 && result.maxTick() < latestSequenceNumber()); + // generate the result size_t length = data.length(); - + if (length == 0) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { resetResponse(rest::ResponseCode::OK); } - + // transfer ownership of the buffer contents _response->setContentType(rest::ContentType::DUMP); - + // set headers _response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE, checkMore ? "true" : "false"); _response->setHeaderNC( - TRI_REPLICATION_HEADER_LASTINCLUDED, - StringUtils::itoa((length == 0) ? 0 : result.maxTick())); + TRI_REPLICATION_HEADER_LASTINCLUDED, + StringUtils::itoa((length == 0) ? 0 : result.maxTick())); _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, StringUtils::itoa(latestSequenceNumber())); _response->setHeaderNC(TRI_REPLICATION_HEADER_ACTIVE, "true"); _response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, result.fromTickIncluded() ? "true" : "false"); - + if (length > 0) { if (useVpp) { for (auto message : arangodb::velocypack::ArrayIterator(data)) { @@ -667,18 +668,19 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { transactionContext->getVPackOptions(), true); } } else { - HttpResponse* httpResponse = - dynamic_cast(_response.get()); - + HttpResponse* httpResponse = dynamic_cast(_response.get()); + if (httpResponse == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type"); } - + basics::StringBuffer& buffer = httpResponse->body(); arangodb::basics::VPackStringBufferAdapter adapter(buffer.stringBuffer()); - VPackDumper dumper(&adapter, - transactionContext->getVPackOptions()); // note: we need the CustomTypeHandler here + VPackDumper dumper( + &adapter, + transactionContext + ->getVPackOptions()); // note: we need the CustomTypeHandler here for (auto marker : arangodb::velocypack::ArrayIterator(data)) { dumper.dump(marker); httpResponse->body().appendChar('\n'); @@ -687,10 +689,10 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { // add client bool found; std::string const& value = _request->value("serverId", found); - + if (found) { TRI_server_id_t serverId = (TRI_server_id_t)StringUtils::uint64(value); - + if (serverId > 0) { _vocbase->updateReplicationClient(serverId, result.maxTick()); } @@ -710,17 +712,18 @@ void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() { if (_request->transportType() == Endpoint::TransportType::VPP) { useVpp = true; } - //_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, StringUtils::itoa(dump._lastFoundTick)); + //_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, + // StringUtils::itoa(dump._lastFoundTick)); _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, "0"); _response->setContentType(rest::ContentType::DUMP); - //_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, dump._fromTickIncluded ? "true" : "false"); + //_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, + // dump._fromTickIncluded ? "true" : "false"); _response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, "true"); VPackSlice slice = VelocyPackHelper::EmptyArrayValue(); if (useVpp) { _response->addPayload(slice, &VPackOptions::Defaults, false); } else { - HttpResponse* httpResponse = - dynamic_cast(_response.get()); + HttpResponse* httpResponse = dynamic_cast(_response.get()); if (httpResponse == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, @@ -789,9 +792,11 @@ void RocksDBRestReplicationHandler::handleCommandInventory() { builder.add("running", VPackValue(true)); builder.add("lastLogTick", VPackValue(std::to_string(ctx->lastTick()))); - builder.add("lastUncommittedLogTick", - VPackValue(std::to_string(ctx->lastTick()))); // s.lastAssignedTick - builder.add("totalEvents", VPackValue(ctx->lastTick())); // s.numEvents + s.numEventsSync + builder.add( + "lastUncommittedLogTick", + VPackValue(std::to_string(ctx->lastTick()))); // s.lastAssignedTick + builder.add("totalEvents", + VPackValue(ctx->lastTick())); // s.numEvents + s.numEventsSync builder.add("time", VPackValue(utilities::timeString())); builder.close(); // state @@ -1285,6 +1290,13 @@ void RocksDBRestReplicationHandler::handleCommandDump() { return; } + bool compat28 = false; + std::string const& value8 = _request->value("compat28", found); + + if (found) { + compat28 = StringUtils::boolean(value8); + } + // print request LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "requested collection dump for collection '" << collection @@ -1299,7 +1311,8 @@ void RocksDBRestReplicationHandler::handleCommandDump() { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type"); } - auto result = context->dump(_vocbase, collection, dump, determineChunkSize()); + auto result = + context->dump(_vocbase, collection, dump, determineChunkSize(), compat28); // generate the result if (dump.length() == 0) {