1
0
Fork 0

to lock or not to lock

This commit is contained in:
Kaveh Vahedipour 2017-01-06 14:46:57 +01:00
parent 4ac74301d5
commit a0af781630
2 changed files with 266 additions and 111 deletions

View File

@ -40,8 +40,6 @@
#include "VocBase/ticks.h"
#include "V8Server/v8-vocbaseprivate.h"
#include <chrono>
using namespace arangodb;
using namespace arangodb::basics;
@ -265,18 +263,48 @@ static void JS_GetAgency(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief acquires a read-lock in the agency
////////////////////////////////////////////////////////////////////////////////
static void JS_LockReadAgency(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
if (args.Length() < 1) {
TRI_V8_THROW_EXCEPTION_USAGE("lockRead(<part>, <ttl>, <timeout>)");
}
std::string const part = TRI_ObjectToString(args[0]);
double ttl = 0.0;
if (args.Length() > 1) {
ttl = TRI_ObjectToDouble(args[1]);
}
double timeout = 0.0;
if (args.Length() > 2) {
timeout = TRI_ObjectToDouble(args[2]);
}
AgencyComm comm;
if (!comm.lockRead(part, ttl, timeout)) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"unable to acquire lock");
}
TRI_V8_RETURN_TRUE();
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief read transaction to the agency
////////////////////////////////////////////////////////////////////////////////
static void JS_APIAgency(std::string const& method,
static void JS_APIAgency(std::string const& envelope,
v8::FunctionCallbackInfo<v8::Value> const& args) {
using namespace std::chrono;
TRI_V8_TRY_CATCH_BEGIN(isolate)
v8::HandleScope scope(isolate);
if (args.Length() < 1) {
TRI_V8_THROW_EXCEPTION_USAGE("read([[...]])");
@ -294,7 +322,7 @@ static void JS_APIAgency(std::string const& method,
comm.sendWithFailover(
arangodb::rest::RequestType::POST,
AgencyCommManager::CONNECTION_OPTIONS._requestTimeout,
std::string("/_api/agency/") + method, builder.toJson());
std::string("/_api/agency/") + envelope, builder.toJson());
if (!result.successful()) {
THROW_AGENCY_EXCEPTION(result);
@ -330,6 +358,101 @@ static void JS_TransactAgency(v8::FunctionCallbackInfo<v8::Value> const& args) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief acquires a write-lock in the agency
////////////////////////////////////////////////////////////////////////////////
static void JS_LockWriteAgency(
v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
if (args.Length() < 1) {
TRI_V8_THROW_EXCEPTION_USAGE("lockWrite(<part>, <ttl>, <timeout>)");
}
std::string const part = TRI_ObjectToString(args[0]);
double ttl = 0.0;
if (args.Length() > 1) {
ttl = TRI_ObjectToDouble(args[1]);
}
double timeout = 0.0;
if (args.Length() > 2) {
timeout = TRI_ObjectToDouble(args[2]);
}
AgencyComm comm;
if (!comm.lockWrite(part, ttl, timeout)) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"unable to acquire lock");
}
TRI_V8_RETURN_TRUE();
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief releases a read-lock in the agency
////////////////////////////////////////////////////////////////////////////////
static void JS_UnlockReadAgency(
v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
if (args.Length() > 2) {
TRI_V8_THROW_EXCEPTION_USAGE("unlockRead(<part>, <timeout>)");
}
std::string const part = TRI_ObjectToString(args[0]);
double timeout = 0.0;
if (args.Length() > 1) {
timeout = TRI_ObjectToDouble(args[1]);
}
AgencyComm comm;
if (!comm.unlockRead(part, timeout)) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"unable to release lock");
}
TRI_V8_RETURN_TRUE();
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief releases a write-lock in the agency
////////////////////////////////////////////////////////////////////////////////
static void JS_UnlockWriteAgency(
v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
if (args.Length() > 2) {
TRI_V8_THROW_EXCEPTION_USAGE("unlockWrite(<part>, <timeout>)");
}
std::string const part = TRI_ObjectToString(args[0]);
double timeout = 0.0;
if (args.Length() > 1) {
timeout = TRI_ObjectToDouble(args[1]);
}
AgencyComm comm;
if (!comm.unlockWrite(part, timeout)) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"unable to release lock");
}
TRI_V8_RETURN_TRUE();
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief removes a value from the agency
////////////////////////////////////////////////////////////////////////////////
@ -1195,25 +1318,6 @@ static void JS_CoordinatorConfigServerState(
TRI_V8_TRY_CATCH_END
}
#ifdef DEBUG_SYNC_REPLICATION
////////////////////////////////////////////////////////////////////////////////
/// @brief set arangoserver state to initialized
////////////////////////////////////////////////////////////////////////////////
static void JS_SetInitializedServerState(
v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
if (args.Length() != 0) {
TRI_V8_THROW_EXCEPTION_USAGE("setInitialized()");
}
ServerState::instance()->setInitialized();
TRI_V8_TRY_CATCH_END
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief return whether the cluster is initialized
////////////////////////////////////////////////////////////////////////////////
@ -2001,6 +2105,10 @@ void TRI_InitV8Cluster(v8::Isolate* isolate, v8::Handle<v8::Context> context) {
JS_IsEnabledAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("increaseVersion"),
JS_IncreaseVersionAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("lockRead"),
JS_LockReadAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("lockWrite"),
JS_LockWriteAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("remove"),
JS_RemoveAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("set"), JS_SetAgency);
@ -2010,6 +2118,10 @@ void TRI_InitV8Cluster(v8::Isolate* isolate, v8::Handle<v8::Context> context) {
JS_PrefixAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("uniqid"),
JS_UniqidAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("unlockRead"),
JS_UnlockReadAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("unlockWrite"),
JS_UnlockWriteAgency);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("version"),
JS_VersionAgency);
@ -2115,10 +2227,6 @@ void TRI_InitV8Cluster(v8::Isolate* isolate, v8::Handle<v8::Context> context) {
JS_DBserverConfigServerState);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("coordinatorConfig"),
JS_CoordinatorConfigServerState);
#ifdef DEBUG_SYNC_REPLICATION
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("setInitialized"),
JS_SetInitializedServerState);
#endif
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("initialized"),
JS_InitializedServerState);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("isCoordinator"),

View File

@ -38,11 +38,6 @@ var wait = require('internal').wait;
var isEnterprise = require('internal').isEnterprise();
var _ = require('lodash');
const inccv = {'/arango/Current/Version':{'op':'increment'}};
const incpv = {'/arango/Plan/Version':{'op':'increment'}};
const agencyDBs = '/' + global.ArangoAgency.prefix() + '/Current/Databases/';
const agencyCols = '/' + global.ArangoAgency.prefix() + '/Current/Collections/';
var endpointToURL = function (endpoint) {
if (endpoint.substr(0, 6) === 'ssl://') {
return 'https://' + endpoint.substr(6);
@ -306,12 +301,11 @@ function getLocalCollections () {
// / @brief create databases if they exist in the plan but not locally
// //////////////////////////////////////////////////////////////////////////////
function createLocalDatabases (plannedDatabases, currentDatabases) {
function createLocalDatabases (plannedDatabases, currentDatabases, writeLocked) {
var ourselves = global.ArangoServerState.id();
var createDatabaseAgency = function (payload) {
var envelope = {};
envelope[agencyDBs + payload.name + '/' + ourselves] = payload;
global.ArangoAgency.write([[envelope],[inccv]]);
global.ArangoAgency.set('Current/Databases/' + payload.name + '/' + ourselves,
payload);
};
var db = require('internal').db;
@ -345,14 +339,15 @@ function createLocalDatabases (plannedDatabases, currentDatabases) {
payload.errorNum = err.errorNum;
payload.errorMessage = err.errorMessage;
}
createDatabaseAgency(payload);
writeLocked({ part: 'Current' },
createDatabaseAgency,
[ payload ]);
} else if (typeof currentDatabases[name] !== 'object' || !currentDatabases[name].hasOwnProperty(ourselves)) {
// mop: ok during cluster startup we have this buggy situation where a dbserver
// has a database but has not yet announced it to the agency :S
createDatabaseAgency(payload);
writeLocked({ part: 'Current' },
createDatabaseAgency,
[ payload ]);
}
}
}
@ -362,15 +357,12 @@ function createLocalDatabases (plannedDatabases, currentDatabases) {
// / @brief drop databases if they do exist locally but not in the plan
// //////////////////////////////////////////////////////////////////////////////
function dropLocalDatabases (plannedDatabases) {
function dropLocalDatabases (plannedDatabases, writeLocked) {
var ourselves = global.ArangoServerState.id();
var dropDatabaseAgency = function (payload) {
try {
var envelope = {};
envelope[agencyDBs + payload.name + '/' + ourselves] = {"op":"delete"};
global.ArangoAgency.write([[envelope],[inccv]]);
global.ArangoAgency.remove('Current/Databases/' + payload.name + '/' + ourselves);
} catch (err) {
// ignore errors
}
@ -407,8 +399,9 @@ function dropLocalDatabases (plannedDatabases) {
}
db._dropDatabase(name);
dropDatabaseAgency({name:name});
writeLocked({ part: 'Current' },
dropDatabaseAgency,
[ { name: name } ]);
}
}
}
@ -418,14 +411,12 @@ function dropLocalDatabases (plannedDatabases) {
// / @brief clean up what's in Current/Databases for ourselves
// //////////////////////////////////////////////////////////////////////////////
function cleanupCurrentDatabases (currentDatabases) {
function cleanupCurrentDatabases (currentDatabases, writeLocked) {
var ourselves = global.ArangoServerState.id();
var dropDatabaseAgency = function (payload) {
try {
var envelope = {};
envelope[agencyDBs + payload.name + '/' + ourselves] = {"op":"delete"};
global.ArangoAgency.write([[envelope],[inccv]]);
global.ArangoAgency.remove('Current/Databases/' + payload.name + '/' + ourselves);
} catch (err) {
// ignore errors
}
@ -446,7 +437,9 @@ function cleanupCurrentDatabases (currentDatabases) {
// we are entered for a database that we don't have locally
console.debug("cleaning up entry for unknown database '%s'", name);
dropDatabaseAgency({name:name});
writeLocked({ part: 'Current' },
dropDatabaseAgency,
[ { name: name } ]);
}
}
}
@ -457,47 +450,39 @@ function cleanupCurrentDatabases (currentDatabases) {
// / @brief handle database changes
// //////////////////////////////////////////////////////////////////////////////
function handleDatabaseChanges (plan, current) {
function handleDatabaseChanges (plan, current, writeLocked) {
var plannedDatabases = plan.Databases;
var currentDatabases = current.Databases;
createLocalDatabases(plannedDatabases, currentDatabases);
dropLocalDatabases(plannedDatabases);
cleanupCurrentDatabases(currentDatabases);
createLocalDatabases(plannedDatabases, currentDatabases, writeLocked);
dropLocalDatabases(plannedDatabases, writeLocked);
cleanupCurrentDatabases(currentDatabases, writeLocked);
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief create collections if they exist in the plan but not locally
// //////////////////////////////////////////////////////////////////////////////
function createLocalCollections (
plannedCollections, planVersion, currentCollections, takeOverResponsibility) {
function createLocalCollections (plannedCollections, planVersion,
currentCollections,
takeOverResponsibility, writeLocked) {
var ourselves = global.ArangoServerState.id();
var createCollectionAgency = function (database, shard, collInfo, err) {
var payload = {
error: err.error,
errorNum: err.errorNum,
errorMessage: err.errorMessage,
var createCollectionAgency = function (database, shard, collInfo, error) {
var payload = { error: error.error,
errorNum: error.errorNum,
errorMessage: error.errorMessage,
satellite: collInfo.replicationFactor === 0,
indexes: collInfo.indexes,
servers: [ ourselves ],
planVersion: planVersion };
planVersion: planVersion };
console.info('creating Current/Collections/' + database + '/' +
console.debug('creating Current/Collections/' + database + '/' +
collInfo.planId + '/' + shard);
var envelope = {};
//envelope[agencyCols + database + '/' + collInfo.planId + '/' + shard] = payload;
envelope["/arango/Current/Version"] = {"op":"increment"};
global.ArangoAgency.set('Current/Collections/' + database + '/' +
collInfo.planId + '/' + shard, payload);
global.ArangoAgency.write([[envelope]]);
console.info('creating Current/Collections/' + database + '/' +
collInfo.planId + '/' + shard,
payload);
console.debug('creating Current/Collections/' + database + '/' +
collInfo.planId + '/' + shard + ' done.');
};
@ -506,7 +491,7 @@ function createLocalCollections (
var db = require('internal').db;
db._useDatabase('_system');
var migrate = function() {
var migrate = writeLocked => {
var localDatabases = getLocalDatabases();
var database;
var i;
@ -577,7 +562,9 @@ function createLocalCollections (
}
if (isLeader) {
createCollectionAgency(database, shard, collInfo, error);
writeLocked({ part: 'Current' },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
} else {
@ -602,7 +589,9 @@ function createLocalCollections (
db._collection(shard).load();
}
if (isLeader) {
createCollectionAgency(database, shard, collInfo, error);
writeLocked({ part: 'Current' },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
@ -631,7 +620,9 @@ function createLocalCollections (
errorMessage: err3.errorMessage };
}
if (isLeader) {
createCollectionAgency(database, shard, collInfo, error);
writeLocked({ part: 'Current' },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
@ -640,7 +631,9 @@ function createLocalCollections (
if (error.error) {
if (takeOverResponsibility && !didWrite) {
if (isLeader) {
takeOver(database, shard, collInfo, error);
writeLocked({ part: 'Current' },
takeOver,
[ database, shard, collInfo, error ]);
}
}
continue; // No point to look for properties and
@ -678,7 +671,9 @@ function createLocalCollections (
changed = true;
}
if (changed && isLeader) {
createCollectionAgency(database, shard, collInfo, error);
writeLocked({ part: 'Current' },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
@ -716,14 +711,18 @@ function createLocalCollections (
}
}
if (changed2 && isLeader) {
createCollectionAgency(database, shard, collInfo, error);
writeLocked({ part: 'Current' },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
if ((takeOverResponsibility && !didWrite && isLeader) ||
(!didWrite && isLeader && !wasLeader)) {
takeOver(database, shard, collInfo, error);
writeLocked({ part: 'Current' },
takeOver,
[ database, shard, collInfo, error ]);
}
}
}
@ -743,8 +742,20 @@ function createLocalCollections (
}
};
migrate();
if (takeOverResponsibility) {
// mop: if this is a complete takeover we need a global lock because
// otherwise the coordinator might fetch results which are only partly
// migrated
var fakeLock = (lockInfo, cb, args) => {
if (!lockInfo || lockInfo.part !== 'Current') {
throw new Error('Invalid lockInfo ' + JSON.stringify(lockInfo));
}
return cb(...args);
};
writeLocked({ part: 'Current' }, migrate, [fakeLock]);
} else {
migrate(writeLocked);
}
}
function leaderResign (database, collId, shardName, ourselves) {
@ -774,16 +785,15 @@ function leaderResign (database, collId, shardName, ourselves) {
// / @brief drop collections if they exist locally but not in the plan
// //////////////////////////////////////////////////////////////////////////////
function dropLocalCollections (plannedCollections, currentCollections) {
function dropLocalCollections (plannedCollections, currentCollections,
writeLocked) {
var ourselves = global.ArangoServerState.id();
var dropCollectionAgency = function (database, shardID, id) {
try {
console.debug('dropping Current/Collections/' + database + '/' +
id + '/' + shardID);
var envelope = {};
envelope[agencyCols + database + '/' + id + '/' + shardID] = {"op":"delete"};
global.ArangoAgency.write([[envelope],[inccv]]);
global.ArangoAgency.remove('Current/Collections/' + database + '/' + id + '/' + shardID);
console.debug('dropping Current/Collections/' + database + '/' +
id + '/' + shardID + ' done.');
} catch (err) {
@ -866,7 +876,9 @@ function dropLocalCollections (plannedCollections, currentCollections) {
console.debug('cleaning out Current entry for shard %s in',
'agency for %s/%s', collection, database,
collections[collection].name);
dropCollectionAgency(database, collection, collections[collection].planId);
writeLocked({ part: 'Current' },
dropCollectionAgency,
[ database, collection, collections[collection].planId ]);
}
}
}
@ -885,16 +897,13 @@ function dropLocalCollections (plannedCollections, currentCollections) {
// / @brief clean up what's in Current/Collections for ourselves
// //////////////////////////////////////////////////////////////////////////////
function cleanupCurrentCollections (plannedCollections, currentCollections) {
function cleanupCurrentCollections (plannedCollections, currentCollections,
writeLocked) {
var dropCollectionAgency = function (database, collection, shardID) {
try {
console.debug('cleaning Current/Collections/' + database + '/' +
collection + '/' + shardID);
var envelope = {};
envelope[agencyCols + database + '/' + collection + '/' + shardID] = {"op": "delete"};
global.ArangoAgency.write([[envelope],[inccv]]);
global.ArangoAgency.remove('Current/Collections/' + database + '/' + collection + '/' + shardID);
console.debug('cleaning Current/Collections/' + database + '/' +
collection + '/' + shardID + ' done.');
} catch (err) {
@ -928,7 +937,9 @@ function cleanupCurrentCollections (plannedCollections, currentCollections) {
database,
collection);
dropCollectionAgency(database, collection, shard);
writeLocked({ part: 'Current' },
dropCollectionAgency,
[ database, collection, shard ]);
}
}
}
@ -1283,7 +1294,8 @@ function synchronizeLocalFollowerCollections (plannedCollections,
// / @brief handle collection changes
// //////////////////////////////////////////////////////////////////////////////
function handleCollectionChanges (plan, current, takeOverResponsibility) {
function handleCollectionChanges (plan, current, takeOverResponsibility,
writeLocked) {
var plannedCollections = plan.Collections;
var currentCollections = current.Collections;
@ -1291,9 +1303,10 @@ function handleCollectionChanges (plan, current, takeOverResponsibility) {
try {
createLocalCollections(plannedCollections, plan.Version, currentCollections,
takeOverResponsibility);
dropLocalCollections(plannedCollections, currentCollections);
cleanupCurrentCollections(plannedCollections, currentCollections);
takeOverResponsibility, writeLocked);
dropLocalCollections(plannedCollections, currentCollections, writeLocked);
cleanupCurrentCollections(plannedCollections, currentCollections,
writeLocked);
if (!synchronizeLocalFollowerCollections(plannedCollections,
currentCollections)) {
// If not all needed jobs have been scheduled, then work is still
@ -1394,7 +1407,7 @@ function primaryToSecondary () {
// / @brief change handling trampoline function
// //////////////////////////////////////////////////////////////////////////////
function handleChanges (plan, current) {
function handleChanges (plan, current, writeLocked) {
var changed = false;
var role = ArangoServerState.role();
if (role === 'PRIMARY' || role === 'SECONDARY') {
@ -1439,12 +1452,12 @@ function handleChanges (plan, current) {
}
}
handleDatabaseChanges(plan, current);
handleDatabaseChanges(plan, current, writeLocked);
var success;
if (role === 'PRIMARY' || role === 'COORDINATOR') {
// Note: This is only ever called for DBservers (primary and secondary),
// we keep the coordinator case here just in case...
success = handleCollectionChanges(plan, current, changed);
success = handleCollectionChanges(plan, current, changed, writeLocked);
} else {
success = setupReplication();
}
@ -1614,9 +1627,43 @@ var handlePlanChange = function (plan, current) {
current: current.Version
};
// ////////////////////////////////////////////////////////////////////////////
// / @brief execute an action under a write-lock
// ////////////////////////////////////////////////////////////////////////////
function writeLocked (lockInfo, cb, args) {
var timeout = lockInfo.timeout;
if (timeout === undefined) {
timeout = 60;
}
var ttl = lockInfo.ttl;
if (ttl === undefined) {
ttl = 120;
}
if (require('internal').coverage || require('internal').valgrind) {
ttl *= 10;
timeout *= 10;
}
global.ArangoAgency.lockWrite(lockInfo.part, ttl, timeout);
try {
cb.apply(null, args);
global.ArangoAgency.increaseVersion(lockInfo.part + '/Version');
let version = global.ArangoAgency.get(lockInfo.part + '/Version');
versions[lockInfo.part.toLowerCase()] = version.arango[lockInfo.part].Version;
global.ArangoAgency.unlockWrite(lockInfo.part, timeout);
} catch (err) {
global.ArangoAgency.unlockWrite(lockInfo.part, timeout);
throw err;
}
}
try {
versions.success = handleChanges(plan, current);
versions.success = handleChanges(plan, current, writeLocked);
console.debug('plan change handling successful');
} catch (err) {