1
0
Fork 0

Do not update in endless loop

This commit is contained in:
Andreas Streichardt 2017-01-13 17:55:22 +01:00
parent b0792f1de3
commit 2a2cbf808a
1 changed files with 34 additions and 45 deletions

View File

@ -1624,27 +1624,36 @@ function updateCurrentForCollections(localErrors, current) {
let localDatabases = getLocalDatabases();
let database;
function makeAddCurrentEntryCollection(dbname, info, error, trx) {
function assembleLocalCollectionInfo(info, error) {
let coll = db._collection(info.name);
let payload = {indexes: coll.getIndexes()};
// Fix up the IDs of the indexes:
for (let i = 0; i < payload.indexes.length; ++i) {
let pos = payload.indexes[i].id.indexOf("/");
let payload = {
error: error.error,
errorMessage: error.errorMessage,
errorNum: error.errorNum,
};
payload.indexes = coll.getIndexes().map(index => {
let agencyIndex = {};
Object.assign(agencyIndex, index);
// Fix up the IDs of the indexes:
let pos = index.id.indexOf("/");
if (pos >= 0) {
payload.indexes[i].id = payload.indexes[i].id.slice(pos+1);
agencyIndex.id = index.id.slice(pos+1);
} else {
agencyIndex.id = index.id;
}
}
Object.keys(error.indexes).forEach(id => {
payload.indexes.push(error.indexes[id]);
if (error.indexes[agencyIndex.id] !== undefined) {
Object.assign(agencyIndex, error.indexes[agencyIndex.id]);
delete error.indexes[agencyIndex.id];
}
return agencyIndex;
});
// add the remaining errors which do not have a local id
Object.keys(error.indexes).forEach(indexId => {
payload.indexes.push(error.indexes[indexId]);
});
if (error !== undefined && error.error) {
_.assign(payload, error);
} else {
_.assign(payload, { error: false, errorNum: 0, errorMessage: "" });
}
payload.servers = [ourselves].concat(coll.getFollowers());
trx[0][curCollections + dbname + '/' + info.planId + '/' + info.name] =
{op: 'set', new: payload};
return payload;
}
function makeDropCurrentEntryCollection(dbname, col, shard, trx) {
@ -1672,35 +1681,15 @@ function updateCurrentForCollections(localErrors, current) {
if (localCollections.hasOwnProperty(shard)) {
let shardInfo = localCollections[shard];
if (shardInfo.isLeader) {
// It is the responsibility of the leader to update Current
if (!currentDBInfo.hasOwnProperty(shardInfo.planId) ||
!currentDBInfo[shardInfo.planId].hasOwnProperty(shard) ||
currentDBInfo[shardInfo.planId][shard].servers[0] !==
ourselves) {
makeAddCurrentEntryCollection(database, shardInfo,
localErrors[shard], trx);
} else {
// Here, the entry in current exists and we have been and
// currently are the leader. So we have to update it if
// changed:
let inCurrent = currentDBInfo[shardInfo.planId][shard];
let error = localErrors[shard];
let diff = false;
if (error !== undefined) {
if (inCurrent.error !== error.error ||
inCurrent.errorNum !== error.errorNum ||
inCurrent.errorMessage !== error.errorMessage) {
diff = true;
}
let localCollectionInfo = assembleLocalCollectionInfo(shardInfo, localErrors[shard]);
// our local state differs from what is in the agency! => update!
if (typeof currentDBInfo[shardInfo.planId] != 'object'
|| !_.isEqual(localCollectionInfo, currentDBInfo[shardInfo.planId][shard])) {
trx[0][curCollections + database + '/' + shardInfo.planId + '/' + shardInfo.name] = {
op: 'set',
new: localCollectionInfo,
};
}
if (!_.isEqual(inCurrent.indexes, shardInfo.indexes)) {
diff = true;
}
if (diff) {
makeAddCurrentEntryCollection(database, shardInfo,
error, trx);
}
}
}
}
}
@ -1844,7 +1833,7 @@ function migratePrimary(plan, current) {
// diff current and local and prepare agency transactions or whatever
// to update current. Will report the errors created locally to the agency
let trx = updateCurrentForCollections(localErrors, current);
if (Object.keys(trx[0]).length !== 0) {
if (trx.length > 0 && Object.keys(trx[0]).length !== 0) {
trx[0][curVersion] = {op: 'increment'};
// TODO: reduce timeout when we can:
try {