diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp
index 40a95fd458..5666ce9b7b 100644
--- a/arangod/RestHandler/RestReplicationHandler.cpp
+++ b/arangod/RestHandler/RestReplicationHandler.cpp
@@ -3517,6 +3517,11 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
     return;
   }
 
+  {
+    CONDITION_LOCKER(locker, _condVar);
+    _holdReadLockJobs.emplace(id, false);
+  }
+
   auto trxContext = StandaloneTransactionContext::Create(_vocbase);
   SingleCollectionTransaction trx(trxContext, col->cid(), TRI_TRANSACTION_READ);
   trx.addHint(TRI_TRANSACTION_HINT_LOCK_ENTIRELY, false);
@@ -3530,7 +3535,16 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
 
   {
     CONDITION_LOCKER(locker, _condVar);
-    _holdReadLockJobs.insert(id);
+    auto it = _holdReadLockJobs.find(id);
+    if (it == _holdReadLockJobs.end()) {
+      // Entry has been removed since, so we cancel the whole thing
+      // right away and generate an error:
+      generateError(rest::ResponseCode::SERVER_ERROR,
+                    TRI_ERROR_TRANSACTION_INTERNAL,
+                    "read transaction was cancelled");
+      return;
+    }
+    it->second = true;  // mark the read lock as acquired
   }
 
   double now = TRI_microtime();
@@ -3588,6 +3602,8 @@ void RestReplicationHandler::handleCommandCheckHoldReadLockCollection() {
   }
   std::string id = idSlice.copyString();
 
+  bool lockHeld = false;
+  {
     CONDITION_LOCKER(locker, _condVar);
     auto it = _holdReadLockJobs.find(id);
@@ -3596,12 +3612,17 @@ void RestReplicationHandler::handleCommandCheckHoldReadLockCollection() {
                     "no hold read lock job found for 'id'");
       return;
     }
+    if (it->second) {
+      lockHeld = true;
+    }
+  }
 
   VPackBuilder b;
   {
     VPackObjectBuilder bb(&b);
     b.add("error", VPackValue(false));
+    b.add("lockHeld", VPackValue(lockHeld));
   }
 
   generateResult(rest::ResponseCode::OK, b.slice());
@@ -3633,11 +3654,19 @@ void RestReplicationHandler::handleCommandCancelHoldReadLockCollection() {
   }
   std::string id = idSlice.copyString();
 
+  bool lockHeld = false;
   {
     CONDITION_LOCKER(locker, _condVar);
     auto it = _holdReadLockJobs.find(id);
     if (it != _holdReadLockJobs.end()) {
+      // Note that this approach works if the lock has been acquired
+      // as well as if we still wait for the read lock, in which case
+      // it will eventually be acquired but immediately released:
+      if (it->second) {
+        lockHeld = true;
+      }
       _holdReadLockJobs.erase(it);
+      _condVar.broadcast();
     }
   }
 
@@ -3645,6 +3674,7 @@ void RestReplicationHandler::handleCommandCancelHoldReadLockCollection() {
   VPackBuilder b;
   {
     VPackObjectBuilder bb(&b);
     b.add("error", VPackValue(false));
+    b.add("lockHeld", VPackValue(lockHeld));
   }
 
   generateResult(rest::ResponseCode::OK, b.slice());
@@ -3677,4 +3707,4 @@ arangodb::basics::ConditionVariable RestReplicationHandler::_condVar;
 /// the flag is set for the ID of a job, the job is cancelled
 //////////////////////////////////////////////////////////////////////////////
 
-std::unordered_set<std::string> RestReplicationHandler::_holdReadLockJobs;
+std::unordered_map<std::string, bool> RestReplicationHandler::_holdReadLockJobs;
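
(Reviewer note: the ordering above is the heart of the fix. The job id is registered in _holdReadLockJobs with value false before the potentially long-blocking lock acquisition, and looked up again afterwards, so a cancel that arrives while the lock request is still in flight is no longer lost. Below is a minimal standalone sketch of the same handshake; it uses the standard library in place of CONDITION_LOCKER and _condVar, and the names registerJob, markAcquiredOrAbort and cancelJob are hypothetical stand-ins, not ArangoDB API.)

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-ins for _condVar and _holdReadLockJobs:
std::mutex jobsMutex;
std::condition_variable jobsCondVar;
std::unordered_map<std::string, bool> holdReadLockJobs;

// Phase 1: register intent before the (possibly slow) lock acquisition.
void registerJob(std::string const& id) {
  std::lock_guard<std::mutex> guard(jobsMutex);
  holdReadLockJobs.emplace(id, false);  // false: lock not yet acquired
}

// Phase 2: after the collection lock has been acquired, look the entry up
// again; a concurrent cancel may have erased it in the meantime.
bool markAcquiredOrAbort(std::string const& id) {
  std::lock_guard<std::mutex> guard(jobsMutex);
  auto it = holdReadLockJobs.find(id);
  if (it == holdReadLockJobs.end()) {
    return false;  // cancelled while we were still waiting for the lock
  }
  it->second = true;  // lock is held, visible to the check endpoint
  return true;
}

// Cancellation: erase the entry and wake every waiter; returns whether
// the lock was already held, mirroring the new "lockHeld" result field.
bool cancelJob(std::string const& id) {
  std::lock_guard<std::mutex> guard(jobsMutex);
  bool lockHeld = false;
  auto it = holdReadLockJobs.find(id);
  if (it != holdReadLockJobs.end()) {
    lockHeld = it->second;
    holdReadLockJobs.erase(it);
    jobsCondVar.notify_all();  // corresponds to _condVar.broadcast()
  }
  return lockHeld;
}

int main() {
  registerJob("42");
  cancelJob("42");  // the cancel wins the race against the acquisition
  std::cout << markAcquiredOrAbort("42") << std::endl;  // prints 0
}
```

(Keeping the flag flip and the erase under one mutex is what makes the three outcomes, acquired, cancelled before acquisition, and cancelled after acquisition, mutually exclusive.)
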
diff --git a/arangod/RestHandler/RestReplicationHandler.h b/arangod/RestHandler/RestReplicationHandler.h
index 69cfd6cde0..8795a96a98 100644
--- a/arangod/RestHandler/RestReplicationHandler.h
+++ b/arangod/RestHandler/RestReplicationHandler.h
@@ -366,13 +366,18 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
   static arangodb::basics::ConditionVariable _condVar;
 
   //////////////////////////////////////////////////////////////////////////////
-  /// @brief global set of ids of holdReadLockCollection jobs, if
-  /// an id is removed here (under the protection of the mutex of
-  /// condVar) and a broadcast is sent, the job with that id is
-  /// terminated.
+  /// @brief global map of ids of holdReadLockCollection jobs; an
+  /// id mapping to false indicates that a request to get the read
+  /// lock has been started, and the bool changes to true once the
+  /// read lock has been acquired. To cancel the read lock, remove
+  /// the entry here (under the protection of the mutex of
+  /// condVar) and send a broadcast to the condition variable; the
+  /// job with that id is then terminated. If the job times out,
+  /// the read lock is released automatically and the entry here
+  /// is deleted.
   //////////////////////////////////////////////////////////////////////////////
 
-  static std::unordered_set<std::string> _holdReadLockJobs;
+  static std::unordered_map<std::string, bool> _holdReadLockJobs;
 };
 
 }
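
(Reviewer note: the timeout sentence in this comment can be pictured as a timed wait on the condition variable whose predicate is "my entry has been erased". The sketch below shows that shape under the same assumptions as the previous snippet; holdUntilTimeoutOrCancel is an illustrative name, and the real handler builds its wait loop around CONDITION_LOCKER rather than std::condition_variable.)

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <unordered_map>

std::mutex jobsMutex;
std::condition_variable jobsCondVar;
std::unordered_map<std::string, bool> holdReadLockJobs;  // id -> acquired?

// Hold an already acquired read lock until either the TTL expires or a
// cancel erases our entry and broadcasts. Returns true when the TTL was
// reached, false when we were cancelled early.
bool holdUntilTimeoutOrCancel(std::string const& id, double ttlSeconds) {
  auto deadline = std::chrono::steady_clock::now() +
                  std::chrono::duration<double>(ttlSeconds);
  std::unique_lock<std::mutex> guard(jobsMutex);
  // The predicate fires as soon as the entry has vanished, i.e. on
  // cancellation; wait_until returns false only on timeout:
  bool cancelled = jobsCondVar.wait_until(guard, deadline, [&] {
    return holdReadLockJobs.find(id) == holdReadLockJobs.end();
  });
  if (!cancelled) {
    holdReadLockJobs.erase(id);  // timed out: clean up our own entry
  }
  return !cancelled;
}

int main() {
  holdReadLockJobs.emplace("42", true);
  // Nobody cancels here, so this returns true after roughly 0.1s:
  return holdUntilTimeoutOrCancel("42", 0.1) ? 0 : 1;
}
```
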
diff --git a/arangod/Utils/Transaction.cpp b/arangod/Utils/Transaction.cpp
index a2bd243167..33528c3a85 100644
--- a/arangod/Utils/Transaction.cpp
+++ b/arangod/Utils/Transaction.cpp
@@ -1824,6 +1824,25 @@ OperationResult Transaction::insertCoordinator(std::string const& collectionName
 }
 #endif
 
+//////////////////////////////////////////////////////////////////////////////
+/// @brief choose a timeout for synchronous replication, based on the
+/// number of documents we ship over
+//////////////////////////////////////////////////////////////////////////////
+
+static double chooseTimeout(size_t count) {
+  // We usually assume that a server can process at least 5000 documents
+  // per second (this is a low estimate), and use a low limit of 0.5s
+  // and a high timeout of 120s
+  double timeout = count / 5000.0;
+  if (timeout < 0.5) {
+    return 0.5;
+  } else if (timeout > 120.0) {
+    return 120.0;
+  } else {
+    return timeout;
+  }
+}
+
 //////////////////////////////////////////////////////////////////////////////
 /// @brief create one or multiple documents in a collection, local
 /// the single-document variant of this operation will either succeed or,
@@ -1981,7 +2000,7 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
   }
   auto cc = arangodb::ClusterComm::instance();
   size_t nrDone = 0;
-  size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT,
+  size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
                                       nrDone, Logger::REPLICATION);
   if (nrGood < followers->size()) {
     // we drop all followers that were not successful:
@@ -2275,6 +2294,7 @@ OperationResult Transaction::modifyLocal(
   };
 
   VPackSlice ourResult = resultBuilder.slice();
+  size_t count = 0;
   if (multiCase) {
     VPackArrayBuilder guard(&payload);
     VPackArrayIterator itValue(newValue);
@@ -2283,6 +2303,7 @@ OperationResult Transaction::modifyLocal(
       TRI_ASSERT((*itResult).isObject());
       if (!(*itResult).hasKey("error")) {
         doOneDoc(itValue.value(), itResult.value());
+        count++;
       }
       itValue.next();
       itResult.next();
@@ -2290,43 +2311,46 @@ OperationResult Transaction::modifyLocal(
   } else {
     VPackArrayBuilder guard(&payload);
     doOneDoc(newValue, ourResult);
+    count++;
   }
 
-  auto body = std::make_shared<std::string>();
-  *body = payload.slice().toJson();
+  if (count > 0) {
+    auto body = std::make_shared<std::string>();
+    *body = payload.slice().toJson();
 
-  // Now prepare the requests:
-  std::vector<ClusterCommRequest> requests;
-  for (auto const& f : *followers) {
-    requests.emplace_back("server:" + f,
-                          operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ?
-                          arangodb::rest::RequestType::PUT :
-                          arangodb::rest::RequestType::PATCH,
-                          path, body);
-  }
-  size_t nrDone = 0;
-  size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT, nrDone,
-                                      Logger::REPLICATION);
-  if (nrGood < followers->size()) {
-    // we drop all followers that were not successful:
-    for (size_t i = 0; i < followers->size(); ++i) {
-      bool replicationWorked
-          = requests[i].done &&
-            requests[i].result.status == CL_COMM_RECEIVED &&
-            (requests[i].result.answer_code ==
-                 rest::ResponseCode::ACCEPTED ||
-             requests[i].result.answer_code ==
-                 rest::ResponseCode::OK);
-      if (replicationWorked) {
-        bool found;
-        requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
-        replicationWorked = !found;
-      }
-      if (!replicationWorked) {
-        auto const& followerInfo = collection->followers();
-        followerInfo->remove((*followers)[i]);
-        LOG_TOPIC(ERR, Logger::REPLICATION)
-            << "modifyLocal: dropping follower "
-            << (*followers)[i] << " for shard " << collectionName;
+    // Now prepare the requests:
+    std::vector<ClusterCommRequest> requests;
+    for (auto const& f : *followers) {
+      requests.emplace_back("server:" + f,
+                            operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ?
+                            arangodb::rest::RequestType::PUT :
+                            arangodb::rest::RequestType::PATCH,
+                            path, body);
+    }
+    size_t nrDone = 0;
+    size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
+                                        nrDone, Logger::REPLICATION);
+    if (nrGood < followers->size()) {
+      // we drop all followers that were not successful:
+      for (size_t i = 0; i < followers->size(); ++i) {
+        bool replicationWorked
+            = requests[i].done &&
+              requests[i].result.status == CL_COMM_RECEIVED &&
+              (requests[i].result.answer_code ==
+                   rest::ResponseCode::ACCEPTED ||
+               requests[i].result.answer_code ==
+                   rest::ResponseCode::OK);
+        if (replicationWorked) {
+          bool found;
+          requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
+          replicationWorked = !found;
+        }
+        if (!replicationWorked) {
+          auto const& followerInfo = collection->followers();
+          followerInfo->remove((*followers)[i]);
+          LOG_TOPIC(ERR, Logger::REPLICATION)
+              << "modifyLocal: dropping follower "
+              << (*followers)[i] << " for shard " << collectionName;
+        }
       }
     }
   }
@@ -2524,6 +2548,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
   };
 
   VPackSlice ourResult = resultBuilder.slice();
+  size_t count = 0;
   if (value.isArray()) {
     VPackArrayBuilder guard(&payload);
     VPackArrayIterator itValue(value);
@@ -2532,6 +2557,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
       TRI_ASSERT((*itResult).isObject());
      if (!(*itResult).hasKey("error")) {
         doOneDoc(itValue.value(), itResult.value());
+        count++;
       }
       itValue.next();
       itResult.next();
@@ -2539,41 +2565,44 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
   } else {
     VPackArrayBuilder guard(&payload);
     doOneDoc(value, ourResult);
+    count++;
   }
 
-  auto body = std::make_shared<std::string>();
-  *body = payload.slice().toJson();
+  if (count > 0) {
+    auto body = std::make_shared<std::string>();
+    *body = payload.slice().toJson();
 
-  // Now prepare the requests:
-  std::vector<ClusterCommRequest> requests;
-  for (auto const& f : *followers) {
-    requests.emplace_back("server:" + f,
-                          arangodb::rest::RequestType::DELETE_REQ,
-                          path, body);
-  }
-  size_t nrDone = 0;
-  size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT, nrDone,
-                                      Logger::REPLICATION);
-  if (nrGood < followers->size()) {
-    // we drop all followers that were not successful:
-    for (size_t i = 0; i < followers->size(); ++i) {
-      bool replicationWorked
-          = requests[i].done &&
-            requests[i].result.status == CL_COMM_RECEIVED &&
-            (requests[i].result.answer_code ==
-                 rest::ResponseCode::ACCEPTED ||
-             requests[i].result.answer_code ==
-                 rest::ResponseCode::OK);
-      if (replicationWorked) {
-        bool found;
-        requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
-        replicationWorked = !found;
-      }
-      if (!replicationWorked) {
-        auto const& followerInfo = collection->followers();
-        followerInfo->remove((*followers)[i]);
-        LOG_TOPIC(ERR, Logger::REPLICATION)
-            << "removeLocal: dropping follower "
-            << (*followers)[i] << " for shard " << collectionName;
+    // Now prepare the requests:
+    std::vector<ClusterCommRequest> requests;
+    for (auto const& f : *followers) {
+      requests.emplace_back("server:" + f,
+                            arangodb::rest::RequestType::DELETE_REQ,
+                            path, body);
+    }
+    size_t nrDone = 0;
+    size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
+                                        nrDone, Logger::REPLICATION);
+    if (nrGood < followers->size()) {
+      // we drop all followers that were not successful:
+      for (size_t i = 0; i < followers->size(); ++i) {
+        bool replicationWorked
+            = requests[i].done &&
+              requests[i].result.status == CL_COMM_RECEIVED &&
+              (requests[i].result.answer_code ==
+                   rest::ResponseCode::ACCEPTED ||
+               requests[i].result.answer_code ==
+                   rest::ResponseCode::OK);
+        if (replicationWorked) {
+          bool found;
+          requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
+          replicationWorked = !found;
+        }
+        if (!replicationWorked) {
+          auto const& followerInfo = collection->followers();
+          followerInfo->remove((*followers)[i]);
+          LOG_TOPIC(ERR, Logger::REPLICATION)
+              << "removeLocal: dropping follower "
+              << (*followers)[i] << " for shard " << collectionName;
+        }
       }
     }
   }
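
(Reviewer note: to make the chooseTimeout heuristic concrete, at the assumed floor of 5000 documents per second a batch of 100 documents computes to 0.02s and is clamped up to 0.5s, 50,000 documents give 10s, and anything beyond 600,000 documents hits the 120s cap. A tiny self-check, restating the patch's function outside the diff and assuming the floating-point division used above:)

```cpp
#include <cassert>
#include <cstddef>

// Copy of the patch's heuristic: at least 5000 docs/s assumed,
// clamped to the interval [0.5s, 120s].
static double chooseTimeout(std::size_t count) {
  double timeout = count / 5000.0;
  if (timeout < 0.5) {
    return 0.5;
  } else if (timeout > 120.0) {
    return 120.0;
  }
  return timeout;
}

int main() {
  assert(chooseTimeout(100) == 0.5);         // tiny batch: floor applies
  assert(chooseTimeout(50000) == 10.0);      // 50k docs / 5k docs/s = 10s
  assert(chooseTimeout(10000000) == 120.0);  // huge batch: cap applies
}
```
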
diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js
index 8f14c2685f..e594b6aaed 100644
--- a/js/server/modules/@arangodb/cluster.js
+++ b/js/server/modules/@arangodb/cluster.js
@@ -65,7 +65,7 @@ var endpointToURL = function (endpoint) {
 function startReadLockOnLeader (endpoint, database, collName, timeout) {
   var url = endpointToURL(endpoint) + '/_db/' + database;
   var r = request({ url: url + '/_api/replication/holdReadLockCollection',
-                    method: 'GET' });
+    method: 'GET' });
   if (r.status !== 200) {
     console.error('startReadLockOnLeader: Could not get ID for shard',
                   collName, r);
@@ -83,30 +83,47 @@ function startReadLockOnLeader (endpoint, database, collName, timeout) {
 
   var body = { 'id': id, 'collection': collName, 'ttl': timeout };
   r = request({ url: url + '/_api/replication/holdReadLockCollection',
-               body: JSON.stringify(body),
-               method: 'POST', headers: {'x-arango-async': 'store'} });
+    body: JSON.stringify(body),
+    method: 'POST', headers: {'x-arango-async': true} });
   if (r.status !== 202) {
     console.error('startReadLockOnLeader: Could not start read lock for shard',
                   collName, r);
     return false;
   }
 
-  var rr = r; // keep a copy
   var count = 0;
   while (++count < 20) { // wait for some time until read lock established:
     // Now check that we hold the read lock:
     r = request({ url: url + '/_api/replication/holdReadLockCollection',
-                 body: JSON.stringify(body),
-                 method: 'PUT' });
+      body: JSON.stringify(body), method: 'PUT' });
     if (r.status === 200) {
-      return id;
+      let ansBody = {};
+      try {
+        ansBody = JSON.parse(r.body);
+      } catch (err) {
+      }
+      if (ansBody.lockHeld) {
+        return id;
+      } else {
+        console.debug('startReadLockOnLeader: Lock not yet acquired...');
+      }
+    } else {
+      console.debug('startReadLockOnLeader: Do not see read lock yet...');
     }
-    console.debug('startReadLockOnLeader: Do not see read lock yet...');
     wait(0.5);
   }
-  var asyncJobId = rr.headers['x-arango-async-id'];
-  r = request({ url: url + '/_api/job/' + asyncJobId, body: '', method: 'PUT'});
-  console.error('startReadLockOnLeader: giving up, async result:', r);
+  console.error('startReadLockOnLeader: giving up');
+  try {
+    r = request({ url: url + '/_api/replication/holdReadLockCollection',
+      body: JSON.stringify({'id': id}), method: 'DELETE' });
+  } catch (err2) {
+    console.error('startReadLockOnLeader: exception in cancel:',
+                  JSON.stringify(err2));
+  }
+  if (r.status !== 200) {
+    console.error('startReadLockOnLeader: cancellation error for shard',
+                  collName, r);
+  }
   return false;
 }
@@ -527,8 +544,7 @@ function synchronizeOneShard (database, shard, planId, leader) {
           'syncCollectionFinalize:', err3);
       } finally {
-        if (!cancelReadLockOnLeader(ep, database,
-                                    lockJobId)) {
+        if (!cancelReadLockOnLeader(ep, database, lockJobId)) {
           console.error('synchronizeOneShard: read lock has timed out',
                         'for shard', shard);
           ok = false;
@@ -539,7 +555,7 @@ function synchronizeOneShard (database, shard, planId, leader) {
       shard);
   }
   if (ok) {
-    console.debug('synchronizeOneShard: synchronization worked for shard',
+    console.info('synchronizeOneShard: synchronization worked for shard',
       shard);
   } else {
     throw 'Did not work for shard ' + shard + '.';
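
(Reviewer note: startReadLockOnLeader now polls the PUT endpoint until the leader reports lockHeld and, when it gives up, issues an explicit DELETE so the job cannot linger on the server. The bounded poll-then-cancel shape looks roughly like this; it is sketched in C++ for consistency with the snippets above, and checkHeld/cancel are hypothetical callbacks standing in for the PUT and DELETE requests.)

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Poll a check callback a bounded number of times; if the resource never
// reports "held", issue an explicit cancel so nothing leaks server-side.
bool acquireWithPollAndCancel(std::function<bool()> const& checkHeld,
                              std::function<void()> const& cancel,
                              int maxAttempts = 20) {
  // Mirrors the JS loop `while (++count < 20)`: at most 19 attempts.
  for (int attempt = 1; attempt < maxAttempts; ++attempt) {
    if (checkHeld()) {
      return true;  // the leader reported lockHeld
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
  }
  cancel();  // give up: tell the leader to drop the job either way
  return false;
}

int main() {
  int polls = 0;
  bool ok = acquireWithPollAndCancel(
      [&] { return ++polls >= 3; },     // pretend the lock lands on poll 3
      [] { /* would send the DELETE request here */ });
  return ok ? 0 : 1;
}
```
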
diff --git a/js/server/modules/@arangodb/replication.js b/js/server/modules/@arangodb/replication.js
index ace0b12a4e..fac2c75e0b 100644
--- a/js/server/modules/@arangodb/replication.js
+++ b/js/server/modules/@arangodb/replication.js
@@ -339,7 +339,7 @@ function syncCollectionFinalize (database, collname, from, config) {
       l = l.map(JSON.parse);
     } catch (err) {
       return {error: true, errorMessage: 'could not parse body',
-              response: chunk, exception: err};
+        response: chunk, exception: err};
     }
 
     console.debug('Applying chunk:', l);
@@ -349,7 +349,7 @@ function syncCollectionFinalize (database, collname, from, config) {
       }
     } catch (err2) {
       return {error: true, errorMessage: 'could not replicate body ops',
-              response: chunk, exception: err2};
+        response: chunk, exception: err2};
     }
     if (chunk.headers['x-arango-replication-checkmore'] !== 'true') {
       break; // all done