diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 9c5e60ad71..88f3a8cba1 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -942,7 +942,7 @@ void ClusterInfo::loadCurrentCollections() { _currentCollections.swap(newCollections); _shardIds.swap(newShardIds); _currentCollectionsProt.version++; // such that others notice our change - _currentCollectionsProt.isValid = true; // will never be reset to false + _currentCollectionsProt.isValid = true; } return; } @@ -2506,6 +2506,14 @@ std::vector ClusterInfo::getCurrentCoordinators() { return result; } +////////////////////////////////////////////////////////////////////////////// +/// @brief invalidate current +////////////////////////////////////////////////////////////////////////////// +void ClusterInfo::invalidateCurrent() { + WRITE_LOCKER(writeLocker, _currentCollectionsProt.lock); + _currentCollectionsProt.isValid = false; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief get information about current followers of a shard. //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index e77c00677b..3bca2ffa47 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -810,6 +810,12 @@ class ClusterInfo { std::vector getCurrentCoordinators(); + ////////////////////////////////////////////////////////////////////////////// + /// @brief invalidate current + ////////////////////////////////////////////////////////////////////////////// + + void invalidateCurrent(); + private: ////////////////////////////////////////////////////////////////////////////// /// @brief actually clears a list of planned databases diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index dcdd91fb37..024a00c3e8 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -261,6 +261,8 @@ void HeartbeatThread::runCoordinator() { // last value of plan which we have noticed: uint64_t lastPlanVersionNoticed = 0; + // last value of current which we have noticed: + uint64_t lastCurrentVersionNoticed = 0; // value of Sync/Commands/my-id at startup uint64_t lastCommandIndex = getLastCommandIndex(); @@ -362,6 +364,28 @@ void HeartbeatThread::runCoordinator() { } } + result = _agency.getValues("Current/Version", false); + if (result.successful()) { + result.parse("", false); + + std::map::iterator it = + result._values.begin(); + + if (it != result._values.end()) { + // there is a plan version + uint64_t currentVersion = + arangodb::basics::VelocyPackHelper::stringUInt64( + it->second._vpack->slice()); + + if (currentVersion > lastCurrentVersionNoticed) { + lastCurrentVersionNoticed = currentVersion; + + ClusterInfo::instance()->invalidateCurrent(); + } + } + } + + if (shouldSleep) { double remain = interval - (TRI_microtime() - start); diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js index f7f3f8ff58..c25f1848ab 100644 --- a/js/server/modules/@arangodb/cluster.js +++ b/js/server/modules/@arangodb/cluster.js @@ -557,250 +557,266 @@ function createLocalCollections (plannedCollections, planVersion, takeOverRespon payload); }; - // mop: just a function alias but this way one at least knows what it is supposed to do :S var takeOver = createCollectionAgency; var db = require("internal").db; db._useDatabase("_system"); - var localDatabases = getLocalDatabases(); - var database; - var i; - // iterate over all matching databases - for (database in plannedCollections) { - if (plannedCollections.hasOwnProperty(database)) { - if (localDatabases.hasOwnProperty(database)) { - // save old database name - var previousDatabase = db._name(); - // switch into other database - db._useDatabase(database); + var migrate = writeLocked => { + var localDatabases = getLocalDatabases(); + var database; + var i; - try { - // iterate over collections of database - var localCollections = getLocalCollections(); + // iterate over all matching databases + for (database in plannedCollections) { + if (plannedCollections.hasOwnProperty(database)) { + if (localDatabases.hasOwnProperty(database)) { + // save old database name + var previousDatabase = db._name(); + // switch into other database + db._useDatabase(database); - var collections = plannedCollections[database]; + try { + // iterate over collections of database + var localCollections = getLocalCollections(); - // diff the collections - Object.keys(collections).forEach(function(collection) { - var collInfo = collections[collection]; - var shards = collInfo.shards; - var shard; + var collections = plannedCollections[database]; - collInfo.planId = collInfo.id; - var save = [collInfo.id, collInfo.name]; - delete collInfo.id; // must not actually set it here - delete collInfo.name; // name is now shard + // diff the collections + Object.keys(collections).forEach(function(collection) { + var collInfo = collections[collection]; + var shards = collInfo.shards; + var shard; - for (shard in shards) { - if (shards.hasOwnProperty(shard)) { - var didWrite = false; - if (shards[shard][0] === ourselves) { - // found a shard we are responsible for + collInfo.planId = collInfo.id; + var save = [collInfo.id, collInfo.name]; + delete collInfo.id; // must not actually set it here + delete collInfo.name; // name is now shard - var error = { error: false, errorNum: 0, - errorMessage: "no error" }; + for (shard in shards) { + if (shards.hasOwnProperty(shard)) { + var didWrite = false; + if (shards[shard][0] === ourselves) { + // found a shard we are responsible for - if (! localCollections.hasOwnProperty(shard)) { - // must create this shard - console.info("creating local shard '%s/%s' for central '%s/%s'", - database, - shard, - database, - collInfo.planId); + var error = { error: false, errorNum: 0, + errorMessage: "no error" }; - try { - if (collInfo.type === ArangoCollection.TYPE_EDGE) { - db._createEdgeCollection(shard, collInfo); - } - else { - db._create(shard, collInfo); - } - } - catch (err2) { - error = { error: true, errorNum: err2.errorNum, - errorMessage: err2.errorMessage }; - console.error("creating local shard '%s/%s' for central '%s/%s' failed: %s", - database, - shard, - database, - collInfo.planId, - JSON.stringify(err2)); - } - - writeLocked({ part: "Current" }, - createCollectionAgency, - [ database, shard, collInfo, error ]); - didWrite = true; - } - else { - if (localCollections[shard].status !== collInfo.status) { - console.info("detected status change for local shard '%s/%s'", - database, - shard); - - if (collInfo.status === ArangoCollection.STATUS_UNLOADED) { - console.info("unloading local shard '%s/%s'", - database, - shard); - db._collection(shard).unload(); - } - else if (collInfo.status === ArangoCollection.STATUS_LOADED) { - console.info("loading local shard '%s/%s'", - database, - shard); - db._collection(shard).load(); - } - writeLocked({ part: "Current" }, - createCollectionAgency, - [ database, shard, collInfo, error ]); - didWrite = true; - } - - // collection exists, now compare collection properties - var properties = { }; - var cmp = [ "journalSize", "waitForSync", "doCompact", - "indexBuckets" ]; - for (i = 0; i < cmp.length; ++i) { - var p = cmp[i]; - if (localCollections[shard][p] !== collInfo[p]) { - // property change - properties[p] = collInfo[p]; - } - } - - if (Object.keys(properties).length > 0) { - console.info("updating properties for local shard '%s/%s'", - database, - shard); + if (! localCollections.hasOwnProperty(shard)) { + // must create this shard + console.info("creating local shard '%s/%s' for central '%s/%s'", + database, + shard, + database, + collInfo.planId); try { - db._collection(shard).properties(properties); + if (collInfo.type === ArangoCollection.TYPE_EDGE) { + db._createEdgeCollection(shard, collInfo); + } + else { + db._create(shard, collInfo); + } } - catch (err3) { - error = { error: true, errorNum: err3.errorNum, - errorMessage: err3.errorMessage }; + catch (err2) { + error = { error: true, errorNum: err2.errorNum, + errorMessage: err2.errorMessage }; + console.error("creating local shard '%s/%s' for central '%s/%s' failed: %s", + database, + shard, + database, + collInfo.planId, + JSON.stringify(err2)); } + writeLocked({ part: "Current" }, - createCollectionAgency, - [ database, shard, collInfo, error ]); + createCollectionAgency, + [ database, shard, collInfo, error ]); didWrite = true; } - } + else { + if (localCollections[shard].status !== collInfo.status) { + console.info("detected status change for local shard '%s/%s'", + database, + shard); - if (error.error) { - if (takeOverResponsibility && !didWrite) { - writeLocked({ part: "Current" }, - takeOver, - [ database, shard, collInfo, error ]); - } - continue; // No point to look for properties and - // indices, if the creation has not worked - } + if (collInfo.status === ArangoCollection.STATUS_UNLOADED) { + console.info("unloading local shard '%s/%s'", + database, + shard); + db._collection(shard).unload(); + } + else if (collInfo.status === ArangoCollection.STATUS_LOADED) { + console.info("loading local shard '%s/%s'", + database, + shard); + db._collection(shard).load(); + } + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, shard, collInfo, error ]); + didWrite = true; + } - var indexes = getIndexMap(shard); - var idx; - var index; + // collection exists, now compare collection properties + var properties = { }; + var cmp = [ "journalSize", "waitForSync", "doCompact", + "indexBuckets" ]; + for (i = 0; i < cmp.length; ++i) { + var p = cmp[i]; + if (localCollections[shard][p] !== collInfo[p]) { + // property change + properties[p] = collInfo[p]; + } + } - if (collInfo.hasOwnProperty("indexes")) { - for (i = 0; i < collInfo.indexes.length; ++i) { - index = collInfo.indexes[i]; - - var changed = false; - - if (index.type !== "primary" && index.type !== "edge" && - ! indexes.hasOwnProperty(index.id)) { - console.info("creating index '%s/%s': %s", - database, - shard, - JSON.stringify(index)); + if (Object.keys(properties).length > 0) { + console.info("updating properties for local shard '%s/%s'", + database, + shard); try { - arangodb.db._collection(shard).ensureIndex(index); - index.error = false; - index.errorNum = 0; - index.errorMessage = ""; + db._collection(shard).properties(properties); } - catch (err5) { - index.error = true; - index.errorNum = err5.errorNum; - index.errorMessage = err5.errorMessage; + catch (err3) { + error = { error: true, errorNum: err3.errorNum, + errorMessage: err3.errorMessage }; } - - changed = true; - } - if (changed) { writeLocked({ part: "Current" }, - createCollectionAgency, - [ database, shard, collInfo, error ]); + createCollectionAgency, + [ database, shard, collInfo, error ]); didWrite = true; } } - var changed2 = false; - for (idx in indexes) { - if (indexes.hasOwnProperty(idx)) { - // found an index in the index map, check if it must be deleted + if (error.error) { + if (takeOverResponsibility && !didWrite) { + writeLocked({ part: "Current" }, + takeOver, + [ database, shard, collInfo, error ]); + } + continue; // No point to look for properties and + // indices, if the creation has not worked + } - if (indexes[idx].type !== "primary" && indexes[idx].type !== "edge") { - var found = false; - for (i = 0; i < collInfo.indexes.length; ++i) { - if (collInfo.indexes[i].id === idx) { - found = true; - break; - } + var indexes = getIndexMap(shard); + var idx; + var index; + + if (collInfo.hasOwnProperty("indexes")) { + for (i = 0; i < collInfo.indexes.length; ++i) { + index = collInfo.indexes[i]; + + var changed = false; + + if (index.type !== "primary" && index.type !== "edge" && + ! indexes.hasOwnProperty(index.id)) { + console.info("creating index '%s/%s': %s", + database, + shard, + JSON.stringify(index)); + + try { + arangodb.db._collection(shard).ensureIndex(index); + index.error = false; + index.errorNum = 0; + index.errorMessage = ""; + } + catch (err5) { + index.error = true; + index.errorNum = err5.errorNum; + index.errorMessage = err5.errorMessage; } - if (! found) { - // found an index to delete locally - changed2 = true; - index = indexes[idx]; + changed = true; + } + if (changed) { + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, shard, collInfo, error ]); + didWrite = true; + } + } - console.info("dropping index '%s/%s': %s", - database, - shard, - JSON.stringify(index)); + var changed2 = false; + for (idx in indexes) { + if (indexes.hasOwnProperty(idx)) { + // found an index in the index map, check if it must be deleted - arangodb.db._collection(shard).dropIndex(index); + if (indexes[idx].type !== "primary" && indexes[idx].type !== "edge") { + var found = false; + for (i = 0; i < collInfo.indexes.length; ++i) { + if (collInfo.indexes[i].id === idx) { + found = true; + break; + } + } - delete indexes[idx]; - collInfo.indexes.splice(i, i); + if (! found) { + // found an index to delete locally + changed2 = true; + index = indexes[idx]; + + console.info("dropping index '%s/%s': %s", + database, + shard, + JSON.stringify(index)); + + arangodb.db._collection(shard).dropIndex(index); + + delete indexes[idx]; + collInfo.indexes.splice(i, i); + } } } } + if (changed2) { + writeLocked({ part: "Current" }, + createCollectionAgency, + [ database, shard, collInfo, error ]); + didWrite = true; + } } - if (changed2) { - writeLocked({ part: "Current" }, - createCollectionAgency, - [ database, shard, collInfo, error ]); - didWrite = true; - } - } - if (takeOverResponsibility && !didWrite) { - console.info("HMMMM WRITE"); - writeLocked({ part: "Current" }, - takeOver, - [ database, shard, collInfo, error ]); + if (takeOverResponsibility && !didWrite) { + writeLocked({ part: "Current" }, + takeOver, + [ database, shard, collInfo, error ]); + } } } } - } - collInfo.id = save[0]; - collInfo.name = save[1]; - - }); - } - catch (err) { - // always return to previous database - db._useDatabase(previousDatabase); - throw err; - } + collInfo.id = save[0]; + collInfo.name = save[1]; + }); + } + catch (err) { + // always return to previous database + db._useDatabase(previousDatabase); + throw err; + } + + } } } } + + if (takeOverResponsibility) { + // mop: if this is a complete takeover we need a global lock because + // otherwise the coordinator might fetch results which are only partly + // migrated + var fakeLock = (lockInfo, cb, args) => { + if (!lockInfo || lockInfo.part != 'Current') { + throw new Error("Invalid lockInfo " + JSON.stringify(lockInfo)); + } + return cb(...args); + } + writeLocked({ part: "Current" }, migrate, [fakeLock]); + } else { + migrate(writeLocked); + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/scripts/startLocalCluster.sh b/scripts/startLocalCluster.sh index f1d451ff34..8fe35fb272 100755 --- a/scripts/startLocalCluster.sh +++ b/scripts/startLocalCluster.sh @@ -57,7 +57,6 @@ start() { --cluster.my-role $ROLE \ --log.file cluster/$PORT.log \ --log.requests-file cluster/$PORT.req \ - --log.level TRACE \ --server.disable-statistics true \ --server.foxx-queues false \ --javascript.startup-directory ./js \