From 2269ca7b0a58b088aa1376bc503f6010588c487a Mon Sep 17 00:00:00 2001 From: Jan Christoph Uhde Date: Mon, 24 Apr 2017 11:47:54 +0200 Subject: [PATCH] add batchId and logging --- arangod/Replication/InitialSyncer.cpp | 14 +++++++++++--- arangod/Replication/Syncer.cpp | 5 ++++- .../RocksDBEngine/RocksDBReplicationContext.cpp | 9 ++++++--- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index d81eeebcb8..60b32e5d08 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -179,11 +179,14 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) { try { setProgress("fetching master state"); + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: getting master state"; res = getMasterState(errorMsg); if (res != TRI_ERROR_NO_ERROR) { + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: got master state: " << res << " " << errorMsg; return res; } + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: got master state: " << res << " " << errorMsg; if (incremental) { if (_masterInfo._majorVersion == 1 || @@ -210,7 +213,8 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) { return res; } - std::string url = BaseUrl + "/inventory?serverId=" + _localServerIdString; + std::string url = BaseUrl + "/inventory?serverId=" + _localServerIdString + + "&batchId=" + std::to_string(_batchId); if (_includeSystem) { url += "&includeSystem=true"; } @@ -255,6 +259,7 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) { VPackSlice const slice = builder->slice(); if (!slice.isObject()) { + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: InitialSyncer::run - inventoryResponse is not an object"; res = TRI_ERROR_REPLICATION_INVALID_RESPONSE; errorMsg = "got invalid response from master at " + @@ -615,7 +620,10 @@ int InitialSyncer::handleCollectionDump( uint64_t chunkSize = _chunkSize; - std::string const baseUrl = BaseUrl + "/dump?collection=" + cid + appendix; + TRI_ASSERT(_batchId); //should not be equal to 0 + std::string const baseUrl = BaseUrl + "/dump?collection=" + cid + + "&batchId=" + std::to_string(_batchId) + + appendix; TRI_voc_tick_t fromTick = 0; int batch = 1; @@ -1691,7 +1699,7 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters, if (checkAborted()) { return TRI_ERROR_REPLICATION_APPLIER_STOPPED; } - + if (!parameters.isObject() || !indexes.isArray()) { return TRI_ERROR_REPLICATION_INVALID_RESPONSE; } diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index a4b163953c..cd1fd4751f 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -662,7 +662,6 @@ int Syncer::getMasterState(std::string& errorMsg) { return TRI_ERROR_REPLICATION_MASTER_ERROR; } - auto builder = std::make_shared(); int res = parseResponse(builder, response.get()); @@ -670,6 +669,7 @@ int Syncer::getMasterState(std::string& errorMsg) { VPackSlice const slice = builder->slice(); if (!slice.isObject()) { + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "synger::getMasterState - state is not an object"; res = TRI_ERROR_REPLICATION_INVALID_RESPONSE; errorMsg = "got invalid response from master at " + _masterInfo._endpoint + ": invalid JSON"; @@ -679,6 +679,9 @@ int Syncer::getMasterState(std::string& errorMsg) { } } + if (res != TRI_ERROR_NO_ERROR){ + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "synger::getMasterState - handleStateResponse failed"; + } return res; } diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index 47a3b0f8ab..48bce97157 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -147,9 +147,10 @@ RocksDBReplicationResult RocksDBReplicationContext::dump( // set data bool ok = _collection->readDocument(_trx.get(), token, _mdr); + if (!ok) { - // TODO: do something here? - return; + LOG_TOPIC(ERR, Logger::REPLICATION) << "could not get document with token: " << token._data; + throw RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick); } builder.add(VPackValue("data")); @@ -170,6 +171,8 @@ RocksDBReplicationResult RocksDBReplicationContext::dump( _hasMore = _iter->next(cb, 10); // TODO: adjust limit? } catch (std::exception const& ex) { return RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick); + } catch (RocksDBReplicationResult const& ex) { + return ex; } } @@ -369,7 +372,7 @@ void RocksDBReplicationContext::releaseDumpingResources() { std::unique_ptr RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) { double lockTimeout = transaction::Methods::DefaultLockTimeout; - auto ctx = transaction::StandaloneContext::Create(vocbase); + std::shared_ptr ctx = transaction::StandaloneContext::Create(vocbase); std::unique_ptr trx(new transaction::UserTransaction( ctx, {}, {}, {}, lockTimeout, false, true)); Result res = trx->begin();