1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Kaveh Vahedipour 2016-03-24 17:54:13 +01:00
commit 1f9e40fbf6
5 changed files with 248 additions and 195 deletions

View File

@ -942,7 +942,7 @@ void ClusterInfo::loadCurrentCollections() {
_currentCollections.swap(newCollections);
_shardIds.swap(newShardIds);
_currentCollectionsProt.version++; // such that others notice our change
_currentCollectionsProt.isValid = true; // will never be reset to false
_currentCollectionsProt.isValid = true;
}
return;
}
@ -2506,6 +2506,14 @@ std::vector<ServerID> ClusterInfo::getCurrentCoordinators() {
return result;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate current
//////////////////////////////////////////////////////////////////////////////
void ClusterInfo::invalidateCurrent() {
WRITE_LOCKER(writeLocker, _currentCollectionsProt.lock);
_currentCollectionsProt.isValid = false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get information about current followers of a shard.
////////////////////////////////////////////////////////////////////////////////

View File

@ -810,6 +810,12 @@ class ClusterInfo {
std::vector<ServerID> getCurrentCoordinators();
//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate current
//////////////////////////////////////////////////////////////////////////////
void invalidateCurrent();
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief actually clears a list of planned databases

View File

@ -261,6 +261,8 @@ void HeartbeatThread::runCoordinator() {
// last value of plan which we have noticed:
uint64_t lastPlanVersionNoticed = 0;
// last value of current which we have noticed:
uint64_t lastCurrentVersionNoticed = 0;
// value of Sync/Commands/my-id at startup
uint64_t lastCommandIndex = getLastCommandIndex();
@ -362,6 +364,28 @@ void HeartbeatThread::runCoordinator() {
}
}
result = _agency.getValues("Current/Version", false);
if (result.successful()) {
result.parse("", false);
std::map<std::string, AgencyCommResultEntry>::iterator it =
result._values.begin();
if (it != result._values.end()) {
// there is a plan version
uint64_t currentVersion =
arangodb::basics::VelocyPackHelper::stringUInt64(
it->second._vpack->slice());
if (currentVersion > lastCurrentVersionNoticed) {
lastCurrentVersionNoticed = currentVersion;
ClusterInfo::instance()->invalidateCurrent();
}
}
}
if (shouldSleep) {
double remain = interval - (TRI_microtime() - start);

View File

@ -557,250 +557,266 @@ function createLocalCollections (plannedCollections, planVersion, takeOverRespon
payload);
};
// mop: just a function alias but this way one at least knows what it is supposed to do :S
var takeOver = createCollectionAgency;
var db = require("internal").db;
db._useDatabase("_system");
var localDatabases = getLocalDatabases();
var database;
var i;
// iterate over all matching databases
for (database in plannedCollections) {
if (plannedCollections.hasOwnProperty(database)) {
if (localDatabases.hasOwnProperty(database)) {
// save old database name
var previousDatabase = db._name();
// switch into other database
db._useDatabase(database);
var migrate = writeLocked => {
var localDatabases = getLocalDatabases();
var database;
var i;
try {
// iterate over collections of database
var localCollections = getLocalCollections();
// iterate over all matching databases
for (database in plannedCollections) {
if (plannedCollections.hasOwnProperty(database)) {
if (localDatabases.hasOwnProperty(database)) {
// save old database name
var previousDatabase = db._name();
// switch into other database
db._useDatabase(database);
var collections = plannedCollections[database];
try {
// iterate over collections of database
var localCollections = getLocalCollections();
// diff the collections
Object.keys(collections).forEach(function(collection) {
var collInfo = collections[collection];
var shards = collInfo.shards;
var shard;
var collections = plannedCollections[database];
collInfo.planId = collInfo.id;
var save = [collInfo.id, collInfo.name];
delete collInfo.id; // must not actually set it here
delete collInfo.name; // name is now shard
// diff the collections
Object.keys(collections).forEach(function(collection) {
var collInfo = collections[collection];
var shards = collInfo.shards;
var shard;
for (shard in shards) {
if (shards.hasOwnProperty(shard)) {
var didWrite = false;
if (shards[shard][0] === ourselves) {
// found a shard we are responsible for
collInfo.planId = collInfo.id;
var save = [collInfo.id, collInfo.name];
delete collInfo.id; // must not actually set it here
delete collInfo.name; // name is now shard
var error = { error: false, errorNum: 0,
errorMessage: "no error" };
for (shard in shards) {
if (shards.hasOwnProperty(shard)) {
var didWrite = false;
if (shards[shard][0] === ourselves) {
// found a shard we are responsible for
if (! localCollections.hasOwnProperty(shard)) {
// must create this shard
console.info("creating local shard '%s/%s' for central '%s/%s'",
database,
shard,
database,
collInfo.planId);
var error = { error: false, errorNum: 0,
errorMessage: "no error" };
try {
if (collInfo.type === ArangoCollection.TYPE_EDGE) {
db._createEdgeCollection(shard, collInfo);
}
else {
db._create(shard, collInfo);
}
}
catch (err2) {
error = { error: true, errorNum: err2.errorNum,
errorMessage: err2.errorMessage };
console.error("creating local shard '%s/%s' for central '%s/%s' failed: %s",
database,
shard,
database,
collInfo.planId,
JSON.stringify(err2));
}
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
else {
if (localCollections[shard].status !== collInfo.status) {
console.info("detected status change for local shard '%s/%s'",
database,
shard);
if (collInfo.status === ArangoCollection.STATUS_UNLOADED) {
console.info("unloading local shard '%s/%s'",
database,
shard);
db._collection(shard).unload();
}
else if (collInfo.status === ArangoCollection.STATUS_LOADED) {
console.info("loading local shard '%s/%s'",
database,
shard);
db._collection(shard).load();
}
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
// collection exists, now compare collection properties
var properties = { };
var cmp = [ "journalSize", "waitForSync", "doCompact",
"indexBuckets" ];
for (i = 0; i < cmp.length; ++i) {
var p = cmp[i];
if (localCollections[shard][p] !== collInfo[p]) {
// property change
properties[p] = collInfo[p];
}
}
if (Object.keys(properties).length > 0) {
console.info("updating properties for local shard '%s/%s'",
database,
shard);
if (! localCollections.hasOwnProperty(shard)) {
// must create this shard
console.info("creating local shard '%s/%s' for central '%s/%s'",
database,
shard,
database,
collInfo.planId);
try {
db._collection(shard).properties(properties);
if (collInfo.type === ArangoCollection.TYPE_EDGE) {
db._createEdgeCollection(shard, collInfo);
}
else {
db._create(shard, collInfo);
}
}
catch (err3) {
error = { error: true, errorNum: err3.errorNum,
errorMessage: err3.errorMessage };
catch (err2) {
error = { error: true, errorNum: err2.errorNum,
errorMessage: err2.errorMessage };
console.error("creating local shard '%s/%s' for central '%s/%s' failed: %s",
database,
shard,
database,
collInfo.planId,
JSON.stringify(err2));
}
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
else {
if (localCollections[shard].status !== collInfo.status) {
console.info("detected status change for local shard '%s/%s'",
database,
shard);
if (error.error) {
if (takeOverResponsibility && !didWrite) {
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
}
continue; // No point to look for properties and
// indices, if the creation has not worked
}
if (collInfo.status === ArangoCollection.STATUS_UNLOADED) {
console.info("unloading local shard '%s/%s'",
database,
shard);
db._collection(shard).unload();
}
else if (collInfo.status === ArangoCollection.STATUS_LOADED) {
console.info("loading local shard '%s/%s'",
database,
shard);
db._collection(shard).load();
}
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
var indexes = getIndexMap(shard);
var idx;
var index;
// collection exists, now compare collection properties
var properties = { };
var cmp = [ "journalSize", "waitForSync", "doCompact",
"indexBuckets" ];
for (i = 0; i < cmp.length; ++i) {
var p = cmp[i];
if (localCollections[shard][p] !== collInfo[p]) {
// property change
properties[p] = collInfo[p];
}
}
if (collInfo.hasOwnProperty("indexes")) {
for (i = 0; i < collInfo.indexes.length; ++i) {
index = collInfo.indexes[i];
var changed = false;
if (index.type !== "primary" && index.type !== "edge" &&
! indexes.hasOwnProperty(index.id)) {
console.info("creating index '%s/%s': %s",
database,
shard,
JSON.stringify(index));
if (Object.keys(properties).length > 0) {
console.info("updating properties for local shard '%s/%s'",
database,
shard);
try {
arangodb.db._collection(shard).ensureIndex(index);
index.error = false;
index.errorNum = 0;
index.errorMessage = "";
db._collection(shard).properties(properties);
}
catch (err5) {
index.error = true;
index.errorNum = err5.errorNum;
index.errorMessage = err5.errorMessage;
catch (err3) {
error = { error: true, errorNum: err3.errorNum,
errorMessage: err3.errorMessage };
}
changed = true;
}
if (changed) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
var changed2 = false;
for (idx in indexes) {
if (indexes.hasOwnProperty(idx)) {
// found an index in the index map, check if it must be deleted
if (error.error) {
if (takeOverResponsibility && !didWrite) {
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
}
continue; // No point to look for properties and
// indices, if the creation has not worked
}
if (indexes[idx].type !== "primary" && indexes[idx].type !== "edge") {
var found = false;
for (i = 0; i < collInfo.indexes.length; ++i) {
if (collInfo.indexes[i].id === idx) {
found = true;
break;
}
var indexes = getIndexMap(shard);
var idx;
var index;
if (collInfo.hasOwnProperty("indexes")) {
for (i = 0; i < collInfo.indexes.length; ++i) {
index = collInfo.indexes[i];
var changed = false;
if (index.type !== "primary" && index.type !== "edge" &&
! indexes.hasOwnProperty(index.id)) {
console.info("creating index '%s/%s': %s",
database,
shard,
JSON.stringify(index));
try {
arangodb.db._collection(shard).ensureIndex(index);
index.error = false;
index.errorNum = 0;
index.errorMessage = "";
}
catch (err5) {
index.error = true;
index.errorNum = err5.errorNum;
index.errorMessage = err5.errorMessage;
}
if (! found) {
// found an index to delete locally
changed2 = true;
index = indexes[idx];
changed = true;
}
if (changed) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
console.info("dropping index '%s/%s': %s",
database,
shard,
JSON.stringify(index));
var changed2 = false;
for (idx in indexes) {
if (indexes.hasOwnProperty(idx)) {
// found an index in the index map, check if it must be deleted
arangodb.db._collection(shard).dropIndex(index);
if (indexes[idx].type !== "primary" && indexes[idx].type !== "edge") {
var found = false;
for (i = 0; i < collInfo.indexes.length; ++i) {
if (collInfo.indexes[i].id === idx) {
found = true;
break;
}
}
delete indexes[idx];
collInfo.indexes.splice(i, i);
if (! found) {
// found an index to delete locally
changed2 = true;
index = indexes[idx];
console.info("dropping index '%s/%s': %s",
database,
shard,
JSON.stringify(index));
arangodb.db._collection(shard).dropIndex(index);
delete indexes[idx];
collInfo.indexes.splice(i, i);
}
}
}
}
if (changed2) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
if (changed2) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
if (takeOverResponsibility && !didWrite) {
console.info("HMMMM WRITE");
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
if (takeOverResponsibility && !didWrite) {
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
}
}
}
}
}
collInfo.id = save[0];
collInfo.name = save[1];
});
}
catch (err) {
// always return to previous database
db._useDatabase(previousDatabase);
throw err;
}
collInfo.id = save[0];
collInfo.name = save[1];
});
}
catch (err) {
// always return to previous database
db._useDatabase(previousDatabase);
throw err;
}
}
}
}
}
if (takeOverResponsibility) {
// mop: if this is a complete takeover we need a global lock because
// otherwise the coordinator might fetch results which are only partly
// migrated
var fakeLock = (lockInfo, cb, args) => {
if (!lockInfo || lockInfo.part != 'Current') {
throw new Error("Invalid lockInfo " + JSON.stringify(lockInfo));
}
return cb(...args);
}
writeLocked({ part: "Current" }, migrate, [fakeLock]);
} else {
migrate(writeLocked);
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -57,7 +57,6 @@ start() {
--cluster.my-role $ROLE \
--log.file cluster/$PORT.log \
--log.requests-file cluster/$PORT.req \
--log.level TRACE \
--server.disable-statistics true \
--server.foxx-queues false \
--javascript.startup-directory ./js \