1
0
Fork 0

Fix reporting index and collection errors

This commit is contained in:
Andreas Streichardt 2017-01-26 10:43:35 +01:00
parent 2eab5bf5bc
commit 4e6371630a
2 changed files with 311 additions and 152 deletions

View File

@ -46,6 +46,41 @@ const agencyOperations = {
'increment' : {'op' : 'increment'}
};
// good enough isEqual which does not depend on equally sorted objects as _.isEqual
var isEqual = function(object, other) {
if (typeof object !== typeof other) {
return false;
}
if (typeof object !== 'object') {
// explicitly use non strict equal
// eslint-disable-next-line eqeqeq
return object == other;
}
if (Array.isArray(object)) {
return object.every((value, index) => {
return isEqual(value, other[index]);
});
} else if (object === null) {
return other === null;
}
let myKeys = Object.keys(object);
let otherKeys = Object.keys(other);
if (myKeys.length !== otherKeys.length) {
return false;
}
return myKeys.every(key => {
if (!isEqual(object[key], other[key])) {
return false;
}
return true;
});
};
var endpointToURL = function (endpoint) {
if (endpoint.substr(0, 6) === 'ssl://') {
return 'https://' + endpoint.substr(6);
@ -251,25 +286,6 @@ function getShardMap (plannedCollections) {
return shardMap;
}
// /////////////////////////////////////////////////////////////////////////////
// / @brief return the indexes of a collection as a map
// /////////////////////////////////////////////////////////////////////////////
function getIndexMap (shard) {
var indexes = { }, i;
var idx = arangodb.db._collection(shard).getIndexes();
for (i = 0; i < idx.length; ++i) {
// fetch id without collection name
var id = idx[i].id.replace(/^[a-zA-Z0-9_\-]*?\/([0-9]+)$/, '$1');
idx[i].id = id;
indexes[id] = idx[i];
}
return indexes;
}
// /////////////////////////////////////////////////////////////////////////////
// / @brief return a hash with the local databases
// /////////////////////////////////////////////////////////////////////////////
@ -613,6 +629,65 @@ function scheduleOneShardSynchronization (database, shard, planId, leader) {
return true;
}
function createIndexes(collection, plannedIndexes) {
let existingIndexes = collection.getIndexes();
let localId = plannedIndex => {
return collection.name() + '/' + plannedIndex.id;
};
let clusterId = existingIndex => {
return existingIndex.split('/')[1];
};
let findExisting = index => {
return existingIndexes.filter(existingIndex => {
return existingIndex.id === localId(index);
})[0];
};
let errors = plannedIndexes.reduce((errors, plannedIndex) => {
if (plannedIndex.type !== 'primary' && plannedIndex.type !== 'edge') {
if (findExisting(plannedIndex)) {
return errors;
}
try {
console.debug("creating index '%s/%s': %s",
collection._dbName,
collection.name(),
JSON.stringify(plannedIndex));
collection.ensureIndex(plannedIndex);
} catch (e) {
errors[plannedIndex.id] = {
errorNum: e.errorNum,
errorMessage: e.errorMessage,
};
};
}
return errors;
}, {});
let indexesToDelete = existingIndexes.filter(index => {
if (index.type === 'primary' || index.type === 'edge') {
return false;
}
return plannedIndexes.filter(plannedIndex => {
return localId(plannedIndex) === index.id;
}).length === 0;
});
return indexesToDelete.reduce((errors, index) => {
console.debug("dropping index '%s/%s': %s",
collection._dbName,
collection.name(),
JSON.stringify(index));
if (!collection.dropIndex(index)) {
errors[clusterId(index)] = {
errorNum: 4,
errorMessage: 'could not delete index locally',
};
}
}, errors);
}
// /////////////////////////////////////////////////////////////////////////////
// / @brief executePlanForCollections
// /////////////////////////////////////////////////////////////////////////////
@ -624,6 +699,19 @@ function executePlanForCollections(plannedCollections) {
let db = require('internal').db;
db._useDatabase('_system');
let createShardError = function(errors, database, planId, shardName) {
if (Object.keys(errors).length > 0) {
let fullError = {};
fullError[shardName] = {
info: {database, planId, shardName},
};
fullError = Object.assign(fullError, errors);
return fullError;
} else {
return errors;
}
};
let localDatabases = getLocalDatabases();
// Create shards in Plan that are not there locally:
Object.keys(plannedCollections).forEach(database => {
@ -637,55 +725,58 @@ function executePlanForCollections(plannedCollections) {
let collections = plannedCollections[database];
// diff the collections
Object.keys(collections).forEach(function (collection) {
let collInfo = collections[collection];
let shards = collInfo.shards;
Object.keys(collections).forEach(function (collectionName) {
let collectionInfo = collections[collectionName];
let shards = collectionInfo.shards;
collInfo.planId = collInfo.id;
Object.keys(shards).forEach(shard => {
if (shards[shard].indexOf(ourselves) >= 0) {
let shouldBeLeader = shards[shard][0] === ourselves;
collectionInfo.planId = collectionInfo.id;
localErrors = Object.keys(shards).reduce((errors, shardName) => {
let shardErrors = {};
let plannedServers = shards[shardName];
if (plannedServers.indexOf(ourselves) >= 0) {
let shouldBeLeader = plannedServers[0] === ourselves;
// found a shard we are responsible for
localErrors[shard] = { error: false, errorNum: 0,
errorMessage: 'no error', indexes: {} };
let error = localErrors[shard];
let collectionStatus;
if (!localCollections.hasOwnProperty(shard)) {
let collection;
if (!localCollections.hasOwnProperty(shardName)) {
// must create this shard
console.debug("creating local shard '%s/%s' for central '%s/%s'",
console.info("creating local shard '%s/%s' for central '%s/%s'",
database,
shard,
shardName,
database,
collInfo.planId);
collectionInfo.planId);
let save = {id: collInfo.id, name: collInfo.name};
delete collInfo.id; // must not
delete collInfo.name;
let save = {id: collectionInfo.id, name: collectionInfo.name};
delete collectionInfo.id; // must not
delete collectionInfo.name;
try {
if (collInfo.type === ArangoCollection.TYPE_EDGE) {
db._createEdgeCollection(shard, collInfo);
if (collectionInfo.type === ArangoCollection.TYPE_EDGE) {
db._createEdgeCollection(shardName, collectionInfo);
} else {
db._create(shard, collInfo);
db._create(shardName, collectionInfo);
}
} catch (err2) {
error = { error: true, errorNum: err2.errorNum,
errorMessage: err2.errorMessage };
shardErrors.collection = {
errorNum: err2.errorNum,
errorMessage: err2.errorMessage,
};
console.error("creating local shard '%s/%s' for central '%s/%s' failed: %s",
database,
shard,
shardName,
database,
collInfo.planId,
collectionInfo.planId,
JSON.stringify(err2));
return Object.assign(errors, createShardError(shardErrors, database, collectionInfo.planId, shardName));
}
collInfo.id = save.id;
collInfo.name = save.name;
collectionInfo.id = save.id;
collectionInfo.name = save.name;
collection = db._collection(shardName);
if (shouldBeLeader) {
db._collection(shard).assumeLeadership();
collection.assumeLeadership();
}
collectionStatus = ArangoCollection.STATUS_LOADED;
} else {
collection = db._collection(shardName);
// We adjust local leadership, note that the planned resignation
// case is not handled here, since then ourselves does not appear
// in shards[shard] but only "_" + ourselves.
@ -695,23 +786,23 @@ function executePlanForCollections(plannedCollections) {
// "_" + ourselves. See below under "Drop local shards"
// to see the proper handling of this case. Place is marked
// with *** in comments.
if (!shouldBeLeader && localCollections[shard].isLeader) {
db._collection(shard).leaderResign();
if (!shouldBeLeader && localCollections[shardName].isLeader) {
collection.leaderResign();
} else if (shouldBeLeader &&
!localCollections[shard].isLeader) {
db._collection(shard).assumeLeadership();
!localCollections[shardName].isLeader) {
collection.assumeLeadership();
}
collectionStatus = localCollections[shard].status;
collectionStatus = localCollections[shardName].status;
// collection exists, now compare collection properties
let cmp = [ 'journalSize', 'waitForSync', 'doCompact',
'indexBuckets' ];
let properties = cmp.reduce((obj, key) => {
if (localCollections[shard][key] !== collInfo[key]) {
if (localCollections[shardName][key] !== collectionInfo[key]) {
// property change
obj[key] = collInfo[key];
obj[key] = collectionInfo[key];
}
return obj;
}, {});
@ -719,103 +810,49 @@ function executePlanForCollections(plannedCollections) {
if (Object.keys(properties).length > 0) {
console.info("updating properties for local shard '%s/%s'",
database,
shard);
shardName);
try {
db._collection(shard).properties(properties);
collection.properties(properties);
} catch (err3) {
error = { error: true, errorNum: err3.errorNum,
errorMessage: err3.errorMessage };
shardErrors.collection = {
errorNum: err3.errorNum,
errorMessage: err3.errorMessage,
};
return Object.assign(errors, createShardError(shardErrors, database, collectionInfo.planId, shardName));
}
}
}
if (error.error) {
return; // No point to look for indices, if the
// creation has not worked
}
// Now check whether the status is OK:
if (collectionStatus !== collInfo.status) {
if (collectionStatus !== collectionInfo.status) {
console.info("detected status change for local shard '%s/%s'",
database,
shard);
shardName);
if (collInfo.status === ArangoCollection.STATUS_UNLOADED) {
if (collectionInfo.status === ArangoCollection.STATUS_UNLOADED) {
console.info("unloading local shard '%s/%s'",
database,
shard);
db._collection(shard).unload();
} else if (collInfo.status === ArangoCollection.STATUS_LOADED) {
shardName);
collection.unload();
} else if (collectionInfo.status === ArangoCollection.STATUS_LOADED) {
console.info("loading local shard '%s/%s'",
database,
shard);
db._collection(shard).load();
shardName);
collection.load();
}
}
let indexes = getIndexMap(shard);
let idx;
let index;
if (collInfo.hasOwnProperty('indexes')) {
for (let i = 0; i < collInfo.indexes.length; ++i) {
index = collInfo.indexes[i];
if (index.type !== 'primary' && index.type !== 'edge' &&
!indexes.hasOwnProperty(index.id)) {
console.debug("creating index '%s/%s': %s",
database,
shard,
JSON.stringify(index));
try {
arangodb.db._collection(shard).ensureIndex(index);
} catch (err5) {
error.indexes[index.id] = {
id: index.id,
error: true,
errorNum: err5.errorNum,
errorMessage: err5.errorMessage
};
}
}
}
for (idx in indexes) {
if (indexes.hasOwnProperty(idx)) {
// found an index in the index map, check if it must be deleted
if (indexes[idx].type !== 'primary' && indexes[idx].type !== 'edge') {
let found = false;
for (let i = 0; i < collInfo.indexes.length; ++i) {
if (collInfo.indexes[i].id === idx) {
found = true;
break;
}
}
if (!found) {
// found an index to delete locally
index = indexes[idx];
console.info("dropping index '%s/%s': %s",
database,
shard,
JSON.stringify(index));
arangodb.db._collection(shard).dropIndex(index);
delete indexes[idx];
}
}
}
}
let indexErrors = createIndexes(collection, collectionInfo.indexes || {});
if (indexErrors > 0) {
shardErrors.indexErrors = indexErrors;
}
}
});
return Object.assign(errors, createShardError(shardErrors, database, collectionInfo.planId, shardName));
}, localErrors);
});
} catch(e) {
console.debug("Got error executing plan", e, e.stack);
console.error("Got error executing plan", e, e.stack);
} finally {
// always return to previous database
db._useDatabase('_system');
@ -902,15 +939,18 @@ function updateCurrentForCollections(localErrors, currentCollections) {
let database;
function assembleLocalCollectionInfo(info, error) {
if (error.collection) {
return {
error: true,
errorMessage: error.collection.errorMessage,
errorNum: error.collection.errorNum,
indexes: [],
servers: [ourselves],
};
}
let coll = db._collection(info.name);
let payload = {
error: error.error || false,
errorMessage: error.errorMessage || '',
errorNum: error.errorNum || 0,
};
let indexErrors = fetchKey(error, 'indexes') || {};
payload.indexes = coll.getIndexes().map(index => {
let indexes = coll.getIndexes().map(index => {
let agencyIndex = {};
Object.assign(agencyIndex, index);
// Fix up the IDs of the indexes:
@ -920,20 +960,28 @@ function updateCurrentForCollections(localErrors, currentCollections) {
} else {
agencyIndex.id = index.id;
}
if (indexErrors[agencyIndex.id] !== undefined) {
Object.assign(agencyIndex, indexErrors[agencyIndex.id]);
delete error.indexes[agencyIndex.id];
}
return agencyIndex;
});
// add the remaining errors which do not have a local id
Object.keys(indexErrors).forEach(indexId => {
payload.indexes.push(error.indexes[indexId]);
});
payload.servers = [ourselves].concat(coll.getFollowers());
return payload;
if (error.indexErrors) {
indexes = indexes.concat(Object.keys(error.indexErrors).map(id => {
let indexError = error.indexErrors[id];
return {
id,
error: true,
errorMessage: indexError.errorMessage,
errorNum: indexError.errorNum,
};
}));
}
return {
error: false,
errorMessage: '',
errorNum: 0,
indexes,
servers: [ourselves].concat(coll.getFollowers()),
};
}
function makeDropCurrentEntryCollection(dbname, col, shard) {
@ -963,7 +1011,7 @@ function updateCurrentForCollections(localErrors, currentCollections) {
let localCollectionInfo = assembleLocalCollectionInfo(shardInfo, localErrors[shard] || {});
let currentCollectionInfo = fetchKey(currentCollections, database, shardInfo.planId, shard);
if (!_.isEqual(localCollectionInfo, currentCollectionInfo)) {
if (!isEqual(localCollectionInfo, currentCollectionInfo)) {
trx[curCollections + database + '/' + shardInfo.planId + '/' + shardInfo.name] = {
op: 'set',
new: localCollectionInfo,
@ -979,6 +1027,8 @@ function updateCurrentForCollections(localErrors, currentCollections) {
};
}
}
// mark error as handled in any case
delete localErrors[shard];
}
}
} catch (e) {
@ -994,7 +1044,7 @@ function updateCurrentForCollections(localErrors, currentCollections) {
for (database in currentCollections) {
if (currentCollections.hasOwnProperty(database)) {
if (localDatabases.hasOwnProperty(database)) {
// If a database has vanished locally, it is not our job to
// If a database has vanished locally, it is not our job to
// remove it in Current, that is what `updateCurrentForDatabases`
// does.
db._useDatabase(database);
@ -1024,6 +1074,16 @@ function updateCurrentForCollections(localErrors, currentCollections) {
}
}
}
trx = Object.keys(localErrors).reduce((trx, shardName) => {
let error = localErrors[shardName];
if (error.collection) {
trx[curCollections + error.info.database + '/' + error.info.planId + '/' + error.info.shardName] = {
op: 'set',
new: error.collection,
};
}
return trx;
}, trx);
return trx;
}
@ -1159,7 +1219,7 @@ function migratePrimary(plan, current) {
// plan...can NOT be sure that the plan was completely executed
// may react on the errors that have been created
syncReplicatedShardsWithLeaders(plan, current, localErrors);
}
}
// /////////////////////////////////////////////////////////////////////////////
// / @brief executePlanForDatabases

View File

@ -939,6 +939,26 @@ describe('Cluster sync', function() {
.that.has.property('new')
.with.deep.equal(["_repltest"]);
});
it('should report newly assumed leadership for which we were a follower previously and remove any leaders and followers (these have to reregister themselves separateley)', function() {
let props = { planId: '888111' };
let collection = db._create('testi', props);
collection.assumeLeadership();
let current = {
testung: {
888111: {
testi : { "error" : false, "errorMessage" : "", "errorNum" : 0, "indexes" : [ { "id" : "0", "type" : "primary", "fields" : [ "_key" ], "selectivityEstimate" : 1, "unique" : true, "sparse" : false } ], "servers" : [ "bogus-old-leader", "repltest", "useless-follower" ] }
},
}
};
let result = cluster.updateCurrentForCollections({}, current);
expect(result).to.be.an('object');
expect(Object.keys(result)).to.have.lengthOf(1);
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.have.property('op', 'set');
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.has.deep.property('new.servers')
.with.deep.equal(["repltest"]);
});
it('should delete any collections for which we are not a leader locally', function() {
let current = {
testung: {
@ -953,5 +973,84 @@ describe('Cluster sync', function() {
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.has.deep.property('op', 'delete');
});
it('should report newly created indices', function() {
let props = { planId: '888111' };
let collection = db._create('testi', props);
collection.assumeLeadership();
collection.ensureIndex({"type": "hash", "fields": ["hund"]});
let current = {
testung: {
888111: {
testi : { "error" : false, "errorMessage" : "", "errorNum" : 0, "indexes" : [ { "id" : "0", "type" : "primary", "fields" : [ "_key" ], "selectivityEstimate" : 1, "unique" : true, "sparse" : false } ], "servers" : [ "repltest" ] }
},
}
};
let result = cluster.updateCurrentForCollections({}, current);
expect(result).to.be.an('object');
expect(Object.keys(result)).to.have.lengthOf(1);
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.have.property('op', 'set');
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.has.deep.property('new.indexes')
.with.lengthOf(2);
});
it('should report collection errors', function() {
let errors = {
'testi': {
collection: {
'name': 'testi',
'error': true,
'errorNum': 666,
'errorMessage': 'the number of the beast :S',
},
info: {
database: 'testung',
planId: '888111',
shardName: 'testi',
}
}
};
let result = cluster.updateCurrentForCollections(errors, {});
expect(result).to.be.an('object');
expect(Object.keys(result)).to.have.lengthOf(1);
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.have.property('op', 'set');
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.has.deep.property('new.error')
.equals(true);
});
it('should report index errors', function() {
let current = {
testung: {
888111: {
testi : { "error" : false, "errorMessage" : "", "errorNum" : 0, "indexes" : [ { "id" : "0", "type" : "primary", "fields" : [ "_key" ], "selectivityEstimate" : 1, "unique" : true, "sparse" : false } ], "servers" : [ "repltest" ] }
},
}
};
let errors = {
'testi': {
'indexErrors': {
1: {
'id': 1,
'error': true,
'errorNum': 666,
'errorMessage': 'the number of the beast :S',
}
},
}
};
let props = { planId: '888111' };
let collection = db._create('testi', props);
collection.assumeLeadership();
let result = cluster.updateCurrentForCollections(errors, current);
expect(result).to.be.an('object');
expect(Object.keys(result)).to.have.lengthOf(1);
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.have.property('op', 'set');
expect(result).to.have.property('/arango/Current/Collections/testung/888111/testi')
.that.has.deep.property('new.indexes.1.error')
.equals(true);
});
});
});