diff --git a/LES-TODOS b/LES-TODOS
index 53d01b1d38..c5b88e46a9 100644
--- a/LES-TODOS
+++ b/LES-TODOS
@@ -18,11 +18,11 @@ done
 in progress
 -----------
 - Rename OperationCursor->getMoreMptr => getMoreTokens, "returns" std::vector&
+- move engine-specific parts of transaction.cpp into engine
+- transaction API
 
 to do
 -----
-- move engine-specific parts of transaction.cpp into engine
-- transaction API
 - check for illegal includes
 - fix includes during API conversion
 - DML API
diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp
index 1e1ddf358f..5594fdc557 100644
--- a/arangod/Agency/AgencyComm.cpp
+++ b/arangod/Agency/AgencyComm.cpp
@@ -23,6 +23,8 @@
 ////////////////////////////////////////////////////////////////////////////////
 
 #include "AgencyComm.h"
+#include "ApplicationFeatures/ApplicationServer.h"
+#include "RestServer/ServerFeature.h"
 
 #include <thread>
 
 #ifdef DEBUG_SYNC_REPLICATION
@@ -1322,6 +1324,18 @@ AgencyCommResult AgencyComm::sendWithFailover(
     // timeout exit strategy
     if (std::chrono::steady_clock::now() < timeOut) {
       if (tries > 0) {
+        auto serverFeature =
+            application_features::ApplicationServer::getFeature<ServerFeature>(
+                "Server");
+        if (serverFeature->isStopping()) {
+          LOG_TOPIC(INFO, Logger::AGENCYCOMM)
+              << "Unsuccessful AgencyComm: Timeout because of shutdown "
+              << "errorCode: " << result.errorCode()
+              << " errorMessage: " << result.errorMessage()
+              << " errorDetails: " << result.errorDetails();
+          return result;
+        }
+
         std::this_thread::sleep_for(waitUntil - std::chrono::steady_clock::now());
         if (waitInterval.count() == 0.0) {
           waitInterval = std::chrono::duration<double>(0.25);
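The AgencyComm hunk above adds an early exit to a retry loop: once the server starts shutting down, there is no point in sleeping through the remaining retry budget. A minimal standalone sketch of that pattern, assuming a simplified interface; `shuttingDown`, `doRequest`, and the backoff constants are illustrative, not ArangoDB API:

#include <algorithm>
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

std::atomic<bool> shuttingDown{false};  // set by the shutdown path

bool doRequest() { return false; }      // stand-in for a failing agency call

bool sendWithRetry(std::chrono::seconds budget) {
  auto deadline = std::chrono::steady_clock::now() + budget;
  auto wait = std::chrono::milliseconds(250);
  while (std::chrono::steady_clock::now() < deadline) {
    if (shuttingDown.load()) {
      std::cout << "abandoning retries: shutdown in progress\n";
      return false;  // the effect of the isStopping() check above
    }
    if (doRequest()) {
      return true;
    }
    std::this_thread::sleep_for(wait);
    wait = std::min(wait * 2, std::chrono::milliseconds(2000));  // backoff
  }
  return false;  // retry budget exhausted
}

int main() {
  shuttingDown = true;  // simulate beginShutdown()
  return sendWithRetry(std::chrono::seconds(10)) ? 0 : 1;
}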
diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp
index 40a95fd458..5666ce9b7b 100644
--- a/arangod/RestHandler/RestReplicationHandler.cpp
+++ b/arangod/RestHandler/RestReplicationHandler.cpp
@@ -3517,6 +3517,11 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
     return;
   }
 
+  {
+    CONDITION_LOCKER(locker, _condVar);
+    _holdReadLockJobs.emplace(id, false);
+  }
+
   auto trxContext = StandaloneTransactionContext::Create(_vocbase);
   SingleCollectionTransaction trx(trxContext, col->cid(), TRI_TRANSACTION_READ);
   trx.addHint(TRI_TRANSACTION_HINT_LOCK_ENTIRELY, false);
@@ -3530,7 +3535,16 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
 
   {
     CONDITION_LOCKER(locker, _condVar);
-    _holdReadLockJobs.insert(id);
+    auto it = _holdReadLockJobs.find(id);
+    if (it == _holdReadLockJobs.end()) {
+      // Entry has been removed since, so we cancel the whole thing
+      // right away and generate an error:
+      generateError(rest::ResponseCode::SERVER_ERROR,
+                    TRI_ERROR_TRANSACTION_INTERNAL,
+                    "read transaction was cancelled");
+      return;
+    }
+    it->second = true;  // mark the read lock as acquired
   }
 
   double now = TRI_microtime();
@@ -3588,6 +3602,8 @@ void RestReplicationHandler::handleCommandCheckHoldReadLockCollection() {
   }
   std::string id = idSlice.copyString();
 
+  bool lockHeld = false;
+
   {
     CONDITION_LOCKER(locker, _condVar);
     auto it = _holdReadLockJobs.find(id);
@@ -3596,12 +3612,17 @@ void RestReplicationHandler::handleCommandCheckHoldReadLockCollection() {
                     "no hold read lock job found for 'id'");
       return;
     }
+    if (it->second) {
+      lockHeld = true;
+    }
   }
 
   VPackBuilder b;
   {
     VPackObjectBuilder bb(&b);
     b.add("error", VPackValue(false));
+    b.add("lockHeld", VPackValue(lockHeld));
   }
 
   generateResult(rest::ResponseCode::OK, b.slice());
@@ -3633,11 +3654,19 @@ void RestReplicationHandler::handleCommandCancelHoldReadLockCollection() {
   }
   std::string id = idSlice.copyString();
 
+  bool lockHeld = false;
   {
     CONDITION_LOCKER(locker, _condVar);
     auto it = _holdReadLockJobs.find(id);
     if (it != _holdReadLockJobs.end()) {
+      // Note that this approach works if the lock has been acquired
+      // as well as if we still wait for the read lock, in which case
+      // it will eventually be acquired but immediately released:
+      if (it->second) {
+        lockHeld = true;
+      }
       _holdReadLockJobs.erase(it);
+      _condVar.broadcast();
     }
   }
 
@@ -3645,6 +3674,7 @@ void RestReplicationHandler::handleCommandCancelHoldReadLockCollection() {
   {
     VPackObjectBuilder bb(&b);
     b.add("error", VPackValue(false));
+    b.add("lockHeld", VPackValue(lockHeld));
   }
 
   generateResult(rest::ResponseCode::OK, b.slice());
@@ -3677,4 +3707,4 @@ arangodb::basics::ConditionVariable RestReplicationHandler::_condVar;
 /// the flag is set of the ID of a job, the job is cancelled
 //////////////////////////////////////////////////////////////////////////////
 
-std::unordered_set<std::string> RestReplicationHandler::_holdReadLockJobs;
+std::unordered_map<std::string, bool> RestReplicationHandler::_holdReadLockJobs;
diff --git a/arangod/RestHandler/RestReplicationHandler.h b/arangod/RestHandler/RestReplicationHandler.h
index 69cfd6cde0..8795a96a98 100644
--- a/arangod/RestHandler/RestReplicationHandler.h
+++ b/arangod/RestHandler/RestReplicationHandler.h
@@ -366,13 +366,18 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
   static arangodb::basics::ConditionVariable _condVar;
 
   //////////////////////////////////////////////////////////////////////////////
-  /// @brief global set of ids of holdReadLockCollection jobs, if
-  /// an id is removed here (under the protection of the mutex of
-  /// condVar) and a broadcast is sent, the job with that id is
-  /// terminated.
+  /// @brief global map of ids of holdReadLockCollection jobs; an
+  /// id mapping to false indicates that a request to get the
+  /// read lock has been started, and the bool is changed to true
+  /// once this read lock is acquired. To cancel the read lock,
+  /// remove the entry here (under the protection of the mutex of
+  /// condVar) and send a broadcast to the condition variable;
+  /// the job with that id is then terminated. If it times out,
+  /// the read lock is released automatically and the entry here
+  /// is deleted.
   //////////////////////////////////////////////////////////////////////////////
 
-  static std::unordered_set<std::string> _holdReadLockJobs;
+  static std::unordered_map<std::string, bool> _holdReadLockJobs;
 };
 }
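The map-plus-condition-variable protocol documented in the header comment above is easier to see in isolation. A minimal sketch of the handshake, with std::mutex and std::condition_variable standing in for CONDITION_LOCKER and arangodb::basics::ConditionVariable; all names are illustrative, not the handler's actual API:

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <unordered_map>

std::mutex mtx;
std::condition_variable cv;
std::unordered_map<std::string, bool> jobs;  // id -> "read lock acquired"

// Holder side: register intent, acquire the collection read lock, mark it
// acquired, then hold it until cancelled or until the ttl expires.
void holdReadLock(std::string const& id, std::chrono::seconds ttl) {
  {
    std::lock_guard<std::mutex> g(mtx);
    jobs.emplace(id, false);
  }
  // ... acquire the collection read lock here (may take a while) ...
  std::unique_lock<std::mutex> g(mtx);
  auto it = jobs.find(id);
  if (it == jobs.end()) {
    return;  // cancelled while we were acquiring: release immediately
  }
  it->second = true;  // report the lock as held
  cv.wait_until(g, std::chrono::steady_clock::now() + ttl,
                [&] { return jobs.count(id) == 0; });  // woken by cancel
  jobs.erase(id);  // ttl expired: release the lock, drop the entry
}

// Canceller side: erasing the entry and broadcasting terminates the job,
// whether it is still waiting for the lock or already holds it.
bool cancelReadLock(std::string const& id) {
  std::lock_guard<std::mutex> g(mtx);
  auto it = jobs.find(id);
  if (it == jobs.end()) {
    return false;  // no such job (already finished or never started)
  }
  bool lockHeld = it->second;
  jobs.erase(it);
  cv.notify_all();  // corresponds to _condVar.broadcast()
  return lockHeld;
}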
diff --git a/arangod/RestServer/ServerFeature.cpp b/arangod/RestServer/ServerFeature.cpp
index d601dba8d7..9f932fa412 100644
--- a/arangod/RestServer/ServerFeature.cpp
+++ b/arangod/RestServer/ServerFeature.cpp
@@ -190,6 +190,7 @@ void ServerFeature::beginShutdown() {
   std::string msg =
       ArangoGlobalContext::CONTEXT->binaryName() + " [shutting down]";
   TRI_SetProcessTitle(msg.c_str());
+  _isStopping = true;
 }
 
 void ServerFeature::waitForHeartbeat() {
diff --git a/arangod/RestServer/ServerFeature.h b/arangod/RestServer/ServerFeature.h
index 60a121a197..5e4ccf1c6a 100644
--- a/arangod/RestServer/ServerFeature.h
+++ b/arangod/RestServer/ServerFeature.h
@@ -47,6 +47,7 @@ class ServerFeature final : public application_features::ApplicationFeature {
   void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
   void start() override final;
   void beginShutdown() override final;
+  bool isStopping() const { return _isStopping; }
 
  public:
   OperationMode operationMode() const { return _operationMode; }
@@ -70,6 +71,7 @@ class ServerFeature final : public application_features::ApplicationFeature {
   uint32_t _vppMaxSize;
   int* _result;
   OperationMode _operationMode;
+  bool _isStopping = false;
 };
 }
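One caveat with the new flag: _isStopping is written by beginShutdown() and read from other threads via isStopping() (for example from the AgencyComm retry loop above), so a plain bool is formally a data race. If that matters here, an atomic flag is a drop-in alternative; this is a suggestion, not part of the patch:

  std::atomic<bool> _isStopping{false};  // would also need #include <atomic>

Both isStopping() and the assignment in beginShutdown() would compile unchanged.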
diff --git a/arangod/Scheduler/Scheduler.cpp b/arangod/Scheduler/Scheduler.cpp
index e9706bac46..7f4c8754aa 100644
--- a/arangod/Scheduler/Scheduler.cpp
+++ b/arangod/Scheduler/Scheduler.cpp
@@ -166,7 +166,12 @@ Scheduler::Scheduler(size_t nrThreads, size_t maxQueueSize)
   initializeSignalHandlers();
 }
 
-Scheduler::~Scheduler() { deleteOldThreads(); }
+Scheduler::~Scheduler() {
+  if (_threadManager != nullptr) {
+    _threadManager->cancel();
+  }
+  deleteOldThreads();
+}
 
 // -----------------------------------------------------------------------------
 // --SECTION--                                      constructors and destructors
diff --git a/arangod/Utils/Transaction.cpp b/arangod/Utils/Transaction.cpp
index 0a2d9db663..3cbdff3b60 100644
--- a/arangod/Utils/Transaction.cpp
+++ b/arangod/Utils/Transaction.cpp
@@ -1823,6 +1823,25 @@ OperationResult Transaction::insertCoordinator(std::string const& collectionName
 }
 #endif
 
+//////////////////////////////////////////////////////////////////////////////
+/// @brief choose a timeout for synchronous replication, based on the
+/// number of documents we ship over
+//////////////////////////////////////////////////////////////////////////////
+
+static double chooseTimeout(size_t count) {
+  // We usually assume that a server can process at least 5000 documents
+  // per second (this is a low estimate), and use a low limit of 0.5s
+  // and a high timeout of 120s.
+  double timeout = count / 5000.0;
+  if (timeout < 0.5) {
+    return 0.5;
+  } else if (timeout > 120) {
+    return 120.0;
+  } else {
+    return timeout;
+  }
+}
+
 //////////////////////////////////////////////////////////////////////////////
 /// @brief create one or multiple documents in a collection, local
 /// the single-document variant of this operation will either succeed or,
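The clamping in chooseTimeout is easy to sanity-check with concrete counts. A small hypothetical test, using a standalone copy of the function (it assumes the floating-point division count / 5000.0 as above; the integer division count / 5000 would truncate to whole seconds before clamping):

#include <cassert>
#include <cstddef>

static double chooseTimeout(size_t count) {  // standalone copy for the test
  double timeout = count / 5000.0;
  if (timeout < 0.5) {
    return 0.5;    // floor for tiny batches
  } else if (timeout > 120) {
    return 120.0;  // ceiling for huge batches
  }
  return timeout;
}

int main() {
  assert(chooseTimeout(1) == 0.5);          // 0.0002s, clamped up
  assert(chooseTimeout(10000) == 2.0);      // 10000 docs / 5000 docs per s
  assert(chooseTimeout(1000000) == 120.0);  // 200s, clamped down
  return 0;
}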
@@ -1980,7 +1999,7 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
       }
       auto cc = arangodb::ClusterComm::instance();
       size_t nrDone = 0;
-      size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT,
+      size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
                                           nrDone, Logger::REPLICATION);
       if (nrGood < followers->size()) {
         // we drop all followers that were not successful:
@@ -2274,6 +2293,7 @@ OperationResult Transaction::modifyLocal(
     };
 
     VPackSlice ourResult = resultBuilder.slice();
+    size_t count = 0;
     if (multiCase) {
       VPackArrayBuilder guard(&payload);
       VPackArrayIterator itValue(newValue);
@@ -2282,6 +2302,7 @@ OperationResult Transaction::modifyLocal(
         TRI_ASSERT((*itResult).isObject());
         if (!(*itResult).hasKey("error")) {
           doOneDoc(itValue.value(), itResult.value());
+          count++;
         }
         itValue.next();
         itResult.next();
@@ -2289,43 +2310,46 @@ OperationResult Transaction::modifyLocal(
     } else {
       VPackArrayBuilder guard(&payload);
       doOneDoc(newValue, ourResult);
+      count++;
     }
 
-    auto body = std::make_shared<std::string>();
-    *body = payload.slice().toJson();
+    if (count > 0) {
+      auto body = std::make_shared<std::string>();
+      *body = payload.slice().toJson();
 
-    // Now prepare the requests:
-    std::vector<ClusterCommRequest> requests;
-    for (auto const& f : *followers) {
-      requests.emplace_back("server:" + f,
-                            operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ?
-                            arangodb::rest::RequestType::PUT :
-                            arangodb::rest::RequestType::PATCH,
-                            path, body);
-    }
-    size_t nrDone = 0;
-    size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT, nrDone,
-                                        Logger::REPLICATION);
-    if (nrGood < followers->size()) {
-      // we drop all followers that were not successful:
-      for (size_t i = 0; i < followers->size(); ++i) {
-        bool replicationWorked
-            = requests[i].done &&
-              requests[i].result.status == CL_COMM_RECEIVED &&
-              (requests[i].result.answer_code ==
-                   rest::ResponseCode::ACCEPTED ||
-               requests[i].result.answer_code ==
-                   rest::ResponseCode::OK);
-        if (replicationWorked) {
-          bool found;
-          requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
-          replicationWorked = !found;
-        }
-        if (!replicationWorked) {
-          auto const& followerInfo = collection->followers();
-          followerInfo->remove((*followers)[i]);
-          LOG_TOPIC(ERR, Logger::REPLICATION)
-              << "modifyLocal: dropping follower "
-              << (*followers)[i] << " for shard " << collectionName;
+      // Now prepare the requests:
+      std::vector<ClusterCommRequest> requests;
+      for (auto const& f : *followers) {
+        requests.emplace_back("server:" + f,
+                              operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ?
+                              arangodb::rest::RequestType::PUT :
+                              arangodb::rest::RequestType::PATCH,
+                              path, body);
+      }
+      size_t nrDone = 0;
+      size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
+                                          nrDone, Logger::REPLICATION);
+      if (nrGood < followers->size()) {
+        // we drop all followers that were not successful:
+        for (size_t i = 0; i < followers->size(); ++i) {
+          bool replicationWorked
+              = requests[i].done &&
+                requests[i].result.status == CL_COMM_RECEIVED &&
+                (requests[i].result.answer_code ==
+                     rest::ResponseCode::ACCEPTED ||
+                 requests[i].result.answer_code ==
+                     rest::ResponseCode::OK);
+          if (replicationWorked) {
+            bool found;
+            requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
+            replicationWorked = !found;
+          }
+          if (!replicationWorked) {
+            auto const& followerInfo = collection->followers();
+            followerInfo->remove((*followers)[i]);
+            LOG_TOPIC(ERR, Logger::REPLICATION)
+                << "modifyLocal: dropping follower "
+                << (*followers)[i] << " for shard " << collectionName;
+          }
         }
       }
     }
@@ -2523,6 +2547,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
     };
 
     VPackSlice ourResult = resultBuilder.slice();
+    size_t count = 0;
    if (value.isArray()) {
      VPackArrayBuilder guard(&payload);
      VPackArrayIterator itValue(value);
@@ -2531,6 +2556,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
        TRI_ASSERT((*itResult).isObject());
        if (!(*itResult).hasKey("error")) {
          doOneDoc(itValue.value(), itResult.value());
+          count++;
        }
        itValue.next();
        itResult.next();
@@ -2538,41 +2564,44 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
    } else {
      VPackArrayBuilder guard(&payload);
      doOneDoc(value, ourResult);
+      count++;
    }
 
-    auto body = std::make_shared<std::string>();
-    *body = payload.slice().toJson();
+    if (count > 0) {
+      auto body = std::make_shared<std::string>();
+      *body = payload.slice().toJson();
 
-    // Now prepare the requests:
-    std::vector<ClusterCommRequest> requests;
-    for (auto const& f : *followers) {
-      requests.emplace_back("server:" + f,
-                            arangodb::rest::RequestType::DELETE_REQ,
-                            path, body);
-    }
-    size_t nrDone = 0;
-    size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT, nrDone,
-                                        Logger::REPLICATION);
-    if (nrGood < followers->size()) {
-      // we drop all followers that were not successful:
-      for (size_t i = 0; i < followers->size(); ++i) {
-        bool replicationWorked
-            = requests[i].done &&
-              requests[i].result.status == CL_COMM_RECEIVED &&
-              (requests[i].result.answer_code ==
-                   rest::ResponseCode::ACCEPTED ||
-               requests[i].result.answer_code ==
-                   rest::ResponseCode::OK);
-        if (replicationWorked) {
-          bool found;
-          requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
-          replicationWorked = !found;
-        }
-        if (!replicationWorked) {
-          auto const& followerInfo = collection->followers();
-          followerInfo->remove((*followers)[i]);
-          LOG_TOPIC(ERR, Logger::REPLICATION)
-              << "removeLocal: dropping follower "
-              << (*followers)[i] << " for shard " << collectionName;
+      // Now prepare the requests:
+      std::vector<ClusterCommRequest> requests;
+      for (auto const& f : *followers) {
+        requests.emplace_back("server:" + f,
+                              arangodb::rest::RequestType::DELETE_REQ,
+                              path, body);
+      }
+      size_t nrDone = 0;
+      size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
+                                          nrDone, Logger::REPLICATION);
+      if (nrGood < followers->size()) {
+        // we drop all followers that were not successful:
+        for (size_t i = 0; i < followers->size(); ++i) {
+          bool replicationWorked
+              = requests[i].done &&
+                requests[i].result.status == CL_COMM_RECEIVED &&
+                (requests[i].result.answer_code ==
+                     rest::ResponseCode::ACCEPTED ||
+                 requests[i].result.answer_code ==
+                     rest::ResponseCode::OK);
+          if (replicationWorked) {
+            bool found;
+            requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
+            replicationWorked = !found;
+          }
+          if (!replicationWorked) {
+            auto const& followerInfo = collection->followers();
+            followerInfo->remove((*followers)[i]);
+            LOG_TOPIC(ERR, Logger::REPLICATION)
+                << "removeLocal: dropping follower "
+                << (*followers)[i] << " for shard " << collectionName;
+          }
        }
      }
    }
diff --git a/js/client/modules/@arangodb/testing.js b/js/client/modules/@arangodb/testing.js
index e7c2da8188..aef6350d2d 100644
--- a/js/client/modules/@arangodb/testing.js
+++ b/js/client/modules/@arangodb/testing.js
@@ -192,6 +192,7 @@ const optionsDefaults = {
   'valgrindArgs': {},
   'valgrindHosts': false,
   'verbose': false,
+  'walFlushTimeout': 30000,
   'writeXmlReport': true
 };
 
@@ -248,7 +249,7 @@ let LOGS_DIR;
 let UNITTESTS_DIR;
 let GDB_OUTPUT="";
 
-function makeResults (testname) {
+function makeResults (testname, instanceInfo) {
   const startTime = time();
 
   return function (status, message) {
@@ -259,7 +260,7 @@ function makeResults (testname) {
     let result;
 
     try {
-      result = JSON.parse(fs.read('testresult.json'));
+      result = JSON.parse(fs.read(instanceInfo.rootDir + '/testresult.json'));
 
       if ((typeof result[0] === 'object') &&
           result[0].hasOwnProperty('status')) {
@@ -312,6 +313,7 @@ function makeArgsArangod (options, appDir, role) {
   return {
     'configuration': 'etc/testing/' + config,
     'define': 'TOP_DIR=' + TOP_DIR,
+    'wal.flush-timeout': options.walFlushTimeout,
     'javascript.app-path': appDir,
     'http.trusted-origin': options.httpTrustedOrigin || 'all'
   };
@@ -691,9 +693,7 @@ function runThere (options, instanceInfo, file) {
                  'return runTest(' + JSON.stringify(file) + ', true' + mochaGrep + ');';
     }
 
-    if (options.propagateInstanceInfo) {
-      testCode = 'global.instanceInfo = ' + JSON.stringify(instanceInfo) + ';\n' + testCode;
-    }
+    testCode = 'global.instanceInfo = ' + JSON.stringify(instanceInfo) + ';\n' + testCode;
 
     let httpOptions = makeAuthorizationHeaders(options);
     httpOptions.method = 'POST';
@@ -1078,12 +1078,12 @@ function runInArangosh (options, instanceInfo, file, addArgs) {
   if (addArgs !== undefined) {
     args = Object.assign(args, addArgs);
   }
-  fs.write('instanceinfo.json', JSON.stringify(instanceInfo));
+  require('internal').env.INSTANCEINFO = JSON.stringify(instanceInfo);
   let rc = executeAndWait(ARANGOSH_BIN, toArgv(args), options);
   let result;
 
   try {
-    result = JSON.parse(fs.read('testresult.json'));
+    result = JSON.parse(fs.read(instanceInfo.rootDir + '/testresult.json'));
   } catch (x) {
     return rc;
   }
@@ -1116,6 +1116,7 @@ function runArangoshCmd (options, instanceInfo, addArgs, cmds) {
     args = Object.assign(args, addArgs);
   }
 
+  require('internal').env.INSTANCEINFO = JSON.stringify(instanceInfo);
   const argv = toArgv(args).concat(cmds);
   return executeAndWait(ARANGOSH_BIN, argv, options);
 }
@@ -3357,10 +3358,10 @@ testFuncs.recovery = function (options) {
 // //////////////////////////////////////////////////////////////////////////////
 
 testFuncs.replication_ongoing = function (options) {
-  const mr = makeResults('replication');
-
   let master = startInstance('tcp', options, {}, 'master_ongoing');
 
+  const mr = makeResults('replication', master);
+
   if (master === false) {
     return mr(false, 'failed to start master!');
   }
@@ -3399,11 +3400,11 @@ testFuncs.replication_ongoing = function (options) {
 // //////////////////////////////////////////////////////////////////////////////
 
 testFuncs.replication_static = function (options) {
-  const mr = makeResults('replication');
-
   let master = startInstance('tcp', options, {
     'server.authentication': 'true'
   }, 'master_static');
+
+  const mr = makeResults('replication', master);
 
   if (master === false) {
     return mr(false, 'failed to start master!');
@@ -3455,9 +3456,9 @@ testFuncs.replication_static = function (options) {
 // //////////////////////////////////////////////////////////////////////////////
 
 testFuncs.replication_sync = function (options) {
-  const mr = makeResults('replication');
   let master = startInstance('tcp', options, {}, 'master_sync');
 
+  const mr = makeResults('replication', master);
   if (master === false) {
     return mr(false, 'failed to start master!');
   }
diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js
index cac13a2687..47f092f1f0 100644
--- a/js/client/tests/agency/agency-test.js
+++ b/js/client/tests/agency/agency-test.js
@@ -56,25 +56,7 @@ function agencyTestSuite () {
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief the agency servers
 ////////////////////////////////////////////////////////////////////////////////
 
-  var count = 20;
-  while (true) {
-    if (require('fs').exists('instanceinfo.json')) {
-      var instanceInfoData = require('fs').read('instanceinfo.json');
-      var instanceInfo;
-      try {
-        instanceInfo = JSON.parse(instanceInfoData);
-        break;
-      } catch (err) {
-        console.error('Failed to parse JSON: instanceinfo.json');
-        console.error(instanceInfoData);
-      }
-    }
-    wait(1.0);
-    if (--count <= 0) {
-      throw 'peng';
-    }
-  }
-
+  var instanceInfo = JSON.parse(require('internal').env.INSTANCEINFO);
   var agencyServers = instanceInfo.arangods.map(arangod => {
     return arangod.url;
   });
diff --git a/js/common/modules/@arangodb/testrunner.js b/js/common/modules/@arangodb/testrunner.js
index 9b605cc55d..9b2935cdbd 100644
--- a/js/common/modules/@arangodb/testrunner.js
+++ b/js/common/modules/@arangodb/testrunner.js
@@ -10,6 +10,10 @@ var runTest = require('jsunity').runTest,
 // //////////////////////////////////////////////////////////////////////////////
 
 function runJSUnityTests (tests) {
+  if (!require('internal').env.INSTANCEINFO) {
+    throw new Error('env.INSTANCEINFO was not set by caller!');
+  }
+  let instanceinfo = JSON.parse(require('internal').env.INSTANCEINFO);
   var result = true;
   var allResults = [];
   var failed = [];
@@ -45,7 +49,7 @@ function runJSUnityTests (tests) {
     internal.wait(0); // force GC
   });
 
-  require('fs').write('testresult.json', JSON.stringify(allResults));
+  require('fs').write(instanceinfo.rootDir + '/testresult.json', JSON.stringify(allResults));
 
   if (failed.length > 1) {
     print('The following ' + failed.length + ' test files produced errors: ', failed.join(', '));
diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js
index 8f14c2685f..2f7cc1238c 100644
--- a/js/server/modules/@arangodb/cluster.js
+++ b/js/server/modules/@arangodb/cluster.js
@@ -65,7 +65,7 @@ var endpointToURL = function (endpoint) {
 function startReadLockOnLeader (endpoint, database, collName, timeout) {
   var url = endpointToURL(endpoint) + '/_db/' + database;
   var r = request({ url: url + '/_api/replication/holdReadLockCollection',
-    method: 'GET' });
+                    method: 'GET' });
   if (r.status !== 200) {
     console.error('startReadLockOnLeader: Could not get ID for shard',
                   collName, r);
@@ -83,30 +83,47 @@ function startReadLockOnLeader (endpoint, database, collName, timeout) {
 
   var body = { 'id': id, 'collection': collName, 'ttl': timeout };
   r = request({ url: url + '/_api/replication/holdReadLockCollection',
-    body: JSON.stringify(body),
-    method: 'POST', headers: {'x-arango-async': 'store'} });
+                body: JSON.stringify(body),
+                method: 'POST', headers: {'x-arango-async': true} });
   if (r.status !== 202) {
     console.error('startReadLockOnLeader: Could not start read lock for shard',
                   collName, r);
     return false;
   }
 
-  var rr = r; // keep a copy
   var count = 0;
   while (++count < 20) { // wait for some time until read lock established:
     // Now check that we hold the read lock:
     r = request({ url: url + '/_api/replication/holdReadLockCollection',
-                  body: JSON.stringify(body),
-                  method: 'PUT' });
+                  body: JSON.stringify(body), method: 'PUT' });
     if (r.status === 200) {
-      return id;
+      let ansBody = {};
+      try {
+        ansBody = JSON.parse(r.body);
+      } catch (err) {
+      }
+      if (ansBody.lockHeld) {
+        return id;
+      } else {
+        console.debug('startReadLockOnLeader: Lock not yet acquired...');
+      }
+    } else {
+      console.debug('startReadLockOnLeader: Do not see read lock yet...');
     }
-    console.debug('startReadLockOnLeader: Do not see read lock yet...');
     wait(0.5);
   }
-  var asyncJobId = rr.headers['x-arango-async-id'];
-  r = request({ url: url + '/_api/job/' + asyncJobId, body: '', method: 'PUT'});
-  console.error('startReadLockOnLeader: giving up, async result:', r);
+  console.error('startReadLockOnLeader: giving up');
+  try {
+    r = request({ url: url + '/_api/replication/holdReadLockCollection',
+                  body: JSON.stringify({'id': id}), method: 'DELETE' });
+  } catch (err2) {
+    console.error('startReadLockOnLeader: exception in cancel:',
+                  JSON.stringify(err2));
+  }
+  if (r.status !== 200) {
+    console.error('startReadLockOnLeader: cancellation error for shard',
+                  collName, r);
+  }
   return false;
 }
@@ -527,8 +544,7 @@ function synchronizeOneShard (database, shard, planId, leader) {
                       'syncCollectionFinalize:', err3);
       }
       finally {
-        if (!cancelReadLockOnLeader(ep, database,
-                                    lockJobId)) {
+        if (!cancelReadLockOnLeader(ep, database, lockJobId)) {
           console.error('synchronizeOneShard: read lock has timed out',
                         'for shard', shard);
           ok = false;
@@ -539,7 +555,7 @@ function synchronizeOneShard (database, shard, planId, leader) {
                   shard);
   }
   if (ok) {
-    console.debug('synchronizeOneShard: synchronization worked for shard',
+    console.info('synchronizeOneShard: synchronization worked for shard',
                   shard);
   } else {
     throw 'Did not work for shard ' + shard + '.';
@@ -920,12 +936,14 @@ function updateCurrentForCollections(localErrors, current) {
     return payload;
   }
 
-  function makeDropCurrentEntryCollection(dbname, col, shard, trx) {
-    trx[0][curCollections + dbname + '/' + col + '/' + shard] =
+  function makeDropCurrentEntryCollection(dbname, col, shard) {
+    let trx = {};
+    trx[curCollections + dbname + '/' + col + '/' + shard] =
       {op: 'delete'};
+    return trx;
   }
 
-  let trx = [{}];
+  let trx = {};
 
   // Go through local databases and collections and add stuff to Current
   // as needed:
@@ -946,7 +964,7 @@ function updateCurrentForCollections(localErrors, current) {
         let currentCollectionInfo = fetchKey(current, 'Collections', database,
                                              shardInfo.planId, shard);
         if (!_.isEqual(localCollectionInfo, currentCollectionInfo)) {
-          trx[0][curCollections + database + '/' + shardInfo.planId + '/' + shardInfo.name] = {
+          trx[curCollections + database + '/' + shardInfo.planId + '/' + shardInfo.name] = {
             op: 'set',
             new: localCollectionInfo,
           };
@@ -955,7 +973,7 @@ function updateCurrentForCollections(localErrors, current) {
           let currentServers = fetchKey(current, 'Collections', database, shardInfo.planId, shard, 'servers');
           // we were previously leader and we are done resigning. update current and let supervision handle the rest
           if (Array.isArray(currentServers) && currentServers[0] === ourselves) {
-            trx[0][curCollections + database + '/' + shardInfo.planId + '/' + shardInfo.name + '/servers'] = {
+            trx[curCollections + database + '/' + shardInfo.planId + '/' + shardInfo.name + '/servers'] = {
               op: 'set',
               new: ['_' + ourselves].concat(db._collection(shardInfo.name).getFollowers()),
             };
@@ -993,8 +1011,7 @@ function updateCurrentForCollections(localErrors, current) {
         let cur = currentCollections[database][collection][shard];
         if (!localCollections.hasOwnProperty(shard) &&
             cur.servers[0] === ourselves) {
-          makeDropCurrentEntryCollection(database, collection, shard,
-                                         trx);
+          Object.assign(trx, makeDropCurrentEntryCollection(database, collection, shard));
         }
       }
     }
@@ -1104,8 +1121,9 @@ function migratePrimary(plan, current) {
   // diff current and local and prepare agency transactions or whatever
   // to update current. Will report the errors created locally to the agency
   let trx = updateCurrentForCollections(localErrors, current);
-  if (trx.length > 0 && Object.keys(trx[0]).length !== 0) {
-    trx[0][curVersion] = {op: 'increment'};
+  if (Object.keys(trx).length > 0) {
+    trx[curVersion] = {op: 'increment'};
+    trx = [trx];
     // TODO: reduce timeout when we can:
     try {
       let res = global.ArangoAgency.write([trx]);
@@ -1272,9 +1290,9 @@ function migrateAnyServer(plan, current) {
   // diff current and local and prepare agency transactions or whatever
   // to update current. will report the errors created locally to the agency
   let trx = updateCurrentForDatabases(localErrors, current.Databases);
-  if (Object.keys(trx).length !== 0) {
+  if (Object.keys(trx).length > 0) {
+    trx[curVersion] = {op: 'increment'};
     trx = [trx];
-    trx[0][curVersion] = {op: 'increment'};
     // TODO: reduce timeout when we can:
     try {
       let res = global.ArangoAgency.write([trx]);
diff --git a/js/server/modules/@arangodb/replication.js b/js/server/modules/@arangodb/replication.js
index ace0b12a4e..fac2c75e0b 100644
--- a/js/server/modules/@arangodb/replication.js
+++ b/js/server/modules/@arangodb/replication.js
@@ -339,7 +339,7 @@ function syncCollectionFinalize (database, collname, from, config) {
       l = l.map(JSON.parse);
     } catch (err) {
       return {error: true, errorMessage: 'could not parse body',
-        response: chunk, exception: err};
+              response: chunk, exception: err};
     }
 
     console.debug('Applying chunk:', l);
@@ -349,7 +349,7 @@ function syncCollectionFinalize (database, collname, from, config) {
       }
     } catch (err2) {
       return {error: true, errorMessage: 'could not replicate body ops',
-        response: chunk, exception: err2};
+              response: chunk, exception: err2};
     }
     if (chunk.headers['x-arango-replication-checkmore'] !== 'true') {
       break; // all done
diff --git a/js/server/tests/cluster-sync/cluster-sync-test-noncluster-spec.js b/js/server/tests/cluster-sync/cluster-sync-test-noncluster-spec.js
index 79ac4cce66..353403e8c4 100644
--- a/js/server/tests/cluster-sync/cluster-sync-test-noncluster-spec.js
+++ b/js/server/tests/cluster-sync/cluster-sync-test-noncluster-spec.js
@@ -740,7 +740,7 @@ describe('Cluster sync', function() {
       expect(db._collection('s100001').isLeader()).to.equal(true);
     });
   });
-  describe('Update current', function() {
+  describe('Update current database', function() {
    beforeEach(function() {
      db._databases().forEach(database => {
        if (database !== '_system') {
@@ -854,4 +854,7 @@ describe('Cluster sync', function() {
      expect(result['/arango/Current/Databases/testi/repltest']).to.have.deep.property('new.error', false);
    });
  });
+  describe('Update current collection', function() {
+
+  });
 });