diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp
index 502d62939a..6a3bca70ff 100644
--- a/arangod/Agency/Agent.cpp
+++ b/arangod/Agency/Agent.cpp
@@ -926,14 +926,14 @@ void Agent::detectActiveAgentFailures() {
 void Agent::beginShutdown() {
   Thread::beginShutdown();
 
+  // Stop constituent and key value stores
+  _constituent.beginShutdown();
+
   // Stop supervision
   if (_config.supervision()) {
     _supervision.beginShutdown();
   }
 
-  // Stop constituent and key value stores
-  _constituent.beginShutdown();
-
   // Stop inception process
   if (_inception != nullptr) {
     _inception->beginShutdown();
@@ -954,31 +954,6 @@ void Agent::beginShutdown() {
     }
   }
 
-  int counter = 0;
-  while (_spearhead.isRunning() || _readDB.isRunning()) {
-    usleep(100000);
-    // emit warning after 5 seconds
-    if (++counter == 10 * 5) {
-      LOG_TOPIC(WARN, Logger::AGENCY) << "waiting for key-value threads to finish";
-    }
-  }
-
-  while (_constituent.isRunning()) {
-    usleep(100000);
-    // emit warning after 5 seconds
-    if (++counter == 10 * 5) {
-      LOG_TOPIC(WARN, Logger::AGENCY) << "waiting for constituent thread to finish";
-    }
-  }
-
-  while (_supervision.isRunning()) {
-    usleep(100000);
-    // emit warning after 5 seconds
-    if (++counter == 10 * 5) {
-      LOG_TOPIC(WARN, Logger::AGENCY) << "waiting for supervision thread to finish";
-    }
-  }
-
   // Wake up all waiting rest handlers
   {
     CONDITION_LOCKER(guardW, _waitForCV);
diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js
index 38a690d104..d3c3f19679 100644
--- a/js/server/modules/@arangodb/cluster.js
+++ b/js/server/modules/@arangodb/cluster.js
@@ -46,6 +46,41 @@ const agencyOperations = {
   'increment' : {'op' : 'increment'}
 };
 
+// good enough isEqual which, unlike _.isEqual, does not depend on
+// equally sorted object keys
+var isEqual = function(object, other) {
+  if (typeof object !== typeof other) {
+    return false;
+  }
+
+  if (typeof object !== 'object') {
+    // explicitly use non strict equal
+    // eslint-disable-next-line eqeqeq
+    return object == other;
+  }
+
+  // typeof null is 'object', so handle null before recursing
+  if (object === null || other === null) {
+    return object === other;
+  }
+
+  if (Array.isArray(object)) {
+    if (!Array.isArray(other) || object.length !== other.length) {
+      return false;
+    }
+    return object.every((value, index) => {
+      return isEqual(value, other[index]);
+    });
+  }
+
+  let myKeys = Object.keys(object);
+  let otherKeys = Object.keys(other);
+
+  if (myKeys.length !== otherKeys.length) {
+    return false;
+  }
+
+  return myKeys.every(key => {
+    return isEqual(object[key], other[key]);
+  });
+};
+
 var endpointToURL = function (endpoint) {
   if (endpoint.substr(0, 6) === 'ssl://') {
     return 'https://' + endpoint.substr(6);
@@ -251,25 +286,6 @@ function getShardMap (plannedCollections) {
   return shardMap;
 }
 
-// /////////////////////////////////////////////////////////////////////////////
-// / @brief return the indexes of a collection as a map
-// /////////////////////////////////////////////////////////////////////////////
-
-function getIndexMap (shard) {
-  var indexes = { }, i;
-  var idx = arangodb.db._collection(shard).getIndexes();
-
-  for (i = 0; i < idx.length; ++i) {
-    // fetch id without collection name
-    var id = idx[i].id.replace(/^[a-zA-Z0-9_\-]*?\/([0-9]+)$/, '$1');
-
-    idx[i].id = id;
-    indexes[id] = idx[i];
-  }
-
-  return indexes;
-}
-
 // /////////////////////////////////////////////////////////////////////////////
 // / @brief return a hash with the local databases
 // /////////////////////////////////////////////////////////////////////////////
@@ -613,6 +629,65 @@ function scheduleOneShardSynchronization (database, shard, planId, leader) {
   return true;
 }
 
+function createIndexes(collection, plannedIndexes) {
+  let existingIndexes = collection.getIndexes();
+
+  // index ids in the plan are bare, locally they carry the
+  // collection name as a prefix
+  let localId = plannedIndex => {
+    return collection.name() + '/' + plannedIndex.id;
+  };
+
+  let clusterId = localIndexId => {
+    return localIndexId.split('/')[1];
+  };
+
+  let findExisting = index => {
+    return existingIndexes.filter(existingIndex => {
+      return existingIndex.id === localId(index);
+    })[0];
+  };
+
+  let errors = plannedIndexes.reduce((errors, plannedIndex) => {
+    if (plannedIndex.type !== 'primary' && plannedIndex.type !== 'edge') {
+      if (findExisting(plannedIndex)) {
+        return errors;
+      }
+      try {
+        console.debug("creating index '%s/%s': %s",
+                      collection._dbName,
+                      collection.name(),
+                      JSON.stringify(plannedIndex));
+        collection.ensureIndex(plannedIndex);
+      } catch (e) {
+        errors[plannedIndex.id] = {
+          errorNum: e.errorNum,
+          errorMessage: e.errorMessage,
+        };
+      }
+    }
+    return errors;
+  }, {});
+
+  let indexesToDelete = existingIndexes.filter(index => {
+    if (index.type === 'primary' || index.type === 'edge') {
+      return false;
+    }
+    return plannedIndexes.filter(plannedIndex => {
+      return localId(plannedIndex) === index.id;
+    }).length === 0;
+  });
+
+  return indexesToDelete.reduce((errors, index) => {
+    console.debug("dropping index '%s/%s': %s",
+                  collection._dbName,
+                  collection.name(),
+                  JSON.stringify(index));
+    if (!collection.dropIndex(index)) {
+      errors[clusterId(index.id)] = {
+        errorNum: 4,
+        errorMessage: 'could not delete index locally',
+      };
+    }
+    // hand the accumulator on, otherwise the next iteration
+    // reduces over undefined
+    return errors;
+  }, errors);
+}
+
 // /////////////////////////////////////////////////////////////////////////////
 // / @brief executePlanForCollections
 // /////////////////////////////////////////////////////////////////////////////
@@ -624,6 +699,19 @@ function executePlanForCollections(plannedCollections) {
   let db = require('internal').db;
   db._useDatabase('_system');
 
+  let createShardError = function(errors, database, planId, shardName) {
+    if (Object.keys(errors).length > 0) {
+      let fullError = {};
+      fullError[shardName] = {
+        info: {database, planId, shardName},
+      };
+      // merge the actual errors into the per-shard entry
+      fullError[shardName] = Object.assign(fullError[shardName], errors);
+      return fullError;
+    } else {
+      return errors;
+    }
+  };
+
   let localDatabases = getLocalDatabases();
   // Create shards in Plan that are not there locally:
   Object.keys(plannedCollections).forEach(database => {
@@ -637,55 +725,58 @@ function executePlanForCollections(plannedCollections) {
       let collections = plannedCollections[database];
 
       // diff the collections
-      Object.keys(collections).forEach(function (collection) {
-        let collInfo = collections[collection];
-        let shards = collInfo.shards;
+      Object.keys(collections).forEach(function (collectionName) {
+        let collectionInfo = collections[collectionName];
+        let shards = collectionInfo.shards;
 
-        collInfo.planId = collInfo.id;
-        Object.keys(shards).forEach(shard => {
-          if (shards[shard].indexOf(ourselves) >= 0) {
-            let shouldBeLeader = shards[shard][0] === ourselves;
+        collectionInfo.planId = collectionInfo.id;
+        localErrors = Object.keys(shards).reduce((errors, shardName) => {
+          let shardErrors = {};
+          let plannedServers = shards[shardName];
+          if (plannedServers.indexOf(ourselves) >= 0) {
+            let shouldBeLeader = plannedServers[0] === ourselves;
 
-            // found a shard we are responsible for
-            localErrors[shard] = { error: false, errorNum: 0,
-              errorMessage: 'no error', indexes: {} };
-
-            let error = localErrors[shard];
             let collectionStatus;
-            if (!localCollections.hasOwnProperty(shard)) {
+            let collection;
+            if (!localCollections.hasOwnProperty(shardName)) {
               // must create this shard
-              console.debug("creating local shard '%s/%s' for central '%s/%s'",
+              console.info("creating local shard '%s/%s' for central '%s/%s'",
                            database,
-                           shard,
+                           shardName,
                            database,
-                           collInfo.planId);
+                           collectionInfo.planId);
 
-              let save = {id: collInfo.id, name: collInfo.name};
-              delete collInfo.id; // must not
-              delete collInfo.name;
+              let save = {id: collectionInfo.id, name: collectionInfo.name};
+              delete collectionInfo.id; // must not
+              delete collectionInfo.name;
               try {
-                if (collInfo.type === ArangoCollection.TYPE_EDGE) {
-                  db._createEdgeCollection(shard, collInfo);
+                if (collectionInfo.type === ArangoCollection.TYPE_EDGE) {
+                  db._createEdgeCollection(shardName, collectionInfo);
                 } else {
-                  db._create(shard, collInfo);
+                  db._create(shardName, collectionInfo);
                 }
               } catch (err2) {
-                error = { error: true, errorNum: err2.errorNum,
-                  errorMessage: err2.errorMessage };
+                shardErrors.collection = {
+                  errorNum: err2.errorNum,
+                  errorMessage: err2.errorMessage,
+                };
                 console.error("creating local shard '%s/%s' for central '%s/%s' failed: %s",
                               database,
-                              shard,
+                              shardName,
                               database,
-                              collInfo.planId,
+                              collectionInfo.planId,
                               JSON.stringify(err2));
+                // restore the fields deleted above before bailing out, the
+                // plan entry is shared between the shards of this collection
+                collectionInfo.id = save.id;
+                collectionInfo.name = save.name;
+                return Object.assign(errors, createShardError(shardErrors, database, collectionInfo.planId, shardName));
               }
-              collInfo.id = save.id;
-              collInfo.name = save.name;
+              collectionInfo.id = save.id;
+              collectionInfo.name = save.name;
+              collection = db._collection(shardName);
               if (shouldBeLeader) {
-                db._collection(shard).assumeLeadership();
+                collection.assumeLeadership();
               }
               collectionStatus = ArangoCollection.STATUS_LOADED;
             } else {
+              collection = db._collection(shardName);
               // We adjust local leadership, note that the planned resignation
               // case is not handled here, since then ourselves does not appear
               // in shards[shard] but only "_" + ourselves.
@@ -695,23 +786,23 @@ function executePlanForCollections(plannedCollections) {
               // "_" + ourselves. See below under "Drop local shards"
              // to see the proper handling of this case. Place is marked
               // with *** in comments.
-              if (!shouldBeLeader && localCollections[shard].isLeader) {
-                db._collection(shard).leaderResign();
+              if (!shouldBeLeader && localCollections[shardName].isLeader) {
+                collection.leaderResign();
               } else if (shouldBeLeader &&
-                         !localCollections[shard].isLeader) {
-                db._collection(shard).assumeLeadership();
+                         !localCollections[shardName].isLeader) {
+                collection.assumeLeadership();
               }
-              collectionStatus = localCollections[shard].status;
+              collectionStatus = localCollections[shardName].status;
 
               // collection exists, now compare collection properties
               let cmp = [ 'journalSize', 'waitForSync', 'doCompact',
                 'indexBuckets' ];
               let properties = cmp.reduce((obj, key) => {
-                if (localCollections[shard][key] !== collInfo[key]) {
+                if (localCollections[shardName][key] !== collectionInfo[key]) {
                   // property change
-                  obj[key] = collInfo[key];
+                  obj[key] = collectionInfo[key];
                 }
                 return obj;
               }, {});
@@ -719,103 +810,49 @@ function executePlanForCollections(plannedCollections) {
               if (Object.keys(properties).length > 0) {
                 console.info("updating properties for local shard '%s/%s'",
                              database,
-                             shard);
+                             shardName);
 
                 try {
-                  db._collection(shard).properties(properties);
+                  collection.properties(properties);
                 } catch (err3) {
-                  error = { error: true, errorNum: err3.errorNum,
-                    errorMessage: err3.errorMessage };
+                  shardErrors.collection = {
+                    errorNum: err3.errorNum,
+                    errorMessage: err3.errorMessage,
+                  };
+                  return Object.assign(errors, createShardError(shardErrors, database, collectionInfo.planId, shardName));
                 }
               }
             }
 
-            if (error.error) {
-              return; // No point to look for indices, if the
-              // creation has not worked
-            }
-
             // Now check whether the status is OK:
-            if (collectionStatus !== collInfo.status) {
+            if (collectionStatus !== collectionInfo.status) {
               console.info("detected status change for local shard '%s/%s'",
                            database,
-                           shard);
+                           shardName);
 
-              if (collInfo.status === ArangoCollection.STATUS_UNLOADED) {
+              if (collectionInfo.status === ArangoCollection.STATUS_UNLOADED) {
                 console.info("unloading local shard '%s/%s'",
                              database,
-                             shard);
-                db._collection(shard).unload();
-              } else if (collInfo.status === ArangoCollection.STATUS_LOADED) {
+                             shardName);
+                collection.unload();
+              } else if (collectionInfo.status === ArangoCollection.STATUS_LOADED) {
                 console.info("loading local shard '%s/%s'",
                              database,
-                             shard);
-                db._collection(shard).load();
+                             shardName);
+                collection.load();
               }
             }
 
-            let indexes = getIndexMap(shard);
-            let idx;
-            let index;
-
-            if (collInfo.hasOwnProperty('indexes')) {
-              for (let i = 0; i < collInfo.indexes.length; ++i) {
-                index = collInfo.indexes[i];
-
-                if (index.type !== 'primary' && index.type !== 'edge' &&
-                  !indexes.hasOwnProperty(index.id)) {
-                  console.debug("creating index '%s/%s': %s",
-                                database,
-                                shard,
-                                JSON.stringify(index));
-                  try {
-                    arangodb.db._collection(shard).ensureIndex(index);
-                  } catch (err5) {
-                    error.indexes[index.id] = {
-                      id: index.id,
-                      error: true,
-                      errorNum: err5.errorNum,
-                      errorMessage: err5.errorMessage
-                    };
-                  }
-                }
-              }
-
-              for (idx in indexes) {
-                if (indexes.hasOwnProperty(idx)) {
-                  // found an index in the index map, check if it must be deleted
-
-                  if (indexes[idx].type !== 'primary' && indexes[idx].type !== 'edge') {
-                    let found = false;
-                    for (let i = 0; i < collInfo.indexes.length; ++i) {
-                      if (collInfo.indexes[i].id === idx) {
-                        found = true;
-                        break;
-                      }
-                    }
-
-                    if (!found) {
-                      // found an index to delete locally
-                      index = indexes[idx];
-
-                      console.info("dropping index '%s/%s': %s",
                                   database,
-                                   shard,
-                                   JSON.stringify(index));
-
-                      arangodb.db._collection(shard).dropIndex(index);
-
-                      delete indexes[idx];
-                    }
-                  }
-                }
-              }
+            // createIndexes expects an array of planned indexes
+            let indexErrors = createIndexes(collection, collectionInfo.indexes || []);
+            if (Object.keys(indexErrors).length > 0) {
+              shardErrors.indexErrors = indexErrors;
             }
           }
-        });
+          return Object.assign(errors, createShardError(shardErrors, database, collectionInfo.planId, shardName));
+        }, localErrors);
       });
     }
   } catch(e) {
-    console.debug("Got error executing plan", e, e.stack);
+    console.error("Got error executing plan", e, e.stack);
   } finally {
     // always return to previous database
     db._useDatabase('_system');
@@ -902,15 +939,18 @@ function updateCurrentForCollections(localErrors, currentCollections) {
   let database;
 
   function assembleLocalCollectionInfo(info, error) {
+    if (error.collection) {
+      return {
+        error: true,
+        errorMessage: error.collection.errorMessage,
+        errorNum: error.collection.errorNum,
+        indexes: [],
+        servers: [ourselves],
+      };
+    }
     let coll = db._collection(info.name);
 
-    let payload = {
-      error: error.error || false,
-      errorMessage: error.errorMessage || '',
-      errorNum: error.errorNum || 0,
-    };
-
-    let indexErrors = fetchKey(error, 'indexes') || {};
-    payload.indexes = coll.getIndexes().map(index => {
+    let indexes = coll.getIndexes().map(index => {
       let agencyIndex = {};
       Object.assign(agencyIndex, index);
       // Fix up the IDs of the indexes:
@@ -920,20 +960,28 @@ function updateCurrentForCollections(localErrors, currentCollections) {
       } else {
         agencyIndex.id = index.id;
       }
-
-      if (indexErrors[agencyIndex.id] !== undefined) {
-        Object.assign(agencyIndex, indexErrors[agencyIndex.id]);
-        delete error.indexes[agencyIndex.id];
-      }
       return agencyIndex;
     });
-
-    // add the remaining errors which do not have a local id
-    Object.keys(indexErrors).forEach(indexId => {
-      payload.indexes.push(error.indexes[indexId]);
-    });
-
-    payload.servers = [ourselves].concat(coll.getFollowers());
-    return payload;
+
+    if (error.indexErrors) {
+      indexes = indexes.concat(Object.keys(error.indexErrors).map(id => {
+        let indexError = error.indexErrors[id];
+        return {
+          id,
+          error: true,
+          errorMessage: indexError.errorMessage,
+          errorNum: indexError.errorNum,
+        };
+      }));
+    }
+
+    return {
+      error: false,
+      errorMessage: '',
+      errorNum: 0,
+      indexes,
+      servers: [ourselves].concat(coll.getFollowers()),
+    };
   }
 
   function makeDropCurrentEntryCollection(dbname, col, shard) {
@@ -963,7 +1011,7 @@ function updateCurrentForCollections(localErrors, currentCollections) {
           let localCollectionInfo = assembleLocalCollectionInfo(shardInfo, localErrors[shard] || {});
           let currentCollectionInfo = fetchKey(currentCollections, database, shardInfo.planId, shard);
 
-          if (!_.isEqual(localCollectionInfo, currentCollectionInfo)) {
+          if (!isEqual(localCollectionInfo, currentCollectionInfo)) {
             trx[curCollections + database + '/' + shardInfo.planId + '/' + shardInfo.name] = {
               op: 'set',
               new: localCollectionInfo,
@@ -979,6 +1027,8 @@ function updateCurrentForCollections(localErrors, currentCollections) {
             };
           }
         }
+        // mark error as handled in any case
+        delete localErrors[shard];
       }
     }
   } catch (e) {
@@ -994,7 +1044,7 @@ function updateCurrentForCollections(localErrors, currentCollections) {
   for (database in currentCollections) {
     if (currentCollections.hasOwnProperty(database)) {
      if (localDatabases.hasOwnProperty(database)) {
-        // If a database has vanished locally, it is not our job to 
+        // If a database has vanished locally, it is not our job to
         // remove it in Current, that is what `updateCurrentForDatabases`
         // does.
         db._useDatabase(database);
@@ -1024,6 +1074,16 @@ function updateCurrentForCollections(localErrors, currentCollections) {
       }
     }
   }
+  trx = Object.keys(localErrors).reduce((trx, shardName) => {
+    let error = localErrors[shardName];
+    if (error.collection) {
+      trx[curCollections + error.info.database + '/' + error.info.planId + '/' + error.info.shardName] = {
+        op: 'set',
+        new: error.collection,
+      };
+    }
+    return trx;
+  }, trx);
 
   return trx;
 }
@@ -1159,7 +1219,7 @@ function migratePrimary(plan, current) {
   // plan...can NOT be sure that the plan was completely executed
   // may react on the errors that have been created
   syncReplicatedShardsWithLeaders(plan, current, localErrors);
-} 
+}
 
 // /////////////////////////////////////////////////////////////////////////////
 // / @brief executePlanForDatabases
@@ -1913,34 +1973,48 @@ function supervisionState () {
 // / @brief wait for synchronous replication to settle
 // /////////////////////////////////////////////////////////////////////////////
 
+function checkForSyncReplOneCollection (dbName, collName) {
+  try {
+    let cinfo = global.ArangoClusterInfo.getCollectionInfo(dbName, collName);
+    let shards = Object.keys(cinfo.shards);
+    let ccinfo = shards.map(function (s) {
+      return global.ArangoClusterInfo.getCollectionInfoCurrent(dbName,
+        collName, s).servers;
+    });
+    console.debug('checkForSyncReplOneCollection:', dbName, collName, shards,
+                  cinfo.shards, ccinfo);
+    let ok = true;
+    for (let i = 0; i < shards.length; ++i) {
+      if (cinfo.shards[shards[i]].length !== ccinfo[i].length) {
+        ok = false;
+      }
+    }
+    if (ok) {
+      console.debug('checkForSyncReplOneCollection: OK:', dbName, collName,
+                    shards);
+      return true;
+    }
+    console.debug('checkForSyncReplOneCollection: not yet:', dbName, collName,
+                  shards);
+    return false;
+  } catch (err) {
+    console.error('checkForSyncReplOneCollection: exception:', dbName, collName,
+                  JSON.stringify(err));
+  }
+  return false;
+}
+
 function waitForSyncReplOneCollection (dbName, collName) {
   console.debug('waitForSyncRepl:', dbName, collName);
-  try {
-    var count = 60;
-    while (--count > 0) {
-      var cinfo = global.ArangoClusterInfo.getCollectionInfo(dbName, collName);
-      var shards = Object.keys(cinfo.shards);
-      var ccinfo = shards.map(function (s) {
-        return global.ArangoClusterInfo.getCollectionInfoCurrent(dbName,
-          collName, s).servers;
-      });
-      console.debug('waitForSyncRepl', dbName, collName, shards, cinfo.shards, ccinfo);
-      var ok = true;
-      for (var i = 0; i < shards.length; ++i) {
-        if (cinfo.shards[shards[i]].length !== ccinfo[i].length) {
-          ok = false;
-        }
-      }
-      if (ok) {
-        console.debug('waitForSyncRepl: OK:', dbName, collName, shards);
-        return true;
-      }
-      require('internal').wait(1);
+  let count = 60;
+  while (--count > 0) {
+    let ok = checkForSyncReplOneCollection(dbName, collName);
+    if (ok) {
+      return true;
     }
-  } catch (err) {
-    console.warn('waitForSyncRepl:', dbName, collName, ': exception', JSON.stringify(err));
+    require('internal').wait(1);
   }
-  console.warn('waitForSyncRepl:', dbName, collName, ': BAD');
+  console.warn('waitForSyncReplOneCollection:', dbName, collName, ': BAD');
   return false;
 }
 
@@ -1948,11 +2022,24 @@ function waitForSyncRepl (dbName, collList) {
   if (!isCoordinator()) {
     return true;
   }
-  var ok = true;
-  for (var i = 0; i < collList.length; ++i) {
-    ok = waitForSyncReplOneCollection(dbName, collList[i].name()) && ok;
+  let n = collList.length;
+  let count = 10 * n; // wait for up to 10 * collList.length seconds
+  let ok = [...Array(n)].map(v => false);
+  while (--count > 0) {
+    let allOk = true;
+    for (var i = 0; i < n; ++i) {
+      if (!ok[i]) {
+        ok[i] = checkForSyncReplOneCollection(dbName, collList[i].name());
+        allOk = allOk && ok[i];
+      }
+    }
+    if (allOk) {
+      return true;
+    }
+    require('internal').wait(1);
   }
-  return ok;
+  console.warn('waitForSyncRepl: timeout:', dbName, collList);
+  return false;
 }
 
 exports.bootstrapDbServers = bootstrapDbServers;
diff --git a/js/server/tests/cluster-sync/cluster-sync-test-noncluster-spec.js b/js/server/tests/cluster-sync/cluster-sync-test-noncluster-spec.js
index c15e1349b7..ed8074236e 100644
--- a/js/server/tests/cluster-sync/cluster-sync-test-noncluster-spec.js
+++ b/js/server/tests/cluster-sync/cluster-sync-test-noncluster-spec.js
@@ -939,6 +939,26 @@ describe('Cluster sync', function() {
         .that.has.property('new')
         .with.deep.equal(["_repltest"]);
     });
+    it('should report newly assumed leadership for which we were a follower previously and remove any leaders and followers (these have to reregister themselves separately)', function() {
+      let props = { planId: '888111' };
+      let collection = db._create('testi', props);
+      collection.assumeLeadership();
+      let current = {
+        testung: {
+          888111: {
+            testi : { "error" : false, "errorMessage" : "", "errorNum" : 0, "indexes" : [ { "id" : "0", "type" : "primary", "fields" : [ "_key" ], "selectivityEstimate" : 1, "unique" : true, "sparse" : false } ], "servers" : [ "bogus-old-leader", "repltest", "useless-follower" ] }
+          },
+        }
+      };
+      let result = cluster.updateCurrentForCollections({}, current);
+      expect(result).to.be.an('object');
+      expect(Object.keys(result)).to.have.lengthOf(1);
+      expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
+        .that.has.property('op', 'set');
+      expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
+        .that.has.deep.property('new.servers')
+        .with.deep.equal(["repltest"]);
+    });
     it('should delete any collections for which we are not a leader locally', function() {
       let current = {
         testung: {
@@ -953,5 +973,84 @@ describe('Cluster sync', function() {
       expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
         .that.has.deep.property('op', 'delete');
     });
+    it('should report newly created indices', function() {
+      let props = { planId: '888111' };
+      let collection = db._create('testi', props);
+      collection.assumeLeadership();
+      collection.ensureIndex({"type": "hash", "fields": ["hund"]});
+      let current = {
+        testung: {
+          888111: {
+            testi : { "error" : false, "errorMessage" : "", "errorNum" : 0, "indexes" : [ { "id" : "0", "type" : "primary", "fields" : [ "_key" ], "selectivityEstimate" : 1, "unique" : true, "sparse" : false } ], "servers" : [ "repltest" ] }
+          },
+        }
+      };
+      let result = cluster.updateCurrentForCollections({}, current);
+      expect(result).to.be.an('object');
+      expect(Object.keys(result)).to.have.lengthOf(1);
+      expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
+        .that.has.property('op', 'set');
+      expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
+        .that.has.deep.property('new.indexes')
+        .with.lengthOf(2);
+    });
+    it('should report collection errors', function() {
+      let errors = {
+        'testi': {
+          collection: {
+            'name': 'testi',
+            'error': true,
+            'errorNum': 666,
+            'errorMessage': 'the number of the beast :S',
+          },
+          info: {
+            database: 'testung',
+            planId: '888111',
+            shardName: 'testi',
+          }
+        }
+      };
+      let result = cluster.updateCurrentForCollections(errors, {});
+      expect(result).to.be.an('object');
+      expect(Object.keys(result)).to.have.lengthOf(1);
+      expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
+        .that.has.property('op', 'set');
+      expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
+        .that.has.deep.property('new.error')
+        .equals(true);
+    });
+    it('should report index errors', function() {
+      let current = {
+        testung: {
+          888111: {
+            testi : { "error" : false, "errorMessage" : "", "errorNum" : 0, "indexes" : [ { "id" : "0", "type" : "primary", "fields" : [ "_key" ], "selectivityEstimate" : 1, "unique" : true, "sparse" : false } ], "servers" : [ "repltest" ] }
+          },
+        }
+      };
+
+      let errors = {
+        'testi': {
+          'indexErrors': {
+            1: {
+              'id': 1,
+              'error': true,
+              'errorNum': 666,
+              'errorMessage': 'the number of the beast :S',
+            }
+          },
+        }
+      };
+      let props = { planId: '888111' };
+      let collection = db._create('testi', props);
+      collection.assumeLeadership();
+      let result = cluster.updateCurrentForCollections(errors, current);
+      expect(result).to.be.an('object');
+      expect(Object.keys(result)).to.have.lengthOf(1);
+      expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
+        .that.has.property('op', 'set');
+      expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
+        .that.has.deep.property('new.indexes.1.error')
+        .equals(true);
+    });
   });
 });
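
Note on the hand-rolled isEqual introduced above: updateCurrentForCollections only writes a `set` operation into the agency transaction when the locally assembled collection info differs from what Current already holds, so the comparison must treat documents whose keys merely come back in a different order as equal. A minimal JavaScript sketch of the intended behavior; the sample documents are hypothetical, not taken from a real agency:

    // Hypothetical: the same Current entry, keys serialized in different orders.
    let inAgency = { error: false, errorNum: 0, servers: ['repltest'] };
    let local = { servers: ['repltest'], errorNum: 0, error: false };

    // Comparing serializations would report a difference and trigger a
    // redundant agency write; the semantic comparison does not.
    JSON.stringify(inAgency) === JSON.stringify(local); // false
    isEqual(inAgency, local);                           // true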