diff --git a/arangod/Replication/ReplicationFetcher.cpp b/arangod/Replication/ReplicationFetcher.cpp index e0094a3e71..ca9e20dc26 100644 --- a/arangod/Replication/ReplicationFetcher.cpp +++ b/arangod/Replication/ReplicationFetcher.cpp @@ -182,6 +182,49 @@ int ReplicationFetcher::run () { /// @{ //////////////////////////////////////////////////////////////////////////////// +int ReplicationFetcher::applyCollectionDump (TRI_voc_cid_t cid, + SimpleHttpResult* response, + string& errorMsg) { + + std::stringstream& data = response->getBody(); + + while (true) { + string line; + + std::getline(data, line, '\n'); + + if (line.size() < 2) { + return TRI_ERROR_NO_ERROR; + } + + TRI_json_t* json = TRI_JsonString(TRI_CORE_MEM_ZONE, line.c_str()); + + if (! JsonHelper::isArray(json)) { + if (json != 0) { + TRI_FreeJson(TRI_CORE_MEM_ZONE, json); + } + + errorMsg = "received invalid JSON data for collection " + StringUtils::itoa(cid); + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + TRI_json_t* type = JsonHelper::getArrayElement(json, "type"); + + if (! JsonHelper::isString(type)) { + TRI_FreeJson(TRI_CORE_MEM_ZONE, json); + + errorMsg = "received invalid JSON data for collection " + StringUtils::itoa(cid); + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + +// std::cout << "type: " << type->_value._string.data << "\n"; + + TRI_FreeJson(TRI_CORE_MEM_ZONE, json); + } +} + //////////////////////////////////////////////////////////////////////////////// /// @brief get local replication apply state //////////////////////////////////////////////////////////////////////////////// @@ -333,17 +376,25 @@ int ReplicationFetcher::getMasterInventory (string& errorMsg) { int ReplicationFetcher::handleCollectionDump (TRI_voc_cid_t cid, TRI_voc_tick_t maxTick, string& errorMsg) { + static const uint64_t chunkSize = 1024 * 1024; + if (_client == 0) { return TRI_ERROR_INTERNAL; } - const string baseUrl = "/_api/replication/dump?collection=" + StringUtils::itoa(cid); + + const string baseUrl = "/_api/replication/dump" + "?collection=" + StringUtils::itoa(cid) + + "&chunkSize=" + StringUtils::itoa(chunkSize); + map headers; TRI_voc_tick_t fromTick = 0; while (true) { - string url = baseUrl + "&from=" + StringUtils::itoa(fromTick) + "&to=" + StringUtils::itoa(maxTick); + string url = baseUrl + + "&from=" + StringUtils::itoa(fromTick) + + "&to=" + StringUtils::itoa(maxTick); // send request SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET, @@ -405,13 +456,15 @@ int ReplicationFetcher::handleCollectionDump (TRI_voc_cid_t cid, } } -std::cout << "GOT: " << response->getBody().str().c_str() << "\n\n\n"; + int res = applyCollectionDump(cid, response, errorMsg); delete response; - if (! hasMore || fromTick == 0) { + if (res != TRI_ERROR_NO_ERROR || + ! hasMore || + fromTick == 0) { // done - return TRI_ERROR_NO_ERROR; + return res; } } @@ -508,6 +561,8 @@ int ReplicationFetcher::handleCollectionInitial (TRI_json_t const* json, } else if (phase == PHASE_DATA) { int res; + + LOGGER_INFO("syncing data for collection '" << masterName->_value._string.data << "', id " << id); res = handleCollectionDump(id, _masterInfo._state._lastTick, errorMsg); diff --git a/arangod/Replication/ReplicationFetcher.h b/arangod/Replication/ReplicationFetcher.h index dc10a678f4..9b2f70e768 100644 --- a/arangod/Replication/ReplicationFetcher.h +++ b/arangod/Replication/ReplicationFetcher.h @@ -46,6 +46,7 @@ namespace triagens { namespace httpclient { class GeneralClientConnection; class SimpleHttpClient; + class SimpleHttpResult; } namespace rest { @@ -145,6 +146,16 @@ namespace triagens { /// @{ //////////////////////////////////////////////////////////////////////////////// + private: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief apply the data from a collection dump +//////////////////////////////////////////////////////////////////////////////// + + int applyCollectionDump (TRI_voc_cid_t, + httpclient::SimpleHttpResult*, + string&); + //////////////////////////////////////////////////////////////////////////////// /// @brief get local replication apply state //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/ShapedJson/shaped-json.c b/lib/ShapedJson/shaped-json.c index 9141c14076..88906214c7 100644 --- a/lib/ShapedJson/shaped-json.c +++ b/lib/ShapedJson/shaped-json.c @@ -1781,13 +1781,7 @@ static bool StringifyJsonShapeDataArray (TRI_shaper_t* shaper, return false; } - res = TRI_AppendCharStringBuffer(buffer, '"'); - - if (res != TRI_ERROR_NO_ERROR) { - return false; - } - - res = TRI_AppendCharStringBuffer(buffer, ':'); + res = TRI_AppendString2StringBuffer(buffer, "\":", 2); if (res != TRI_ERROR_NO_ERROR) { return false; @@ -1866,13 +1860,7 @@ static bool StringifyJsonShapeDataArray (TRI_shaper_t* shaper, return false; } - res = TRI_AppendCharStringBuffer(buffer, '"'); - - if (res != TRI_ERROR_NO_ERROR) { - return false; - } - - res = TRI_AppendCharStringBuffer(buffer, ':'); + res = TRI_AppendString2StringBuffer(buffer, "\":", 2); if (res != TRI_ERROR_NO_ERROR) { return false;