diff --git a/arangod/Replication/ReplicationFetcher.cpp b/arangod/Replication/ReplicationFetcher.cpp index cf05929c21..d18d9db3a9 100644 --- a/arangod/Replication/ReplicationFetcher.cpp +++ b/arangod/Replication/ReplicationFetcher.cpp @@ -50,6 +50,8 @@ using namespace triagens::basics; using namespace triagens::rest; using namespace triagens::arango; using namespace triagens::httpclient; + +#define LOGGER_REPLICATION LOGGER_INFO // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors @@ -487,10 +489,12 @@ int ReplicationFetcher::getMasterState (string& errorMsg) { } map headers; + const string url = "/_api/replication/state"; // send request + LOGGER_REPLICATION("fetching master state from " << url); SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET, - "/_api/replication/state", + url, 0, 0, headers); @@ -525,6 +529,12 @@ int ReplicationFetcher::getMasterState (string& errorMsg) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } + else { + res = TRI_ERROR_REPLICATION_INVALID_RESPONSE; + + errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + + ": invalid JSON"; + } } // std::cout << response->getBody().str() << std::endl; } @@ -544,10 +554,12 @@ int ReplicationFetcher::getMasterInventory (string& errorMsg) { } map headers; + const string url = "/_api/replication/inventory"; // send request + LOGGER_REPLICATION("fetching master inventory from " << url); SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET, - "/_api/replication/inventory", + url, 0, 0, headers); @@ -603,8 +615,6 @@ int ReplicationFetcher::getMasterInventory (string& errorMsg) { int ReplicationFetcher::handleCollectionDump (TRI_transaction_collection_t* trxCollection, TRI_voc_tick_t maxTick, string& errorMsg) { - static const uint64_t chunkSize = 2 * 1024 * 1024; - if (_client == 0) { return TRI_ERROR_INTERNAL; } @@ -612,7 +622,7 @@ int ReplicationFetcher::handleCollectionDump (TRI_transaction_collection_t* trxC const string baseUrl = "/_api/replication/dump" "?collection=" + StringUtils::itoa(trxCollection->_cid) + - "&chunkSize=" + StringUtils::itoa(chunkSize); + "&chunkSize=" + StringUtils::itoa(getChunkSize()); map headers; @@ -620,11 +630,12 @@ int ReplicationFetcher::handleCollectionDump (TRI_transaction_collection_t* trxC uint64_t markerCount = 0; while (true) { - string url = baseUrl + - "&from=" + StringUtils::itoa(fromTick) + - "&to=" + StringUtils::itoa(maxTick); + const string url = baseUrl + + "&from=" + StringUtils::itoa(fromTick) + + "&to=" + StringUtils::itoa(maxTick); // send request + LOGGER_REPLICATION("fetching master collection dump from " << url); SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET, url, 0, @@ -875,6 +886,39 @@ int ReplicationFetcher::handleCollectionInitial (TRI_json_t const* parameters, } + if (res == TRI_ERROR_NO_ERROR) { + // now create indexes + const size_t n = indexes->_value._objects._length; + + if (n > 0) { + LOGGER_INFO("creating indexes for collection '" << masterName->_value._string.data << "', id " << cid); + + for (size_t i = 0; i < n; ++i) { + TRI_json_t const* idxDef = (TRI_json_t const*) TRI_AtVector(&indexes->_value._objects, i); + TRI_index_t* idx = 0; + + // {"id":"229907440927234","type":"hash","unique":false,"fields":["x","Y"]} + + res = TRI_FromJsonIndexDocumentCollection((TRI_document_collection_t*) trxCollection->_collection->_collection, idxDef, &idx); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "could not create index: " + string(TRI_errno_string(res)); + break; + } + else { + assert(idx != 0); + + res = TRI_SaveIndex((TRI_primary_collection_t*) trxCollection->_collection->_collection, idx); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "could not save index: " + string(TRI_errno_string(res)); + break; + } + } + } + } + } + if (res == TRI_ERROR_NO_ERROR) { TRI_CommitTransaction(trx, 0); } @@ -883,54 +927,8 @@ int ReplicationFetcher::handleCollectionInitial (TRI_json_t const* parameters, return res; } + - // create indexes - // ------------------------------------------------------------------------------------- - - else if (phase == PHASE_INDEXES) { - - const size_t n = indexes->_value._objects._length; - int res = TRI_ERROR_NO_ERROR; - - if (n > 0) { - LOGGER_INFO("creating indexes for collection '" << masterName->_value._string.data << "', id " << cid); - - TRI_vocbase_col_t* col = TRI_UseCollectionByIdVocBase(_vocbase, cid); - - if (col == 0 || col->_collection == 0) { - return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; - } - - for (size_t i = 0; i < n; ++i) { - TRI_json_t const* idxDef = (TRI_json_t const*) TRI_AtVector(&indexes->_value._objects, i); - TRI_index_t* idx = 0; - - // {"id":"229907440927234","type":"hash","unique":false,"fields":["x","Y"]} - - res = TRI_FromJsonIndexDocumentCollection((TRI_document_collection_t*) col->_collection, idxDef, &idx); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "could not create index: " + string(TRI_errno_string(res)); - break; - } - else { - assert(idx != 0); - - res = TRI_SaveIndex((TRI_primary_collection_t*) col->_collection, idx); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "could not save index: " + string(TRI_errno_string(res)); - break; - } - } - } - - TRI_ReleaseCollectionVocBase(_vocbase, col); - } - - return res; - } - // we won't get here assert(false); return TRI_ERROR_INTERNAL; @@ -1107,7 +1105,7 @@ int ReplicationFetcher::handleInventoryResponse (TRI_json_t const* json, } - // STEP 4: sync collection data from master + // STEP 4: sync collection data from master and create initial indexes // ---------------------------------------------------------------------------------- res = iterateCollections(collections, errorMsg, PHASE_DATA); @@ -1117,15 +1115,6 @@ int ReplicationFetcher::handleInventoryResponse (TRI_json_t const* json, } - // STEP 5: create indexes - // ---------------------------------------------------------------------------------- - - res = iterateCollections(collections, errorMsg, PHASE_INDEXES); - - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - return TRI_ERROR_NO_ERROR; } @@ -1183,19 +1172,18 @@ int ReplicationFetcher::runContinuous (string& errorMsg) { return TRI_ERROR_INTERNAL; } - static const uint64_t chunkSize = 2 * 1024 * 1024; - - const string baseUrl = "/_api/replication/log" - "?chunkSize=" + StringUtils::itoa(chunkSize); + const string baseUrl = "/_api/replication/follow" + "?chunkSize=" + StringUtils::itoa(getChunkSize()); map headers; - TRI_voc_tick_t fromTick = 0; + TRI_voc_tick_t fromTick = _masterInfo._state._lastTick; while (true) { - string url = baseUrl + "&from=" + StringUtils::itoa(fromTick); + const string url = baseUrl + "&from=" + StringUtils::itoa(fromTick); // send request + LOGGER_REPLICATION("fetching master log from " << url); SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET, url, 0, @@ -1229,6 +1217,8 @@ int ReplicationFetcher::runContinuous (string& errorMsg) { int res; bool checkMore = false; + bool active = false; + bool found; string header = response->getHeaderField(TRI_REPLICATION_HEADER_CHECKMORE, found); @@ -1241,6 +1231,11 @@ int ReplicationFetcher::runContinuous (string& errorMsg) { errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) + ": header '" TRI_REPLICATION_HEADER_CHECKMORE "' is missing"; } + + header = response->getHeaderField(TRI_REPLICATION_HEADER_ACTIVE, found); + if (found) { + active = StringUtils::boolean(header); + } if (checkMore) { header = response->getHeaderField(TRI_REPLICATION_HEADER_LASTFOUND, found); @@ -1275,7 +1270,12 @@ int ReplicationFetcher::runContinuous (string& errorMsg) { if (! checkMore || fromTick == 0) { // nothing to do. sleep before we poll again - sleep(1); + if (active) { + sleep(1); + } + else { + sleep(10); + } } } @@ -1283,6 +1283,16 @@ int ReplicationFetcher::runContinuous (string& errorMsg) { return TRI_ERROR_INTERNAL; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief get chunk size for a transfer +//////////////////////////////////////////////////////////////////////////////// + +uint64_t ReplicationFetcher::getChunkSize () const { + static const uint64_t chunkSize = 4 * 1024 * 1024; + + return chunkSize; +} + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Replication/ReplicationFetcher.h b/arangod/Replication/ReplicationFetcher.h index ce0c0745ac..24a2217dbe 100644 --- a/arangod/Replication/ReplicationFetcher.h +++ b/arangod/Replication/ReplicationFetcher.h @@ -82,8 +82,7 @@ namespace triagens { PHASE_VALIDATE, PHASE_DROP, PHASE_CREATE, - PHASE_DATA, - PHASE_INDEXES + PHASE_DATA } setup_phase_e; @@ -240,6 +239,12 @@ namespace triagens { int runContinuous (string&); +//////////////////////////////////////////////////////////////////////////////// +/// @brief get chunk size for a transfer +//////////////////////////////////////////////////////////////////////////////// + + uint64_t getChunkSize () const; + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/replication.c b/arangod/VocBase/replication.c index 79032bd1e3..0d3bbf4f38 100644 --- a/arangod/VocBase/replication.c +++ b/arangod/VocBase/replication.c @@ -1634,9 +1634,13 @@ int TRI_StopReplicationLogger (TRI_replication_logger_t* logger) { int TRI_StateReplicationLogger (TRI_replication_logger_t* logger, TRI_replication_log_state_t* state) { + TRI_vocbase_t* vocbase; int res; res = TRI_ERROR_NO_ERROR; + vocbase = logger->_vocbase; + + TRI_WriteLockReadWriteLock(&vocbase->_objectLock); TRI_WriteLockReadWriteLock(&logger->_statusLock); @@ -1650,6 +1654,8 @@ int TRI_StateReplicationLogger (TRI_replication_logger_t* logger, } TRI_WriteUnlockReadWriteLock(&logger->_statusLock); + + TRI_WriteUnlockReadWriteLock(&vocbase->_objectLock); return res; }