diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index a1e54ee165..1a0811421f 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -51,6 +51,57 @@ using namespace triagens::arango; using namespace triagens::httpclient; using namespace triagens::rest; +static size_t BinarySearch (std::vector const& markers, + std::string const& key) { + TRI_ASSERT(! markers.empty()); + + size_t l = 0; + size_t r = markers.size() - 1; + + while (true) { + // determine midpoint + size_t m = l + ((r - l) / 2); + + char const* other = TRI_EXTRACT_MARKER_KEY(markers[m]); + + int res = strcmp(key.c_str(), other); + + if (res == 0) { + return m; + } + if (res < 0) { + if (m == 0) { + return SIZE_MAX; + } + r = m - 1; + } + else { + l = m + 1; + } + + if (r < l) { + return SIZE_MAX; + } + } +} + +static bool FindRange (std::vector const& markers, + std::string const& lower, + std::string const& upper, + size_t& lowerPos, + size_t& upperPos) { + bool found = false; + + if (! markers.empty()) { + lowerPos = BinarySearch(markers, lower); + if (lowerPos != SIZE_MAX) { + upperPos = BinarySearch(markers, upper); + } + } + + return found; +} + // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- @@ -107,7 +158,8 @@ InitialSyncer::~InitialSyncer () { /// @brief run method, performs a full synchronization //////////////////////////////////////////////////////////////////////////////// -int InitialSyncer::run (string& errorMsg) { +int InitialSyncer::run (string& errorMsg, + bool incremental) { if (_client == nullptr || _connection == nullptr || _endpoint == nullptr) { @@ -167,7 +219,7 @@ int InitialSyncer::run (string& errorMsg) { std::unique_ptr json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, response->getBody().c_str())); if (JsonHelper::isObject(json.get())) { - res = handleInventoryResponse(json.get(), errorMsg); + res = handleInventoryResponse(json.get(), incremental, errorMsg); } else { res = TRI_ERROR_REPLICATION_INVALID_RESPONSE; @@ -569,6 +621,233 @@ int InitialSyncer::handleCollectionDump (string const& cid, return TRI_ERROR_INTERNAL; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief incrementally fetch data from a collection +//////////////////////////////////////////////////////////////////////////////// + +int InitialSyncer::handleCollectionSync (string const& cid, + TRI_document_collection_t* document, + TRI_transaction_collection_t* trxCollection, + string const& collectionName, + TRI_voc_tick_t maxTick, + string& errorMsg) { + + string const baseUrl = BaseUrl + "/keys"; + + std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_POST, + BaseUrl + "?collection=" + cid, + nullptr, + 0)); + + if (response == nullptr || ! response->isComplete()) { + errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) + + ": " + _client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + + ": " + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + StringBuffer& data = response->getBody(); + + // order collection keys + std::unique_ptr json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data.c_str())); + + if (! TRI_IsObjectJson(json.get())) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": response is no object"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + TRI_json_t const* idJson = TRI_LookupObjectJson(json.get(), "id"); + + if (! TRI_IsStringJson(idJson)) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": response does not contain 'id' attribute"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + std::string const id(idJson->_value._string.data, idJson->_value._string.length - 1); + + // now we can fetch the complete chunk information from the master + + int res = handleSyncKeys(id, cid, document, trxCollection, collectionName, maxTick, errorMsg); + + { + // now delete the keys we ordered + std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_DELETE, + BaseUrl + "/" + id, + nullptr, + 0)); + + if (response == nullptr || ! response->isComplete()) { + errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) + + ": " + _client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + } + + return res; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief incrementally fetch data from a collection +//////////////////////////////////////////////////////////////////////////////// + +#include +int InitialSyncer::handleSyncKeys (std::string const& keysId, + std::string const& cid, + TRI_document_collection_t* document, + TRI_transaction_collection_t* trxCollection, + std::string const& collectionName, + TRI_voc_tick_t maxTick, + std::string& errorMsg) { + + // fetch all local keys from primary index + std::vector markers; + + auto idx = document->primaryIndex(); + markers.reserve(idx->size()); + + triagens::basics::BucketPosition position; + + uint64_t total = 0; + while (true) { + auto ptr = idx->lookupSequential(position, total); + + if (ptr == nullptr) { + // done + break; + } + + void const* marker = ptr->getDataPtr(); + auto df = static_cast(marker); + + if (df->_tick >= maxTick) { + continue; + } + + markers.emplace_back(df); + } + + // sort all our local keys + std::sort(markers.begin(), markers.end(), [] (TRI_df_marker_t const* lhs, TRI_df_marker_t const* rhs) -> bool { + int res = strcmp(TRI_EXTRACT_MARKER_KEY(lhs), TRI_EXTRACT_MARKER_KEY(rhs)); + + return res < 0; + }); + + + TRI_voc_tick_t const chunkSize = 5000; + string const baseUrl = BaseUrl + "/keys"; + + std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_PUT, + BaseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize), + nullptr, + 0)); + + if (response == nullptr || ! response->isComplete()) { + errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) + + ": " + _client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + + ": " + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + StringBuffer& data = response->getBody(); + + // parse chunks + std::unique_ptr json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data.c_str())); + + if (! TRI_IsArrayJson(json.get())) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + size_t const n = TRI_LengthArrayJson(json.get()); + + // now process each chunk + for (size_t i = 0; i < n; ++i) { + // read remote chunk + auto chunk = static_cast(TRI_AtVector(&(json.get()->_value._objects), i)); + + if (! TRI_IsObjectJson(chunk)) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": chunk is no object"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + auto lowJson = TRI_LookupObjectJson(chunk, "low"); + auto highJson = TRI_LookupObjectJson(chunk, "high"); + auto hashJson = TRI_LookupObjectJson(chunk, "hash"); + +std::cout << "i: " << i << ", RANGE LOW: " << std::string(lowJson->_value._string.data) << ", HIGH: " << std::string(highJson->_value._string.data) << ", HASH: " << std::string(hashJson->_value._string.data) << "\n"; + if (! TRI_IsStringJson(lowJson) || ! TRI_IsStringJson(highJson) || ! TRI_IsStringJson(hashJson)) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": chunks in response have an invalid format"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + size_t localFrom; + size_t localTo; + bool match = FindRange(markers, + std::string(lowJson->_value._string.data, lowJson->_value._string.length - 1), + std::string(highJson->_value._string.data, highJson->_value._string.length - 1), + localFrom, + localTo); + + if (match) { + // now must hash the range + uint64_t hash = 0x012345678; + + for (size_t i = localFrom; i < localTo; ++i) { + auto marker = markers.at(i); + char const* key = TRI_EXTRACT_MARKER_KEY(marker); + + hash ^= TRI_FnvHashString(key); + hash ^= TRI_EXTRACT_MARKER_RID(marker); + } + + if (std::to_string(hash) != std::string(hashJson->_value._string.data, hashJson->_value._string.length - 1)) { + match = false; + } + } + +std::cout << "RANGE DOES MATCH: " << (int) match << "\n"; + if (! match) { + // must transfer keys for non-matching range + } + } + + // TODO: remove all keys that are below first remote key or beyond last remote key + + return TRI_ERROR_NO_ERROR; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief handle the information about a collection //////////////////////////////////////////////////////////////////////////////// @@ -705,7 +984,7 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters, // ------------------------------------------------------------------------------------- else if (phase == PHASE_DUMP) { - string const progress = "syncing data for " + collectionMsg; + string const progress = "dumping data for " + collectionMsg; setProgress(progress.c_str()); TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid); @@ -811,6 +1090,55 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters, return res; } + + // sync collection data incrementally + // ------------------------------------------------------------------------------------- + + else if (phase == PHASE_SYNC) { + string const progress = "syncing data for " + collectionMsg; + setProgress(progress.c_str()); + + TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid); + + if (col == nullptr && ! masterName.empty()) { + // not found, try name next + col = TRI_LookupCollectionByNameVocBase(_vocbase, masterName.c_str()); + } + + if (col == nullptr) { + errorMsg = "cannot sync: " + collectionMsg + " not found"; + + return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; + } + + int res = TRI_ERROR_INTERNAL; + + { + SingleCollectionWriteTransaction trx(new StandaloneTransactionContext(), _vocbase, col->_cid); + + res = trx.begin(); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "unable to start transaction: " + string(TRI_errno_string(res)); + + return res; + } + + TRI_transaction_collection_t* trxCollection = trx.trxCollection(); + + if (trxCollection == nullptr) { + res = TRI_ERROR_INTERNAL; + errorMsg = "unable to start transaction: " + string(TRI_errno_string(res)); + } + else { + res = handleCollectionSync(StringUtils::itoa(cid), trx.documentCollection(), trxCollection, masterName, _masterInfo._lastLogTick, errorMsg); + } + + res = trx.finish(res); + } + + return res; + } // we won't get here @@ -823,7 +1151,8 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters, //////////////////////////////////////////////////////////////////////////////// int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, - string& errorMsg) { + bool incremental, + std::string& errorMsg) { TRI_json_t const* data = JsonHelper::getObjectElement(json, "collections"); if (! JsonHelper::isArray(data)) { @@ -892,7 +1221,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, } } - collections.emplace_back(std::make_pair(parameters, indexes)); + collections.emplace_back(parameters, indexes); } int res; @@ -906,10 +1235,18 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, if (res != TRI_ERROR_NO_ERROR) { return res; } + + if (incremental) { + // STEP 2: sync collection data from master and create initial indexes + // ---------------------------------------------------------------------------------- + return iterateCollections(collections, errorMsg, PHASE_SYNC); + } + + TRI_ASSERT(! incremental); // STEP 2: drop collections locally if they are also present on the master (clean up) - // ---------------------------------------------------------------------------------- + // ---------------------------------------------------------------------------------- res = iterateCollections(collections, errorMsg, PHASE_DROP); @@ -917,7 +1254,6 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, return res; } - // STEP 3: re-create empty collections locally // ---------------------------------------------------------------------------------- @@ -926,8 +1262,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, if (res != TRI_ERROR_NO_ERROR) { return res; } - - + // STEP 4: sync collection data from master and create initial indexes // ---------------------------------------------------------------------------------- @@ -939,7 +1274,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, //////////////////////////////////////////////////////////////////////////////// int InitialSyncer::iterateCollections (std::vector> const& collections, - string& errorMsg, + std::string& errorMsg, sync_phase_e phase) { std::string phaseMsg("starting phase " + translatePhase(phase) + " with " + std::to_string(collections.size()) + " collections"); setProgress(phaseMsg); diff --git a/arangod/Replication/InitialSyncer.h b/arangod/Replication/InitialSyncer.h index 4ac910b98e..29289ef662 100644 --- a/arangod/Replication/InitialSyncer.h +++ b/arangod/Replication/InitialSyncer.h @@ -72,7 +72,8 @@ namespace triagens { PHASE_VALIDATE, PHASE_DROP, PHASE_CREATE, - PHASE_DUMP + PHASE_DUMP, + PHASE_SYNC } sync_phase_e; @@ -108,7 +109,7 @@ namespace triagens { /// @brief run method, performs a full synchronization //////////////////////////////////////////////////////////////////////////////// - int run (std::string&); + int run (std::string&, bool); //////////////////////////////////////////////////////////////////////////////// /// @brief return the last log tick of the master at start @@ -134,6 +135,8 @@ namespace triagens { return "create"; case PHASE_DUMP: return "dump"; + case PHASE_SYNC: + return "sync"; case PHASE_NONE: break; } @@ -203,6 +206,29 @@ namespace triagens { TRI_voc_tick_t, std::string&); +//////////////////////////////////////////////////////////////////////////////// +/// @brief incrementally fetch data from a collection +//////////////////////////////////////////////////////////////////////////////// + + int handleCollectionSync (std::string const&, + struct TRI_document_collection_t*, + struct TRI_transaction_collection_s*, + std::string const&, + TRI_voc_tick_t, + std::string&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief incrementally fetch data from a collection +//////////////////////////////////////////////////////////////////////////////// + + int handleSyncKeys (std::string const&, + std::string const&, + struct TRI_document_collection_t*, + struct TRI_transaction_collection_s*, + std::string const&, + TRI_voc_tick_t, + std::string&); + //////////////////////////////////////////////////////////////////////////////// /// @brief handle the information about a collection //////////////////////////////////////////////////////////////////////////////// @@ -217,6 +243,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// int handleInventoryResponse (struct TRI_json_t const*, + bool, std::string&); //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 0baf8e8553..d5c9a082e1 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -3158,7 +3158,7 @@ void RestReplicationHandler::handleCommandCreateKeys () { } //////////////////////////////////////////////////////////////////////////////// -/// @brief returns a key range +/// @brief returns all key ranges //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandGetKeys () { @@ -3170,32 +3170,23 @@ void RestReplicationHandler::handleCommandGetKeys () { "expecting GET /_api/replication/keys/"); return; } + + static TRI_voc_tick_t const DefaultChunkSize = 5000; + TRI_voc_tick_t chunkSize = DefaultChunkSize; + + // determine chunk size + bool found; + char const* value = _request->value("chunkSize", found); + + if (found) { + chunkSize = static_cast(StringUtils::uint64(value)); + if (chunkSize < 100) { + chunkSize = DefaultChunkSize; + } + } std::string const& id = suffix[1]; - TRI_voc_tick_t tickStart = 0; - TRI_voc_tick_t tickEnd = 0; - - // determine start and end tick for keys - bool found; - char const* value = _request->value("from", found); - - if (found) { - tickStart = static_cast(StringUtils::uint64(value)); - } - - value = _request->value("to", found); - if (found) { - tickEnd = static_cast(StringUtils::uint64(value)); - } - - if (tickStart > tickEnd || tickEnd == 0) { - generateError(HttpResponse::BAD, - TRI_ERROR_HTTP_BAD_PARAMETER, - "invalid from/to values"); - return; - } - int res = TRI_ERROR_NO_ERROR; try { @@ -3218,14 +3209,27 @@ void RestReplicationHandler::handleCommandGetKeys () { } try { - auto result = collectionKeys->hashChunk(tickStart, tickEnd); + + triagens::basics::Json json(triagens::basics::Json::Array, 200); + + TRI_voc_tick_t max = static_cast(collectionKeys->count()); + + for (TRI_voc_tick_t from = 0; from < max; from += chunkSize) { + TRI_voc_tick_t to = from + chunkSize; + if (to > max) { + to = max; + } + auto result = collectionKeys->hashChunk(from, to); + + triagens::basics::Json chunk(triagens::basics::Json::Object, 3); + chunk.set("low", triagens::basics::Json(std::get<0>(result))); + chunk.set("high", triagens::basics::Json(std::get<1>(result))); + chunk.set("hash", triagens::basics::Json(std::to_string(std::get<2>(result)))); + + json.add(chunk); + } collectionKeys->release(); - - triagens::basics::Json json(triagens::basics::Json::Object, 3); - json.set("low", triagens::basics::Json(std::get<0>(result))); - json.set("high", triagens::basics::Json(std::get<1>(result))); - json.set("hash", triagens::basics::Json(std::to_string(std::get<2>(result)))); generateResult(HttpResponse::OK, json.json()); } @@ -3816,7 +3820,7 @@ void RestReplicationHandler::handleCommandMakeSlave () { res = TRI_ERROR_NO_ERROR; try { - res = syncer.run(errorMsg); + res = syncer.run(errorMsg, false); } catch (...) { errorMsg = "caught an exception"; @@ -4003,7 +4007,7 @@ void RestReplicationHandler::handleCommandSync () { string errorMsg = ""; try { - res = syncer.run(errorMsg); + res = syncer.run(errorMsg, false); } catch (...) { errorMsg = "caught an exception"; diff --git a/arangod/V8Server/v8-replication.cpp b/arangod/V8Server/v8-replication.cpp index 2f0d269d06..686d4299c3 100644 --- a/arangod/V8Server/v8-replication.cpp +++ b/arangod/V8Server/v8-replication.cpp @@ -278,6 +278,13 @@ static void JS_SynchronizeReplication (const v8::FunctionCallbackInfo config._requireFromPresent = TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("requireFromPresent"))); } } + + bool incremental = false; + if (object->Has(TRI_V8_ASCII_STRING("incremental"))) { + if (object->Get(TRI_V8_ASCII_STRING("incremental"))->IsBoolean()) { + incremental = TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("incremental"))); + } + } string errorMsg = ""; InitialSyncer syncer(vocbase, &config, restrictCollections, restrictType, verbose); @@ -287,7 +294,7 @@ static void JS_SynchronizeReplication (const v8::FunctionCallbackInfo v8::Handle result = v8::Object::New(isolate); try { - res = syncer.run(errorMsg); + res = syncer.run(errorMsg, incremental); result->Set(TRI_V8_ASCII_STRING("lastLogTick"), V8TickId(isolate, syncer.getLastLogTick()));