From bd5f58ff8bdbaa5e57001ed57b90c820a8e207fb Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 15 Jan 2014 12:46:32 +0100 Subject: [PATCH 1/3] Temporary version of Create DB and Drop DB. --- arangod/V8Server/v8-vocbase.cpp | 149 ++++++++++++++++++-------------- 1 file changed, 83 insertions(+), 66 deletions(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 77bbdaec0f..3c53c909ce 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -8316,43 +8316,69 @@ static v8::Handle JS_ListDatabases (v8::Arguments const& argv) { /// name. //////////////////////////////////////////////////////////////////////////////// -static int CreateDatabaseInAgency(string const& place, string const& name) { - AgencyComm ac; - AgencyCommResult res; - AgencyCommLocker locker(place, "WRITE"); - - if (! locker.successful()) { - return TRI_ERROR_INTERNAL; - } - - res = ac.createDirectory(place+"/Collections/"+name); - if (res.successful()) { - return TRI_ERROR_NO_ERROR; - } - else if (res.httpCode() == 403) { - return TRI_ERROR_ARANGO_DUPLICATE_NAME; - } - else { - return TRI_ERROR_INTERNAL; - } -} - static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& argv) { v8::HandleScope scope; - - // Arguments are already checked, there are 1 to 3. + // First work with the arguments to create a JSON entry: const string name = TRI_ObjectToString(argv[0]); - - int ourerrno = TRI_ERROR_NO_ERROR; - - ourerrno = CreateDatabaseInAgency("Plan",name); - if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Plan - // FIXME: Now wait for the directory under Current/Collections - return scope.Close(v8::True()); + TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); + if (0 == json) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "name", + TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, + TRI_ObjectToString(argv[0]).c_str())); + if (argv.Length() > 1) { + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "options", + TRI_ObjectToJson(argv[1])); + if (argv.Length() > 2) { + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "users", + TRI_ObjectToJson(argv[2])); + } } - TRI_V8_EXCEPTION(scope, ourerrno); + AgencyComm ac; + AgencyCommResult res; + + { + AgencyCommLocker locker("Plan", "WRITE"); + if (! locker.successful()) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + + res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + if (!res.successful()) { + if (res._statusCode == 403) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); + } + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + } + + ClusterInfo* ci = ClusterInfo::instance(); + vector DBServers = ci->getCurrentDBServers(); + + res = ac.getValues("Current/Version", false); + if (!res.successful()) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + uint64_t version = 1; // FIXME: füll mich aus dem Result + uint64_t index = res._index; // FIXME: dito + while (true) { + map done; + res = ac.getValues("Current/Databases/"+name, true); + if (res.successful()) { + if (res.flattenJson(done, "Current/Databases/"+name+"/",false)) { + if (done.size() >= DBServers.size()) { + return scope.Close(v8::True()); + } + } + } + res = ac.watchValue("Current/Version", index, 1.0, false); + index = res._index; + } } #endif @@ -8517,52 +8543,43 @@ static v8::Handle JS_DropDatabase_Coordinator (v8::Arguments const& a const string name = TRI_ObjectToString(argv[0]); AgencyComm ac; - AgencyCommResult acres; + AgencyCommResult res; { - AgencyCommLocker locker("Target", "WRITE"); + AgencyCommLocker locker("Plan", "WRITE"); + if (! locker.successful()) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } - // check that locking worked! - if (locker.successful()) { - // Now nobody can create or remove a database, so we can check that - // the one we want to drop does indeed exist: - acres = ac.getValues("Current/Collections/"+name+"/Lock", false); - - if (! acres.successful()) { + res = ac.removeValues("Plan/Databases/"+name, false); + if (!res.successful()) { + if (res._statusCode == 403) { TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - } - else { - TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not acquire agency lock"); + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); } } - // Now let's lock it. - // We cannot use a locker here, because we want to remove all of - // Current/Collections/ before we are done and we must not - // unlock the Lock after that. - if (! ac.lockWrite("Current/Collections/"+name, 24*3600.0, 24*3600.0)) { + res = ac.getValues("Current/Version", false); + if (!res.successful()) { TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); } - // res = ac.getValues("Current/Collections/"+name+"/Lock, false); - - // If this fails or the DB does not exist, return an error - // Remove entry Plan/Collections/ using Plan/Lock - // get list of DBServers during the lock - // (from now on new DBServers will no longer create a database) - // this is the point of no return - // tell all DBServers to drop database - // note errors, but there is nothing we can do about it if things go wrong - // only count and reports the servers with errors - // Remove entry Target/Collections/, use Target/Lock - // Remove entry Current/Collections/ using Current/Lock - // (from now on coordinators will understand that the database is gone - // Release Plan/Lock - // Report error - - return scope.Close(v8::True()); + uint64_t version = 1; // FIXME: füll mich aus dem Result + uint64_t index = res._index; // FIXME: dito + while (true) { + map done; + res = ac.getValues("Current/Databases/"+name, true); + if (res.successful()) { + if (res.flattenJson(done, "Current/Databases/"+name+"/",false)) { + if (done.size() > 0) { + return scope.Close(v8::True()); + } + } + } + res = ac.watchValue("Current/Version", index, 1.0, false); + index = res._index; + } } - #endif //////////////////////////////////////////////////////////////////////////////// From 6a427af0521dd00d7d7b5a44b0886988a3d8315d Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 15 Jan 2014 13:36:04 +0100 Subject: [PATCH 2/3] Nearly finished with CreateColl and DropDatabase for coordinator. --- arangod/V8Server/v8-vocbase.cpp | 89 ++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 28 deletions(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 3c53c909ce..f70115f7e7 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1789,16 +1789,17 @@ static v8::Handle RemoveVocbaseCol (const bool useCollection, #ifdef TRI_ENABLE_CLUSTER -static v8::Handle CreateCollectionCoordinator (v8::Arguments const& argv, - TRI_col_type_e collectionType, - std::string const& databaseName, - TRI_col_info_t& parameter) { +static v8::Handle CreateCollectionCoordinator ( + v8::Arguments const& argv, + TRI_col_type_e collectionType, + std::string const& databaseName, + TRI_col_info_t& parameter) { v8::HandleScope scope; const string name = TRI_ObjectToString(argv[0]); uint64_t numberOfShards = 1; - std::vector shardKeys; + vector shardKeys; // default shard key shardKeys.push_back("_key"); @@ -1847,25 +1848,25 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a uint64_t id = ClusterInfo::instance()->uniqid(1 + numberOfShards); // collection id is the first unique id we got - const std::string cid = StringUtils::itoa(id); + const string cid = StringUtils::itoa(id); // fetch list of available servers in cluster, and shuffle them randomly - std::vector dbServers = ClusterInfo::instance()->getCurrentDBServers(); + vector dbServers = ClusterInfo::instance()->getCurrentDBServers(); if (dbServers.empty()) { TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no database servers found in cluster"); } - std::random_shuffle(dbServers.begin(), dbServers.end()); + random_shuffle(dbServers.begin(), dbServers.end()); // now create the shards std::map shards; for (uint64_t i = 0; i < numberOfShards; ++i) { // determine responsible server - const std::string serverId = dbServers[i % dbServers.size()]; + const string serverId = dbServers[i % dbServers.size()]; // determine shard id - const std::string shardId = "s" + StringUtils::itoa(id + 1 + i); + const string shardId = "s" + StringUtils::itoa(id + 1 + i); shards.insert(std::make_pair(shardId, serverId)); } @@ -1892,6 +1893,7 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shardKeys", JsonHelper::stringList(TRI_UNKNOWN_MEM_ZONE, shardKeys)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shards", JsonHelper::stringObject(TRI_UNKNOWN_MEM_ZONE, shards)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "nrShards", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, numberOfShards)); AgencyComm agency; @@ -1903,25 +1905,50 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not lock plan in agency"); } - if (! agency.exists("Plan/Collections/" + databaseName)) { + if (! agency.exists("Plan/Databases/" + databaseName)) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "didn't find database entry in agency"); } - { - if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); - } - - AgencyCommResult result = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, JsonHelper::toString(json), 0.0); + if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); } + + AgencyCommResult result + = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, + json, 0.0); + if (!result.successful()) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not create entry for collection in plan in agency"); + } + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } - v8::Handle result = v8::Object::New(); - // TODO: wait for the creation of the collection - return scope.Close(result); + // Now wait for it to appear and be complete: + AgencyCommResult res = agency.getValues("Current/Version", false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + uint64_t index = res._index; + while (true) { + res = agency.getValues("Current/Collections/" + databaseName + "/" + cid, + true); + if (res.successful()) { + // FIXME + // Now extract JSON, look into length of "shards" entry, if equal to + // numberOfShards then we are done. + return scope.Close(v8::True()); + } + res = agency.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + index = res._index; + } } #endif @@ -2033,6 +2060,7 @@ static v8::Handle CreateVocBase (v8::Arguments const& argv, if (ServerState::instance()->isCoordinator()) { char const* originalDatabase = GetCurrentDatabaseName(); if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { + TRI_FreeCollectionInfoOptions(¶meter); TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); } @@ -8344,7 +8372,8 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& AgencyCommLocker locker("Plan", "WRITE"); if (! locker.successful()) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not lock plan in agency"); } res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0); @@ -8353,7 +8382,8 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& if (res._statusCode == 403) { TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); } - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not create entry in plan in agency"); } } @@ -8362,10 +8392,10 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& res = ac.getValues("Current/Version", false); if (!res.successful()) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); } - uint64_t version = 1; // FIXME: füll mich aus dem Result - uint64_t index = res._index; // FIXME: dito + uint64_t index = res._index; while (true) { map done; res = ac.getValues("Current/Databases/"+name, true); @@ -8377,6 +8407,10 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& } } res = ac.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } index = res._index; } } @@ -8564,8 +8598,7 @@ static v8::Handle JS_DropDatabase_Coordinator (v8::Arguments const& a if (!res.successful()) { TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); } - uint64_t version = 1; // FIXME: füll mich aus dem Result - uint64_t index = res._index; // FIXME: dito + uint64_t index = res._index; while (true) { map done; res = ac.getValues("Current/Databases/"+name, true); From 97476ccf088285a20307c415dffb821519c103ef Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 15 Jan 2014 13:59:41 +0100 Subject: [PATCH 3/3] Create Collection completed. Use new AgencyComm. --- arangod/V8Server/v8-vocbase.cpp | 38 +++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index f70115f7e7..b728315a9e 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1937,10 +1937,18 @@ static v8::Handle CreateCollectionCoordinator ( res = agency.getValues("Current/Collections/" + databaseName + "/" + cid, true); if (res.successful()) { - // FIXME - // Now extract JSON, look into length of "shards" entry, if equal to - // numberOfShards then we are done. - return scope.Close(v8::True()); + res.parse("", false); + map::iterator it = res._values.begin(); + if (it != res._values.end()) { + TRI_json_t const* json = (*it).second._json; + TRI_json_t const* shards = TRI_LookupArrayJson(json, "shards"); + if (TRI_IsArrayJson(shards)) { + size_t len = shards->_value._objects._length / 2; + if (len == numberOfShards) { + return scope.Close(v8::True()); + } + } + } } res = agency.watchValue("Current/Version", index, 1.0, false); if (!res.successful()) { @@ -8397,13 +8405,11 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& } uint64_t index = res._index; while (true) { - map done; res = ac.getValues("Current/Databases/"+name, true); if (res.successful()) { - if (res.flattenJson(done, "Current/Databases/"+name+"/",false)) { - if (done.size() >= DBServers.size()) { - return scope.Close(v8::True()); - } + res.parse("Current/Databases/"+name+"/", false); + if (res._values.size() >= DBServers.size()) { + return scope.Close(v8::True()); } } res = ac.watchValue("Current/Version", index, 1.0, false); @@ -8596,20 +8602,24 @@ static v8::Handle JS_DropDatabase_Coordinator (v8::Arguments const& a res = ac.getValues("Current/Version", false); if (!res.successful()) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); } uint64_t index = res._index; while (true) { map done; res = ac.getValues("Current/Databases/"+name, true); if (res.successful()) { - if (res.flattenJson(done, "Current/Databases/"+name+"/",false)) { - if (done.size() > 0) { - return scope.Close(v8::True()); - } + res.parse("Current/Databases/"+name+"/", false); + if (res._values.size() == 0) { + return scope.Close(v8::True()); } } res = ac.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } index = res._index; } }