1
0
Fork 0

Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Jan Steemann 2014-01-15 14:00:50 +01:00
commit a0be99b6a6
1 changed file with 148 additions and 88 deletions

View File

@ -1789,16 +1789,17 @@ static v8::Handle<v8::Value> RemoveVocbaseCol (const bool useCollection,
#ifdef TRI_ENABLE_CLUSTER
static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& argv,
TRI_col_type_e collectionType,
std::string const& databaseName,
TRI_col_info_t& parameter) {
static v8::Handle<v8::Value> CreateCollectionCoordinator (
v8::Arguments const& argv,
TRI_col_type_e collectionType,
std::string const& databaseName,
TRI_col_info_t& parameter) {
v8::HandleScope scope;
const string name = TRI_ObjectToString(argv[0]);
uint64_t numberOfShards = 1;
std::vector<std::string> shardKeys;
vector<string> shardKeys;
// default shard key
shardKeys.push_back("_key");
@ -1847,25 +1848,25 @@ static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& a
uint64_t id = ClusterInfo::instance()->uniqid(1 + numberOfShards);
// collection id is the first unique id we got
const std::string cid = StringUtils::itoa(id);
const string cid = StringUtils::itoa(id);
// fetch list of available servers in cluster, and shuffle them randomly
std::vector<std::string> dbServers = ClusterInfo::instance()->getCurrentDBServers();
vector<string> dbServers = ClusterInfo::instance()->getCurrentDBServers();
if (dbServers.empty()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no database servers found in cluster");
}
std::random_shuffle(dbServers.begin(), dbServers.end());
random_shuffle(dbServers.begin(), dbServers.end());
// now create the shards
std::map<std::string, std::string> shards;
for (uint64_t i = 0; i < numberOfShards; ++i) {
// determine responsible server
const std::string serverId = dbServers[i % dbServers.size()];
const string serverId = dbServers[i % dbServers.size()];
// determine shard id
const std::string shardId = "s" + StringUtils::itoa(id + 1 + i);
const string shardId = "s" + StringUtils::itoa(id + 1 + i);
shards.insert(std::make_pair<std::string, std::string>(shardId, serverId));
}
@ -1892,6 +1893,7 @@ static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& a
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shardKeys", JsonHelper::stringList(TRI_UNKNOWN_MEM_ZONE, shardKeys));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shards", JsonHelper::stringObject(TRI_UNKNOWN_MEM_ZONE, shards));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "nrShards", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, numberOfShards));
AgencyComm agency;
@ -1903,25 +1905,58 @@ static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& a
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not lock plan in agency");
}
if (! agency.exists("Plan/Collections/" + databaseName)) {
if (! agency.exists("Plan/Databases/" + databaseName)) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "didn't find database entry in agency");
}
{
if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME);
}
AgencyCommResult result = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, json, 0.0);
if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME);
}
AgencyCommResult result
= agency.setValue("Plan/Collections/" + databaseName + "/" + cid,
json, 0.0);
if (!result.successful()) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not create entry for collection in plan in agency");
}
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
v8::Handle<v8::Object> result = v8::Object::New();
// TODO: wait for the creation of the collection
return scope.Close(result);
// Now wait for it to appear and be complete:
AgencyCommResult res = agency.getValues("Current/Version", false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
uint64_t index = res._index;
while (true) {
res = agency.getValues("Current/Collections/" + databaseName + "/" + cid,
true);
if (res.successful()) {
res.parse("", false);
map<string, AgencyCommResultEntry>::iterator it = res._values.begin();
if (it != res._values.end()) {
TRI_json_t const* json = (*it).second._json;
TRI_json_t const* shards = TRI_LookupArrayJson(json, "shards");
if (TRI_IsArrayJson(shards)) {
size_t len = shards->_value._objects._length / 2;
if (len == numberOfShards) {
return scope.Close(v8::True());
}
}
}
}
res = agency.watchValue("Current/Version", index, 1.0, false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
index = res._index;
}
}
#endif
@ -2033,6 +2068,7 @@ static v8::Handle<v8::Value> CreateVocBase (v8::Arguments const& argv,
if (ServerState::instance()->isCoordinator()) {
char const* originalDatabase = GetCurrentDatabaseName();
if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) {
TRI_FreeCollectionInfoOptions(&parameter);
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
}
@ -8316,43 +8352,73 @@ static v8::Handle<v8::Value> JS_ListDatabases (v8::Arguments const& argv) {
/// name.
////////////////////////////////////////////////////////////////////////////////
/// @brief creates the agency directory "<place>/Collections/<name>"
///
/// An AgencyCommLocker is taken on `place` in "WRITE" mode before the
/// directory is created (presumably it is released again when the locker
/// goes out of scope -- confirm against AgencyCommLocker).
///
/// @param place  top-level agency key used both as the lock target and as
///               the prefix of the directory path
/// @param name   last path component of the directory to create
/// @return TRI_ERROR_NO_ERROR if the directory was created,
///         TRI_ERROR_ARANGO_DUPLICATE_NAME if the agency answered with HTTP
///         code 403, and TRI_ERROR_INTERNAL if the lock could not be taken
///         or the agency reported any other failure
static int CreateDatabaseInAgency(string const& place, string const& name) {
  AgencyComm comm;
  AgencyCommResult outcome;

  AgencyCommLocker writeLock(place, "WRITE");

  if (! writeLock.successful()) {
    // without the lock we must not touch the agency
    return TRI_ERROR_INTERNAL;
  }

  outcome = comm.createDirectory(place + "/Collections/" + name);

  if (outcome.successful()) {
    return TRI_ERROR_NO_ERROR;
  }

  if (outcome.httpCode() == 403) {
    // a 403 from the agency is reported to the caller as "duplicate name"
    return TRI_ERROR_ARANGO_DUPLICATE_NAME;
  }

  return TRI_ERROR_INTERNAL;
}
static v8::Handle<v8::Value> JS_CreateDatabase_Coordinator (v8::Arguments const& argv) {
v8::HandleScope scope;
// Arguments are already checked, there are 1 to 3.
// First work with the arguments to create a JSON entry:
const string name = TRI_ObjectToString(argv[0]);
int ourerrno = TRI_ERROR_NO_ERROR;
ourerrno = CreateDatabaseInAgency("Plan",name);
if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Plan
// FIXME: Now wait for the directory under Current/Collections
return scope.Close(v8::True());
TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
if (0 == json) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "name",
TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE,
TRI_ObjectToString(argv[0]).c_str()));
if (argv.Length() > 1) {
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "options",
TRI_ObjectToJson(argv[1]));
if (argv.Length() > 2) {
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "users",
TRI_ObjectToJson(argv[2]));
}
}
TRI_V8_EXCEPTION(scope, ourerrno);
AgencyComm ac;
AgencyCommResult res;
{
AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not lock plan in agency");
}
res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
if (!res.successful()) {
if (res._statusCode == 403) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME);
}
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not create entry in plan in agency");
}
}
ClusterInfo* ci = ClusterInfo::instance();
vector<ServerID> DBServers = ci->getCurrentDBServers();
res = ac.getValues("Current/Version", false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
uint64_t index = res._index;
while (true) {
res = ac.getValues("Current/Databases/"+name, true);
if (res.successful()) {
res.parse("Current/Databases/"+name+"/", false);
if (res._values.size() >= DBServers.size()) {
return scope.Close(v8::True());
}
}
res = ac.watchValue("Current/Version", index, 1.0, false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
index = res._index;
}
}
#endif
@ -8517,52 +8583,46 @@ static v8::Handle<v8::Value> JS_DropDatabase_Coordinator (v8::Arguments const& a
const string name = TRI_ObjectToString(argv[0]);
AgencyComm ac;
AgencyCommResult acres;
AgencyCommResult res;
{
AgencyCommLocker locker("Target", "WRITE");
AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
// check that locking worked!
if (locker.successful()) {
// Now nobody can create or remove a database, so we can check that
// the one we want to drop does indeed exist:
acres = ac.getValues("Current/Collections/"+name+"/Lock", false);
if (! acres.successful()) {
res = ac.removeValues("Plan/Databases/"+name, false);
if (!res.successful()) {
if (res._statusCode == 403) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
}
else {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not acquire agency lock");
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
}
// Now let's lock it.
// We cannot use a locker here, because we want to remove all of
// Current/Collections/<db-name> before we are done and we must not
// unlock the Lock after that.
if (! ac.lockWrite("Current/Collections/"+name, 24*3600.0, 24*3600.0)) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
res = ac.getValues("Current/Version", false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
uint64_t index = res._index;
while (true) {
map<string, TRI_json_t*> done;
res = ac.getValues("Current/Databases/"+name, true);
if (res.successful()) {
res.parse("Current/Databases/"+name+"/", false);
if (res._values.size() == 0) {
return scope.Close(v8::True());
}
}
res = ac.watchValue("Current/Version", index, 1.0, false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
index = res._index;
}
// res = ac.getValues("Current/Collections/"+name+"/Lock, false);
// If this fails or the DB does not exist, return an error
// Remove entry Plan/Collections/<name> using Plan/Lock
// get list of DBServers during the lock
// (from now on new DBServers will no longer create a database)
// this is the point of no return
// tell all DBServers to drop database
// note errors, but there is nothing we can do about it if things go wrong
// only count and report the servers with errors
// Remove entry Target/Collections/<name>, use Target/Lock
// Remove entry Current/Collections/<name> using Current/Lock
// (from now on coordinators will understand that the database is gone)
// Release Plan/Lock
// Report error
return scope.Close(v8::True());
}
#endif
////////////////////////////////////////////////////////////////////////////////