1
0
Fork 0

Repair DBServerAgencySync in background.

Also: Notice role change follower->leader.
This commit is contained in:
Max Neunhoeffer 2016-05-25 17:27:27 +02:00
parent e4ca111914
commit 4a4472e31e
4 changed files with 81 additions and 3 deletions

View File

@ -2653,3 +2653,13 @@ void FollowerInfo::remove(ServerID const& sid) {
<< path;
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief clear follower list, no changes in agency necesary
//////////////////////////////////////////////////////////////////////////////
void FollowerInfo::clear() {
MUTEX_LOCKER(locker, _mutex);
auto v = std::make_shared<std::vector<ServerID>>();
_followers = v; // will cast to std::vector<ServerID> const
}

View File

@ -1036,6 +1036,12 @@ class FollowerInfo {
void remove(ServerID const& s);
//////////////////////////////////////////////////////////////////////////////
/// @brief clear follower list, no changes in agency necesary
//////////////////////////////////////////////////////////////////////////////
void clear();
};
} // end namespace arangodb

View File

@ -1074,6 +1074,50 @@ static void JS_FiguresVocbaseCol(
/// @brief was docuBlock collectionLoad
////////////////////////////////////////////////////////////////////////////////
static void JS_LeaderResign(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
TRI_vocbase_t* vocbase = GetContextVocBase(isolate);
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (ServerState::instance()->isDBServer()) {
TRI_vocbase_col_t const* collection =
TRI_UnwrapClass<TRI_vocbase_col_t>(args.Holder(), WRP_VOCBASE_COL_TYPE);
if (collection == nullptr) {
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
TRI_vocbase_t* vocbase = collection->_vocbase;
std::string collectionName = collection->name();
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
auto transactionContext = std::make_shared<V8TransactionContext>(vocbase, true);
SingleCollectionTransaction trx(transactionContext, collectionName,
TRI_TRANSACTION_READ);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
TRI_document_collection_t* docColl = trx.documentCollection();
docColl->followers()->clear();
}
TRI_V8_RETURN_UNDEFINED();
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock collectionLoad
////////////////////////////////////////////////////////////////////////////////
static void JS_LoadVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
@ -3092,6 +3136,8 @@ void TRI_InitV8collection(v8::Handle<v8::Context> context, TRI_server_t* server,
JS_FiguresVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("insert"),
JS_InsertVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("leaderResign"),
JS_LeaderResign, true);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("load"),
JS_LoadVocbaseCol);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("name"),

View File

@ -450,6 +450,7 @@ function handleDatabaseChanges (plan, current, writeLocked) {
////////////////////////////////////////////////////////////////////////////////
function createLocalCollections (plannedCollections, planVersion,
currentCollections,
takeOverResponsibility, writeLocked) {
var ourselves = global.ArangoServerState.id();
@ -506,6 +507,15 @@ function createLocalCollections (plannedCollections, planVersion,
var didWrite = false;
if (shards[shard].indexOf(ourselves) >= 0) {
var isLeader = shards[shard][0] === ourselves;
var wasLeader = isLeader;
try {
var currentServers = currentCollections[database][collection][shard].servers;
wasLeader = currentServers[0] === ourselves;
}
catch(err) {
}
console.error("Fuxxen:", shard, isLeader, wasLeader);
// found a shard we are responsible for
var error = { error: false, errorNum: 0,
@ -546,6 +556,10 @@ function createLocalCollections (plannedCollections, planVersion,
}
}
else {
if (!isLeader && wasLeader) {
db._collection(shard).leaderResign();
}
if (localCollections[shard].status !== collInfo.status) {
console.info("detected status change for local shard '%s/%s'",
database,
@ -695,7 +709,8 @@ function createLocalCollections (plannedCollections, planVersion,
}
}
if (takeOverResponsibility && !didWrite && isLeader) {
if ((takeOverResponsibility && !didWrite && isLeader) ||
(!didWrite && isLeader && !wasLeader)) {
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
@ -1130,10 +1145,11 @@ function handleCollectionChanges (plan, current, takeOverResponsibility,
var ok = true;
try {
// Note that createLocalCollections and dropLocalCollections do not
createLocalCollections(plannedCollections, plan.Version, currentCollections,
takeOverResponsibility, writeLocked);
// Note that dropLocalCollections does not
// need the currentCollections, since they compare the plan with
// the local situation.
createLocalCollections(plannedCollections, plan.Version, takeOverResponsibility, writeLocked);
dropLocalCollections(plannedCollections, writeLocked);
cleanupCurrentCollections(plannedCollections, currentCollections,
writeLocked);