diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 159853871d..b728315a9e 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1789,16 +1789,17 @@ static v8::Handle RemoveVocbaseCol (const bool useCollection, #ifdef TRI_ENABLE_CLUSTER -static v8::Handle CreateCollectionCoordinator (v8::Arguments const& argv, - TRI_col_type_e collectionType, - std::string const& databaseName, - TRI_col_info_t& parameter) { +static v8::Handle CreateCollectionCoordinator ( + v8::Arguments const& argv, + TRI_col_type_e collectionType, + std::string const& databaseName, + TRI_col_info_t& parameter) { v8::HandleScope scope; const string name = TRI_ObjectToString(argv[0]); uint64_t numberOfShards = 1; - std::vector shardKeys; + vector shardKeys; // default shard key shardKeys.push_back("_key"); @@ -1847,25 +1848,25 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a uint64_t id = ClusterInfo::instance()->uniqid(1 + numberOfShards); // collection id is the first unique id we got - const std::string cid = StringUtils::itoa(id); + const string cid = StringUtils::itoa(id); // fetch list of available servers in cluster, and shuffle them randomly - std::vector dbServers = ClusterInfo::instance()->getCurrentDBServers(); + vector dbServers = ClusterInfo::instance()->getCurrentDBServers(); if (dbServers.empty()) { TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no database servers found in cluster"); } - std::random_shuffle(dbServers.begin(), dbServers.end()); + random_shuffle(dbServers.begin(), dbServers.end()); // now create the shards std::map shards; for (uint64_t i = 0; i < numberOfShards; ++i) { // determine responsible server - const std::string serverId = dbServers[i % dbServers.size()]; + const string serverId = dbServers[i % dbServers.size()]; // determine shard id - const std::string shardId = "s" + StringUtils::itoa(id + 1 + i); + const string shardId = "s" + StringUtils::itoa(id + 1 + i); shards.insert(std::make_pair(shardId, serverId)); } @@ -1892,6 +1893,7 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shardKeys", JsonHelper::stringList(TRI_UNKNOWN_MEM_ZONE, shardKeys)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shards", JsonHelper::stringObject(TRI_UNKNOWN_MEM_ZONE, shards)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "nrShards", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, numberOfShards)); AgencyComm agency; @@ -1903,25 +1905,58 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not lock plan in agency"); } - if (! agency.exists("Plan/Collections/" + databaseName)) { + if (! agency.exists("Plan/Databases/" + databaseName)) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "didn't find database entry in agency"); } - { - if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); - } - - AgencyCommResult result = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, json, 0.0); + if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); } + + AgencyCommResult result + = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, + json, 0.0); + if (!result.successful()) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not create entry for collection in plan in agency"); + } + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } - v8::Handle result = v8::Object::New(); - // TODO: wait for the creation of the collection - return scope.Close(result); + // Now wait for it to appear and be complete: + AgencyCommResult res = agency.getValues("Current/Version", false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + uint64_t index = res._index; + while (true) { + res = agency.getValues("Current/Collections/" + databaseName + "/" + cid, + true); + if (res.successful()) { + res.parse("", false); + map::iterator it = res._values.begin(); + if (it != res._values.end()) { + TRI_json_t const* json = (*it).second._json; + TRI_json_t const* shards = TRI_LookupArrayJson(json, "shards"); + if (TRI_IsArrayJson(shards)) { + size_t len = shards->_value._objects._length / 2; + if (len == numberOfShards) { + return scope.Close(v8::True()); + } + } + } + } + res = agency.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + index = res._index; + } } #endif @@ -2033,6 +2068,7 @@ static v8::Handle CreateVocBase (v8::Arguments const& argv, if (ServerState::instance()->isCoordinator()) { char const* originalDatabase = GetCurrentDatabaseName(); if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { + TRI_FreeCollectionInfoOptions(¶meter); TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); } @@ -8316,43 +8352,73 @@ static v8::Handle JS_ListDatabases (v8::Arguments const& argv) { /// name. //////////////////////////////////////////////////////////////////////////////// -static int CreateDatabaseInAgency(string const& place, string const& name) { - AgencyComm ac; - AgencyCommResult res; - AgencyCommLocker locker(place, "WRITE"); - - if (! locker.successful()) { - return TRI_ERROR_INTERNAL; - } - - res = ac.createDirectory(place+"/Collections/"+name); - if (res.successful()) { - return TRI_ERROR_NO_ERROR; - } - else if (res.httpCode() == 403) { - return TRI_ERROR_ARANGO_DUPLICATE_NAME; - } - else { - return TRI_ERROR_INTERNAL; - } -} - static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& argv) { v8::HandleScope scope; - - // Arguments are already checked, there are 1 to 3. + // First work with the arguments to create a JSON entry: const string name = TRI_ObjectToString(argv[0]); - - int ourerrno = TRI_ERROR_NO_ERROR; - - ourerrno = CreateDatabaseInAgency("Plan",name); - if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Plan - // FIXME: Now wait for the directory under Current/Collections - return scope.Close(v8::True()); + TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); + if (0 == json) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "name", + TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, + TRI_ObjectToString(argv[0]).c_str())); + if (argv.Length() > 1) { + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "options", + TRI_ObjectToJson(argv[1])); + if (argv.Length() > 2) { + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "users", + TRI_ObjectToJson(argv[2])); + } } - TRI_V8_EXCEPTION(scope, ourerrno); + AgencyComm ac; + AgencyCommResult res; + + { + AgencyCommLocker locker("Plan", "WRITE"); + if (! locker.successful()) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not lock plan in agency"); + } + + res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + if (!res.successful()) { + if (res._statusCode == 403) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); + } + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not create entry in plan in agency"); + } + } + + ClusterInfo* ci = ClusterInfo::instance(); + vector DBServers = ci->getCurrentDBServers(); + + res = ac.getValues("Current/Version", false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + uint64_t index = res._index; + while (true) { + res = ac.getValues("Current/Databases/"+name, true); + if (res.successful()) { + res.parse("Current/Databases/"+name+"/", false); + if (res._values.size() >= DBServers.size()) { + return scope.Close(v8::True()); + } + } + res = ac.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + index = res._index; + } } #endif @@ -8517,52 +8583,46 @@ static v8::Handle JS_DropDatabase_Coordinator (v8::Arguments const& a const string name = TRI_ObjectToString(argv[0]); AgencyComm ac; - AgencyCommResult acres; + AgencyCommResult res; { - AgencyCommLocker locker("Target", "WRITE"); + AgencyCommLocker locker("Plan", "WRITE"); + if (! locker.successful()) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } - // check that locking worked! - if (locker.successful()) { - // Now nobody can create or remove a database, so we can check that - // the one we want to drop does indeed exist: - acres = ac.getValues("Current/Collections/"+name+"/Lock", false); - - if (! acres.successful()) { + res = ac.removeValues("Plan/Databases/"+name, false); + if (!res.successful()) { + if (res._statusCode == 403) { TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - } - else { - TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not acquire agency lock"); + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); } } - // Now let's lock it. - // We cannot use a locker here, because we want to remove all of - // Current/Collections/ before we are done and we must not - // unlock the Lock after that. - if (! ac.lockWrite("Current/Collections/"+name, 24*3600.0, 24*3600.0)) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + res = ac.getValues("Current/Version", false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + uint64_t index = res._index; + while (true) { + map done; + res = ac.getValues("Current/Databases/"+name, true); + if (res.successful()) { + res.parse("Current/Databases/"+name+"/", false); + if (res._values.size() == 0) { + return scope.Close(v8::True()); + } + } + res = ac.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + index = res._index; } - // res = ac.getValues("Current/Collections/"+name+"/Lock, false); - - // If this fails or the DB does not exist, return an error - // Remove entry Plan/Collections/ using Plan/Lock - // get list of DBServers during the lock - // (from now on new DBServers will no longer create a database) - // this is the point of no return - // tell all DBServers to drop database - // note errors, but there is nothing we can do about it if things go wrong - // only count and reports the servers with errors - // Remove entry Target/Collections/, use Target/Lock - // Remove entry Current/Collections/ using Current/Lock - // (from now on coordinators will understand that the database is gone - // Release Plan/Lock - // Report error - - return scope.Close(v8::True()); } - #endif ////////////////////////////////////////////////////////////////////////////////