From d7a3b51e1be992fcf3e14f9f5e52d60d51638306 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Fri, 11 Sep 2015 17:13:31 +0200 Subject: [PATCH] additional sync method --- arangod/Replication/InitialSyncer.cpp | 555 ++++++++++++++---- arangod/Replication/InitialSyncer.h | 14 +- .../RestHandler/RestReplicationHandler.cpp | 123 +++- arangod/RestHandler/RestReplicationHandler.h | 6 + arangod/Utils/CollectionKeys.cpp | 121 ++++ arangod/Utils/CollectionKeys.h | 18 + arangod/Utils/CollectionKeysRepository.cpp | 9 +- arangod/Utils/CollectionKeysRepository.h | 3 +- 8 files changed, 698 insertions(+), 151 deletions(-) diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 1a0811421f..33fba4adee 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -51,8 +51,9 @@ using namespace triagens::arango; using namespace triagens::httpclient; using namespace triagens::rest; -static size_t BinarySearch (std::vector const& markers, - std::string const& key) { +static bool BinarySearch (std::vector const& markers, + std::string const& key, + size_t& position) { TRI_ASSERT(! markers.empty()); size_t l = 0; @@ -60,27 +61,29 @@ static size_t BinarySearch (std::vector const& markers, while (true) { // determine midpoint - size_t m = l + ((r - l) / 2); + position = l + ((r - l) / 2); - char const* other = TRI_EXTRACT_MARKER_KEY(markers[m]); + TRI_ASSERT(position < markers.size()); + char const* other = TRI_EXTRACT_MARKER_KEY(markers.at(position)); int res = strcmp(key.c_str(), other); if (res == 0) { - return m; + return true; } + if (res < 0) { - if (m == 0) { - return SIZE_MAX; + if (position == 0) { + return false; } - r = m - 1; + r = position - 1; } else { - l = m + 1; + l = position + 1; } if (r < l) { - return SIZE_MAX; + return false; } } } @@ -93,9 +96,10 @@ static bool FindRange (std::vector const& markers, bool found = false; if (! markers.empty()) { - lowerPos = BinarySearch(markers, lower); - if (lowerPos != SIZE_MAX) { - upperPos = BinarySearch(markers, upper); + found = BinarySearch(markers, lower, lowerPos); + + if (found) { + found = BinarySearch(markers, upper, upperPos); } } @@ -625,17 +629,16 @@ int InitialSyncer::handleCollectionDump (string const& cid, /// @brief incrementally fetch data from a collection //////////////////////////////////////////////////////////////////////////////// -int InitialSyncer::handleCollectionSync (string const& cid, - TRI_document_collection_t* document, - TRI_transaction_collection_t* trxCollection, - string const& collectionName, +int InitialSyncer::handleCollectionSync (std::string const& cid, + SingleCollectionWriteTransaction& trx, + std::string const& collectionName, TRI_voc_tick_t maxTick, string& errorMsg) { string const baseUrl = BaseUrl + "/keys"; - + std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_POST, - BaseUrl + "?collection=" + cid, + baseUrl + "?collection=" + cid + "&to=" + std::to_string(maxTick), nullptr, 0)); @@ -672,27 +675,58 @@ int InitialSyncer::handleCollectionSync (string const& cid, if (! TRI_IsStringJson(idJson)) { errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + - ": response does not contain 'id' attribute"; + ": response does not contain valid 'id' attribute"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + TRI_json_t const* countJson = TRI_LookupObjectJson(json.get(), "count"); + + if (! 
TRI_IsNumberJson(countJson)) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": response does not contain valid 'count' attribute"; return TRI_ERROR_REPLICATION_INVALID_RESPONSE; } + if (countJson->_value._number <= 0.0) { + int res = trx.truncate(false); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "unable to truncate collection '" + collectionName + "': " + TRI_errno_string(res); + + return res; + } + + return TRI_ERROR_NO_ERROR; + } + + std::string const id(idJson->_value._string.data, idJson->_value._string.length - 1); // now we can fetch the complete chunk information from the master + int res; - int res = handleSyncKeys(id, cid, document, trxCollection, collectionName, maxTick, errorMsg); + try { + res = handleSyncKeys(id, cid, trx, collectionName, maxTick, errorMsg); + } + catch (triagens::basics::Exception const& ex) { + res = ex.code(); + } + catch (...) { + res = TRI_ERROR_INTERNAL; + } { // now delete the keys we ordered std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_DELETE, - BaseUrl + "/" + id, + baseUrl + "/" + id, nullptr, 0)); if (response == nullptr || ! response->isComplete()) { errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) + - ": " + _client->getErrorMessage(); + ": " + _client->getErrorMessage(); return TRI_ERROR_REPLICATION_INVALID_RESPONSE; } @@ -705,55 +739,61 @@ int InitialSyncer::handleCollectionSync (string const& cid, /// @brief incrementally fetch data from a collection //////////////////////////////////////////////////////////////////////////////// -#include int InitialSyncer::handleSyncKeys (std::string const& keysId, std::string const& cid, - TRI_document_collection_t* document, - TRI_transaction_collection_t* trxCollection, + SingleCollectionWriteTransaction& trx, std::string const& collectionName, TRI_voc_tick_t maxTick, std::string& errorMsg) { + TRI_doc_update_policy_t policy(TRI_DOC_UPDATE_LAST_WRITE, 0, nullptr); + auto shaper = trx.documentCollection()->getShaper(); + + bool const isEdge = (trx.documentCollection()->_info._type == TRI_COL_TYPE_EDGE); + // fetch all local keys from primary index std::vector markers; - auto idx = document->primaryIndex(); + auto idx = trx.documentCollection()->primaryIndex(); markers.reserve(idx->size()); - triagens::basics::BucketPosition position; + { + triagens::basics::BucketPosition position; - uint64_t total = 0; - while (true) { - auto ptr = idx->lookupSequential(position, total); + uint64_t total = 0; + while (true) { + auto ptr = idx->lookupSequential(position, total); - if (ptr == nullptr) { - // done - break; + if (ptr == nullptr) { + // done + break; + } + + void const* marker = ptr->getDataPtr(); + auto df = static_cast(marker); + + if (df->_tick >= maxTick) { + continue; + } + + markers.emplace_back(df); } + + // sort all our local keys + std::sort(markers.begin(), markers.end(), [] (TRI_df_marker_t const* lhs, TRI_df_marker_t const* rhs) -> bool { + int res = strcmp(TRI_EXTRACT_MARKER_KEY(lhs), TRI_EXTRACT_MARKER_KEY(rhs)); - void const* marker = ptr->getDataPtr(); - auto df = static_cast(marker); - - if (df->_tick >= maxTick) { - continue; - } - - markers.emplace_back(df); + return res < 0; + }); } - - // sort all our local keys - std::sort(markers.begin(), markers.end(), [] (TRI_df_marker_t const* lhs, TRI_df_marker_t const* rhs) -> bool { - int res = strcmp(TRI_EXTRACT_MARKER_KEY(lhs), TRI_EXTRACT_MARKER_KEY(rhs)); - - return res < 0; - }); - + + std::vector toFetch; TRI_voc_tick_t const chunkSize = 5000; 
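+  // Overview of the incremental key-based sync performed below against the
+  // /_api/replication/keys API (the keys snapshot was created via POST in
+  // handleCollectionSync and is deleted there again once this method returns):
+  //   1) GET  keys/<id>?chunkSize=n        -> array of { low, high, hash } chunks;
+  //      local keys below the first "low" or above the last "high" are removed
+  //   2) each chunk whose hash matches the hash of the local [low, high] key range
+  //      is skipped entirely
+  //   3) PUT  keys/<id>?type=keys&chunk=i  -> [ key, rid ] pairs of a non-matching
+  //      chunk; local keys missing remotely are removed, and positions whose key is
+  //      missing locally or whose rid differs are collected in toFetch
+  //   4) PUT  keys/<id>?type=docs&chunk=i  -> full documents for the toFetch
+  //      positions, which are then inserted or updated locally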
string const baseUrl = BaseUrl + "/keys"; - - std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_PUT, - BaseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize), + + std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_GET, + baseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize), nullptr, 0)); @@ -787,9 +827,51 @@ int InitialSyncer::handleSyncKeys (std::string const& keysId, } size_t const n = TRI_LengthArrayJson(json.get()); + + // remove all keys that are below first remote key or beyond last remote key + if (n > 0) { + // first chunk + auto chunk = static_cast(TRI_AtVector(&(json.get()->_value._objects), 0)); + + TRI_ASSERT(TRI_IsObjectJson(chunk)); + auto lowJson = TRI_LookupObjectJson(chunk, "low"); + TRI_ASSERT(TRI_IsStringJson(lowJson)); + + char const* lowKey = lowJson->_value._string.data; + + for (size_t i = 0; i < markers.size(); ++i) { + auto key = TRI_EXTRACT_MARKER_KEY(markers[i]); + if (strcmp(key, lowKey) >= 0) { + break; + } + + TRI_RemoveShapedJsonDocumentCollection(trx.trxCollection(), (TRI_voc_key_t) key, 0, nullptr, &policy, false, false); + } + + // last high + chunk = static_cast(TRI_AtVector(&(json.get()->_value._objects), n - 1)); + auto highJson = TRI_LookupObjectJson(chunk, "high"); + TRI_ASSERT(TRI_IsStringJson(highJson)); + + char const* highKey = highJson->_value._string.data; + + for (size_t i = markers.size(); i >= 1; --i) { + auto key = TRI_EXTRACT_MARKER_KEY(markers[i - 1]); + if (strcmp(key, highKey) <= 0) { + break; + } + + TRI_RemoveShapedJsonDocumentCollection(trx.trxCollection(), (TRI_voc_key_t) key, 0, nullptr, &policy, false, false); + } + } + + size_t nextStart = 0; + size_t currentChunkId; // now process each chunk for (size_t i = 0; i < n; ++i) { + currentChunkId = i; + // read remote chunk auto chunk = static_cast(TRI_AtVector(&(json.get()->_value._objects), i)); @@ -803,8 +885,8 @@ int InitialSyncer::handleSyncKeys (std::string const& keysId, auto lowJson = TRI_LookupObjectJson(chunk, "low"); auto highJson = TRI_LookupObjectJson(chunk, "high"); auto hashJson = TRI_LookupObjectJson(chunk, "hash"); - -std::cout << "i: " << i << ", RANGE LOW: " << std::string(lowJson->_value._string.data) << ", HIGH: " << std::string(highJson->_value._string.data) << ", HASH: " << std::string(hashJson->_value._string.data) << "\n"; + + //std::cout << "i: " << i << ", RANGE LOW: " << std::string(lowJson->_value._string.data) << ", HIGH: " << std::string(highJson->_value._string.data) << ", HASH: " << std::string(hashJson->_value._string.data) << "\n"; if (! TRI_IsStringJson(lowJson) || ! TRI_IsStringJson(highJson) || ! TRI_IsStringJson(hashJson)) { errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + ": chunks in response have an invalid format"; @@ -824,7 +906,8 @@ std::cout << "i: " << i << ", RANGE LOW: " << std::string(lowJson->_value._strin // now must hash the range uint64_t hash = 0x012345678; - for (size_t i = localFrom; i < localTo; ++i) { + for (size_t i = localFrom; i <= localTo; ++i) { + TRI_ASSERT(i < markers.size()); auto marker = markers.at(i); char const* key = TRI_EXTRACT_MARKER_KEY(marker); @@ -837,14 +920,270 @@ std::cout << "i: " << i << ", RANGE LOW: " << std::string(lowJson->_value._strin } } -std::cout << "RANGE DOES MATCH: " << (int) match << "\n"; - if (! 
match) { + if (match) { + // match + nextStart = localTo + 1; + } + else { + // no match // must transfer keys for non-matching range + + std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_PUT, + baseUrl + "/" + keysId + "?type=keys&chunk=" + std::to_string(i) + "&chunkSize=" + std::to_string(chunkSize), + nullptr, + 0)); + + if (response == nullptr || ! response->isComplete()) { + errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) + + ": " + _client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + + ": " + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + StringBuffer& rangeKeys = response->getBody(); + + // parse keys + std::unique_ptr rangeKeysJson(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, rangeKeys.c_str())); + + if (! TRI_IsArrayJson(rangeKeysJson.get())) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + size_t const n = TRI_LengthArrayJson(rangeKeysJson.get()); + TRI_ASSERT(n > 0); + + + // delete all keys at start of the range + while (nextStart < markers.size()) { + auto df = markers[nextStart]; + char const* localKey = TRI_EXTRACT_MARKER_KEY(df); + int res = strcmp(localKey, lowJson->_value._string.data); + + if (res < 0) { + // we have a local key that is not present remotely + TRI_RemoveShapedJsonDocumentCollection(trx.trxCollection(), (TRI_voc_key_t) localKey, 0, nullptr, &policy, false, false); + ++nextStart; + } + else { + break; + } + } + + toFetch.clear(); + + for (size_t i = 0; i < n; ++i) { + auto chunk = static_cast(TRI_AtVector(&(rangeKeysJson.get()->_value._objects), i)); + + if (! TRI_IsArrayJson(chunk) || TRI_LengthArrayJson(chunk) != 2) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": response chunk is no valid array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + // key + auto keyJson = static_cast(TRI_AtVector(&chunk->_value._objects, 0)); + // rid + auto ridJson = static_cast(TRI_AtVector(&chunk->_value._objects, 1)); + + while (nextStart < markers.size()) { + auto df = markers[nextStart]; + char const* localKey = TRI_EXTRACT_MARKER_KEY(df); + + int res = strcmp(localKey, lowJson->_value._string.data); + + if (res < 0) { + // we have a local key that is not present remotely + TRI_RemoveShapedJsonDocumentCollection(trx.trxCollection(), (TRI_voc_key_t) localKey, 0, nullptr, &policy, false, false); + ++nextStart; + } + else if (res >= 0) { + // key matches or key too high + break; + } + } + + auto mptr = idx->lookupKey(keyJson->_value._string.data); + + if (mptr == nullptr) { + // key not found locally + toFetch.emplace_back(i); + } + else if (std::to_string(mptr->_rid) != std::string(ridJson->_value._string.data, ridJson->_value._string.length - 1)) { + // key found, but rid differs + toFetch.emplace_back(i); + } + else { + // a match - nothing to do! 
+ } + } + + // calculate next starting point + BinarySearch(markers, highJson->_value._string.data, nextStart); + while (nextStart < markers.size()) { + TRI_ASSERT(nextStart < markers.size()); + char const* key = TRI_EXTRACT_MARKER_KEY(markers.at(nextStart)); + int res = strcmp(key, highJson->_value._string.data); + if (res <= 0) { + ++nextStart; + } + else { + break; + } + } + + /* + if (nextStart < markers.size()) { + std::cout << "LOW: " << lowJson->_value._string.data << ", HIGH: " << highJson->_value._string.data << ", NEXT: " << TRI_EXTRACT_MARKER_KEY(markers.at(nextStart)) << "\n"; + } + */ + + if (! toFetch.empty()) { + triagens::basics::Json keysJson(triagens::basics::Json::Array, toFetch.size()); + + for (auto& it : toFetch) { + keysJson.add(triagens::basics::Json(static_cast(it))); + } + + auto const keyJsonString = triagens::basics::JsonHelper::toString(keysJson.json()); + + std::unique_ptr response(_client->request(HttpRequest::HTTP_REQUEST_PUT, + baseUrl + "/" + keysId + "?type=docs&chunk=" + std::to_string(currentChunkId) + "&chunkSize=" + std::to_string(chunkSize), + keyJsonString.c_str(), + keyJsonString.size())); + + if (response == nullptr || ! response->isComplete()) { + errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) + + ": " + _client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + + ": " + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + StringBuffer& documentsData = response->getBody(); + + // parse keys + std::unique_ptr documentsJson(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, documentsData.c_str())); + + if (! TRI_IsArrayJson(documentsJson.get())) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + size_t const n = TRI_LengthArrayJson(documentsJson.get()); + + for (size_t i = 0; i < n; ++i) { + auto documentJson = static_cast(TRI_AtVector(&(documentsJson.get()->_value._objects), i)); + + if (! TRI_IsObjectJson(documentJson)) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": document is no object"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + auto const keyJson = TRI_LookupObjectJson(documentJson, "_key"); + + if (! TRI_IsStringJson(keyJson)) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": document key is invalid"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + auto const revJson = TRI_LookupObjectJson(documentJson, "_rev"); + + if (! 
TRI_IsStringJson(revJson)) { + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": document revision is invalid"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + std::string documentKey(keyJson->_value._string.data, keyJson->_value._string.length - 1); + + TRI_voc_rid_t rid = static_cast(StringUtils::uint64(revJson->_value._string.data)); + + TRI_shaped_json_t* shaped = TRI_ShapedJsonJson(shaper, documentJson, true); // PROTECTED by trx + + if (shaped == nullptr) { + return TRI_ERROR_OUT_OF_MEMORY; + } + + TRI_doc_mptr_copy_t result; + + int res = TRI_ERROR_NO_ERROR; + TRI_document_edge_t* e = nullptr; + auto mptr = idx->lookupKey(documentKey.c_str()); + + if (mptr == nullptr) { + TRI_document_edge_t edge; + + if (isEdge) { + std::string const from = JsonHelper::getStringValue(documentJson, TRI_VOC_ATTRIBUTE_FROM, ""); + std::string const to = JsonHelper::getStringValue(documentJson, TRI_VOC_ATTRIBUTE_TO, ""); + + // parse _from + if (! DocumentHelper::parseDocumentId(*trx.resolver(), from.c_str(), edge._fromCid, &edge._fromKey)) { + res = TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD; + } + + // parse _to + if (! DocumentHelper::parseDocumentId(*trx.resolver(), to.c_str(), edge._toCid, &edge._toKey)) { + res = TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD; + } + + e = &edge; + } + else { + e = nullptr; + } + + // INSERT + if (res == TRI_ERROR_NO_ERROR) { + res = TRI_InsertShapedJsonDocumentCollection(trx.trxCollection(), (TRI_voc_key_t) documentKey.c_str(), rid, nullptr, &result, shaped, e, false, false, true); + } + } + else { + // UPDATE + res = TRI_UpdateShapedJsonDocumentCollection(trx.trxCollection(), (TRI_voc_key_t) documentKey.c_str(), rid, nullptr, &result, shaped, &policy, false, false); + } + + TRI_FreeShapedJson(shaper->memoryZone(), shaped); + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + } + + } } } - // TODO: remove all keys that are below first remote key or beyond last remote key - return TRI_ERROR_NO_ERROR; } @@ -854,6 +1193,7 @@ std::cout << "RANGE DOES MATCH: " << (int) match << "\n"; int InitialSyncer::handleCollection (TRI_json_t const* parameters, TRI_json_t const* indexes, + bool incremental, string& errorMsg, sync_phase_e phase) { @@ -895,6 +1235,10 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters, // ------------------------------------------------------------------------------------- if (phase == PHASE_DROP) { + if (incremental) { + return TRI_ERROR_NO_ERROR; + } + // first look up the collection by the cid TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid); @@ -968,6 +1312,20 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters, string const progress = "creating " + collectionMsg; setProgress(progress.c_str()); + + if (incremental) { + col = TRI_LookupCollectionByIdVocBase(_vocbase, cid); + + if (col == nullptr && ! 
masterName.empty()) { + // not found, try name next + col = TRI_LookupCollectionByNameVocBase(_vocbase, masterName.c_str()); + } + + if (col != nullptr) { + // collection is already present + return TRI_ERROR_NO_ERROR; + } + } int res = createCollection(parameters, &col); @@ -1020,7 +1378,12 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters, errorMsg = "unable to start transaction: " + string(TRI_errno_string(res)); } else { - res = handleCollectionDump(StringUtils::itoa(cid), trxCollection, masterName, _masterInfo._lastLogTick, errorMsg); + if (incremental && trx.documentCollection()->size() > 0) { + res = handleCollectionSync(StringUtils::itoa(cid), trx, masterName, _masterInfo._lastLogTick, errorMsg); + } + else { + res = handleCollectionDump(StringUtils::itoa(cid), trxCollection, masterName, _masterInfo._lastLogTick, errorMsg); + } } res = trx.finish(res); @@ -1091,56 +1454,6 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters, return res; } - // sync collection data incrementally - // ------------------------------------------------------------------------------------- - - else if (phase == PHASE_SYNC) { - string const progress = "syncing data for " + collectionMsg; - setProgress(progress.c_str()); - - TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid); - - if (col == nullptr && ! masterName.empty()) { - // not found, try name next - col = TRI_LookupCollectionByNameVocBase(_vocbase, masterName.c_str()); - } - - if (col == nullptr) { - errorMsg = "cannot sync: " + collectionMsg + " not found"; - - return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; - } - - int res = TRI_ERROR_INTERNAL; - - { - SingleCollectionWriteTransaction trx(new StandaloneTransactionContext(), _vocbase, col->_cid); - - res = trx.begin(); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "unable to start transaction: " + string(TRI_errno_string(res)); - - return res; - } - - TRI_transaction_collection_t* trxCollection = trx.trxCollection(); - - if (trxCollection == nullptr) { - res = TRI_ERROR_INTERNAL; - errorMsg = "unable to start transaction: " + string(TRI_errno_string(res)); - } - else { - res = handleCollectionSync(StringUtils::itoa(cid), trx.documentCollection(), trxCollection, masterName, _masterInfo._lastLogTick, errorMsg); - } - - res = trx.finish(res); - } - - return res; - } - - // we won't get here TRI_ASSERT(false); return TRI_ERROR_INTERNAL; @@ -1230,25 +1543,16 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, // ---------------------------------------------------------------------------------- // iterate over all collections from the master... - res = iterateCollections(collections, errorMsg, PHASE_VALIDATE); + res = iterateCollections(collections, incremental, errorMsg, PHASE_VALIDATE); if (res != TRI_ERROR_NO_ERROR) { return res; } - if (incremental) { - // STEP 2: sync collection data from master and create initial indexes - // ---------------------------------------------------------------------------------- - - return iterateCollections(collections, errorMsg, PHASE_SYNC); - } - - TRI_ASSERT(! 
incremental); - // STEP 2: drop collections locally if they are also present on the master (clean up) // ---------------------------------------------------------------------------------- - res = iterateCollections(collections, errorMsg, PHASE_DROP); + res = iterateCollections(collections, incremental, errorMsg, PHASE_DROP); if (res != TRI_ERROR_NO_ERROR) { return res; @@ -1257,7 +1561,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, // STEP 3: re-create empty collections locally // ---------------------------------------------------------------------------------- - res = iterateCollections(collections, errorMsg, PHASE_CREATE); + res = iterateCollections(collections, incremental, errorMsg, PHASE_CREATE); if (res != TRI_ERROR_NO_ERROR) { return res; @@ -1266,7 +1570,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, // STEP 4: sync collection data from master and create initial indexes // ---------------------------------------------------------------------------------- - return iterateCollections(collections, errorMsg, PHASE_DUMP); + return iterateCollections(collections, incremental, errorMsg, PHASE_DUMP); } //////////////////////////////////////////////////////////////////////////////// @@ -1274,6 +1578,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json, //////////////////////////////////////////////////////////////////////////////// int InitialSyncer::iterateCollections (std::vector> const& collections, + bool incremental, std::string& errorMsg, sync_phase_e phase) { std::string phaseMsg("starting phase " + translatePhase(phase) + " with " + std::to_string(collections.size()) + " collections"); @@ -1286,7 +1591,7 @@ int InitialSyncer::iterateCollections (std::vector&, std::string const&, TRI_voc_tick_t, std::string&); @@ -223,8 +220,7 @@ namespace triagens { int handleSyncKeys (std::string const&, std::string const&, - struct TRI_document_collection_t*, - struct TRI_transaction_collection_s*, + SingleCollectionWriteTransaction&, std::string const&, TRI_voc_tick_t, std::string&); @@ -235,6 +231,7 @@ namespace triagens { int handleCollection (struct TRI_json_t const*, struct TRI_json_t const*, + bool, std::string&, sync_phase_e); @@ -251,6 +248,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// int iterateCollections (std::vector> const&, + bool, std::string&, sync_phase_e); diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index d5c9a082e1..cd4986762e 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -166,6 +166,7 @@ HttpHandler::status_t RestReplicationHandler::execute () { else if (command == "keys") { if (type != HttpRequest::HTTP_REQUEST_GET && type != HttpRequest::HTTP_REQUEST_POST && + type != HttpRequest::HTTP_REQUEST_PUT && type != HttpRequest::HTTP_REQUEST_DELETE) { goto BAD_CALL; } @@ -180,6 +181,9 @@ HttpHandler::status_t RestReplicationHandler::execute () { else if (type == HttpRequest::HTTP_REQUEST_GET) { handleCommandGetKeys(); } + else if (type == HttpRequest::HTTP_REQUEST_PUT) { + handleCommandFetchKeys(); + } else if (type == HttpRequest::HTTP_REQUEST_DELETE) { handleCommandRemoveKeys(); } @@ -3183,6 +3187,9 @@ void RestReplicationHandler::handleCommandGetKeys () { if (chunkSize < 100) { chunkSize = DefaultChunkSize; } + else if (chunkSize > 20000) { + chunkSize = 20000; + } } std::string const& id = suffix[1]; @@ -3195,16 
+3202,10 @@ void RestReplicationHandler::handleCommandGetKeys () { auto collectionKeysId = static_cast(triagens::basics::StringUtils::uint64(id)); - bool busy; - auto collectionKeys = keysRepository->find(collectionKeysId, busy); + auto collectionKeys = keysRepository->find(collectionKeysId); if (collectionKeys == nullptr) { - if (busy) { - generateError(HttpResponse::responseCode(TRI_ERROR_CURSOR_BUSY), TRI_ERROR_CURSOR_BUSY); // TODO: Fix error code - } - else { - generateError(HttpResponse::responseCode(TRI_ERROR_CURSOR_NOT_FOUND), TRI_ERROR_CURSOR_NOT_FOUND); // TODO: fix error code - } + generateError(HttpResponse::responseCode(TRI_ERROR_CURSOR_NOT_FOUND), TRI_ERROR_CURSOR_NOT_FOUND); // TODO: fix error code return; } @@ -3216,9 +3217,11 @@ void RestReplicationHandler::handleCommandGetKeys () { for (TRI_voc_tick_t from = 0; from < max; from += chunkSize) { TRI_voc_tick_t to = from + chunkSize; + if (to > max) { to = max; } + auto result = collectionKeys->hashChunk(from, to); triagens::basics::Json chunk(triagens::basics::Json::Object, 3); @@ -3250,6 +3253,110 @@ void RestReplicationHandler::handleCommandGetKeys () { } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns date for a key range +//////////////////////////////////////////////////////////////////////////////// + +void RestReplicationHandler::handleCommandFetchKeys () { + std::vector const& suffix = _request->suffix(); + + if (suffix.size() != 2) { + generateError(HttpResponse::BAD, + TRI_ERROR_HTTP_BAD_PARAMETER, + "expecting PUT /_api/replication/keys/"); + return; + } + + static TRI_voc_tick_t const DefaultChunkSize = 5000; + TRI_voc_tick_t chunkSize = DefaultChunkSize; + + // determine chunk size + bool found; + char const* value = _request->value("chunkSize", found); + + if (found) { + chunkSize = static_cast(StringUtils::uint64(value)); + if (chunkSize < 100) { + chunkSize = DefaultChunkSize; + } + else if (chunkSize > 20000) { + chunkSize = 20000; + } + } + + value = _request->value("chunk", found); + + size_t chunk = 0; + + if (found) { + chunk = static_cast(StringUtils::uint64(value)); + } + + value = _request->value("type", found); + + bool keys = true; + if (strcmp(value, "keys") == 0) { + keys = true; + } + else if (strcmp(value, "docs") == 0) { + keys = false; + } + else { + generateError(HttpResponse::BAD, + TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid 'type' value"); + return; + } + + std::string const& id = suffix[1]; + + int res = TRI_ERROR_NO_ERROR; + + try { + auto keysRepository = static_cast(_vocbase->_collectionKeys); + TRI_ASSERT(keysRepository != nullptr); + + auto collectionKeysId = static_cast(triagens::basics::StringUtils::uint64(id)); + + auto collectionKeys = keysRepository->find(collectionKeysId); + + if (collectionKeys == nullptr) { + generateError(HttpResponse::responseCode(TRI_ERROR_CURSOR_NOT_FOUND), TRI_ERROR_CURSOR_NOT_FOUND); // TODO: fix error code + return; + } + + try { + triagens::basics::Json json(triagens::basics::Json::Array, chunkSize); + + if (keys) { + collectionKeys->dumpKeys(json, chunk, chunkSize); + } + else { + std::unique_ptr idsJson(parseJsonBody()); + collectionKeys->dumpDocs(json, chunk, chunkSize, idsJson.get()); + } + + collectionKeys->release(); + + generateResult(HttpResponse::OK, json.json()); + } + catch (...) { + collectionKeys->release(); + throw; + } + } + catch (triagens::basics::Exception const& ex) { + res = ex.code(); + } + catch (...) 
{ + res = TRI_ERROR_INTERNAL; + } + + if (res != TRI_ERROR_NO_ERROR) { + generateError(HttpResponse::SERVER_ERROR, res); + } +} + void RestReplicationHandler::handleCommandRemoveKeys () { std::vector const& suffix = _request->suffix(); diff --git a/arangod/RestHandler/RestReplicationHandler.h b/arangod/RestHandler/RestReplicationHandler.h index 0c527d1623..da683844d5 100644 --- a/arangod/RestHandler/RestReplicationHandler.h +++ b/arangod/RestHandler/RestReplicationHandler.h @@ -310,6 +310,12 @@ namespace triagens { void handleCommandGetKeys (); +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns date for a key range +//////////////////////////////////////////////////////////////////////////////// + + void handleCommandFetchKeys (); + //////////////////////////////////////////////////////////////////////////////// /// @brief remove a list of keys for a specific collection //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Utils/CollectionKeys.cpp b/arangod/Utils/CollectionKeys.cpp index cbda4ec1ed..2fc5ea7851 100644 --- a/arangod/Utils/CollectionKeys.cpp +++ b/arangod/Utils/CollectionKeys.cpp @@ -30,8 +30,10 @@ #include "Utils/CollectionKeys.h" #include "Basics/hashes.h" #include "Basics/JsonHelper.h" +#include "Basics/StringUtils.h" #include "Utils/CollectionGuard.h" #include "Utils/CollectionReadLocker.h" +#include "Utils/DocumentHelper.h" #include "Utils/transactions.h" #include "VocBase/compactor.h" #include "VocBase/Ditch.h" @@ -195,6 +197,125 @@ std::tuple CollectionKeys::hashChunk (size_t return std::make_tuple(first, last, hash); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief dumps keys into the JSON +//////////////////////////////////////////////////////////////////////////////// + +void CollectionKeys::dumpKeys (triagens::basics::Json& json, + size_t chunk, + size_t chunkSize) const { + size_t from = chunk * chunkSize; + size_t to = (chunk + 1) * chunkSize; + + if (to > _markers->size()) { + to = _markers->size(); + } + + if (from >= _markers->size() || from >= to || to == 0) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); + } + + for (size_t i = from; i < to; ++i) { + auto marker = _markers->at(i); + + triagens::basics::Json array(triagens::basics::Json::Array, 2); + array.add(triagens::basics::Json(std::string(TRI_EXTRACT_MARKER_KEY(marker)))); + array.add(triagens::basics::Json(std::to_string(TRI_EXTRACT_MARKER_RID(marker)))); + + json.add(array); + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief dumps documents into the JSON +//////////////////////////////////////////////////////////////////////////////// + +void CollectionKeys::dumpDocs (triagens::basics::Json& json, + size_t chunk, + size_t chunkSize, + TRI_json_t const* ids) const { + if (! TRI_IsArrayJson(ids)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); + } + + + auto shaper = _document->getShaper(); + CollectionNameResolver resolver(_vocbase); + + + size_t const n = TRI_LengthArrayJson(ids); + + for (size_t i = 0; i < n; ++i) { + auto valueJson = static_cast(TRI_AtVector(&ids->_value._objects, i)); + + if (! 
TRI_IsNumberJson(valueJson)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); + } + + size_t position = chunk * chunkSize + static_cast(valueJson->_value._number); + + if (position >= _markers->size()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); + } + + auto df = _markers->at(position); + + TRI_shaped_json_t shapedJson; + TRI_EXTRACT_SHAPED_JSON_MARKER(shapedJson, df); + + auto doc = TRI_JsonShapedJson(shaper, &shapedJson); + + if (! TRI_IsObjectJson(doc)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); + } + + char const* key = TRI_EXTRACT_MARKER_KEY(df); + TRI_json_t* keyJson = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, key, strlen(key)); + + if (keyJson != nullptr) { + TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, doc, TRI_VOC_ATTRIBUTE_KEY, keyJson); + } + + // convert rid from uint64_t to string + std::string const&& rid = triagens::basics::StringUtils::itoa(TRI_EXTRACT_MARKER_RID(df)); + TRI_json_t* revJson = TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, rid.c_str(), rid.size()); + + if (revJson != nullptr) { + TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, doc, TRI_VOC_ATTRIBUTE_REV, revJson); + } + + TRI_df_marker_type_t type = df->_type; + + if (type == TRI_DOC_MARKER_KEY_EDGE) { + TRI_doc_edge_key_marker_t const* marker = reinterpret_cast(df); + std::string const&& from = DocumentHelper::assembleDocumentId(resolver.getCollectionNameCluster(marker->_fromCid), std::string((char*) marker + marker->_offsetFromKey)); + std::string const&& to = DocumentHelper::assembleDocumentId(resolver.getCollectionNameCluster(marker->_toCid), std::string((char*) marker + marker->_offsetToKey)); + + TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, doc, TRI_VOC_ATTRIBUTE_FROM, TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, from.c_str(), from.size())); + TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, doc, TRI_VOC_ATTRIBUTE_TO, TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, to.c_str(), to.size())); + } + else if (type == TRI_WAL_MARKER_EDGE) { + triagens::wal::edge_marker_t const* marker = reinterpret_cast(df); // PROTECTED by trx passed from above + std::string const&& from = DocumentHelper::assembleDocumentId(resolver.getCollectionNameCluster(marker->_fromCid), std::string((char*) marker + marker->_offsetFromKey)); + std::string const&& to = DocumentHelper::assembleDocumentId(resolver.getCollectionNameCluster(marker->_toCid), std::string((char*) marker + marker->_offsetToKey)); + + TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, doc, TRI_VOC_ATTRIBUTE_FROM, TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, from.c_str(), from.size())); + TRI_Insert3ObjectJson(TRI_UNKNOWN_MEM_ZONE, doc, TRI_VOC_ATTRIBUTE_TO, TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, to.c_str(), to.size())); + } + + json.transfer(doc); + } +} + +// ----------------------------------------------------------------------------- +// --SECTION-- END-OF-FILE +// ----------------------------------------------------------------------------- + +// Local Variables: +// mode: outline-minor +// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}" +// End: + // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE // ----------------------------------------------------------------------------- diff --git a/arangod/Utils/CollectionKeys.h b/arangod/Utils/CollectionKeys.h index cf08f8ffe9..4b820a3344 100644 --- a/arangod/Utils/CollectionKeys.h +++ b/arangod/Utils/CollectionKeys.h @@ -31,6 +31,7 @@ #define ARANGODB_ARANGO_COLLECTION_KEYS_H 1 #include 
"Basics/Common.h" +#include "Basics/JsonHelper.h" #include "Utils/CollectionNameResolver.h" #include "VocBase/voc-types.h" @@ -122,6 +123,23 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// std::tuple hashChunk (size_t, size_t) const; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief dumps keys into the JSON +//////////////////////////////////////////////////////////////////////////////// + + void dumpKeys (triagens::basics::Json&, + size_t, + size_t) const; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief dumps documents into the JSON +//////////////////////////////////////////////////////////////////////////////// + + void dumpDocs (triagens::basics::Json&, + size_t, + size_t, + TRI_json_t const*) const; // ----------------------------------------------------------------------------- // --SECTION-- private variables diff --git a/arangod/Utils/CollectionKeysRepository.cpp b/arangod/Utils/CollectionKeysRepository.cpp index d83ee7d897..560ccd5cf1 100644 --- a/arangod/Utils/CollectionKeysRepository.cpp +++ b/arangod/Utils/CollectionKeysRepository.cpp @@ -158,10 +158,8 @@ bool CollectionKeysRepository::remove (CollectionKeysId id) { /// they must be returned later using release() //////////////////////////////////////////////////////////////////////////////// -CollectionKeys* CollectionKeysRepository::find (CollectionKeysId id, - bool& busy) { +CollectionKeys* CollectionKeysRepository::find (CollectionKeysId id) { triagens::arango::CollectionKeys* collectionKeys = nullptr; - busy = false; { MUTEX_LOCKER(_lock); @@ -180,11 +178,6 @@ CollectionKeys* CollectionKeysRepository::find (CollectionKeysId id, return nullptr; } - if (collectionKeys->isUsed()) { - busy = true; - return nullptr; - } - collectionKeys->use(); } diff --git a/arangod/Utils/CollectionKeysRepository.h b/arangod/Utils/CollectionKeysRepository.h index 34611d808f..521f711ce7 100644 --- a/arangod/Utils/CollectionKeysRepository.h +++ b/arangod/Utils/CollectionKeysRepository.h @@ -91,8 +91,7 @@ namespace triagens { /// it must be returned later using release() //////////////////////////////////////////////////////////////////////////////// - CollectionKeys* find (CollectionKeysId, - bool&); + CollectionKeys* find (CollectionKeysId); //////////////////////////////////////////////////////////////////////////////// /// @brief return a cursor