From 7aa29fda5b99ba8a67d52ffd7c516310fcbc2f02 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 15 Jan 2014 08:52:10 +0100 Subject: [PATCH 01/13] Merge Jan's changes and preliminary version of CreateDB. --- arangod/V8Server/v8-vocbase.cpp | 153 +++----------------------------- 1 file changed, 11 insertions(+), 142 deletions(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 19207aed24..3a40e6811e 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -8316,67 +8316,25 @@ static v8::Handle JS_ListDatabases (v8::Arguments const& argv) { /// name. //////////////////////////////////////////////////////////////////////////////// -static int CreateDatabaseInAgency(string const& place, string const& name, - vector* DBServers) { +static int CreateDatabaseInAgency(string const& place, string const& name) { AgencyComm ac; AgencyCommResult res; - AgencyCommLocker locker(place, "WRITE"); if (! locker.successful()) { return TRI_ERROR_INTERNAL; } - if (0 != DBServers) { - ClusterInfo* ci = ClusterInfo::instance(); - ci->loadDBServers(); // to make sure we know about all of them - *DBServers = ci->getDBServers(); + res = ac.createDirectory(place+"/Collections/"+name); + if (res.successful()) { + return TRI_ERROR_NO_ERROR; } - res = ac.casValue(place+"/Collections/"+name+"/Lock",string("UNLOCKED"), - false, 0.0, 0.0); - - if (res.httpCode() == 412) { - // already created by someone else + else if (res.httpCode() == 403) { return TRI_ERROR_ARANGO_DUPLICATE_NAME; } - - if (res.successful()) { - res = ac.casValue(place+"/Collections/"+name+"/Version",string("1"), - false, 0.0, 0.0); - if (res.successful()) { - return TRI_ERROR_NO_ERROR; - } - - // clean up - ac.removeValues(place+"/Collections/"+name,true); + else { + return TRI_ERROR_INTERNAL; } - - return TRI_ERROR_INTERNAL; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief helper function for building a json body for our requests -//////////////////////////////////////////////////////////////////////////////// - -static string CreateDatabaseBuildJsonBody( v8::Arguments const& argv ) { - TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); - if (0 == json) { - return string(""); - } - TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "name", - TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, - TRI_ObjectToString(argv[0]).c_str())); - if (argv.Length() > 1) { - TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "options", - TRI_ObjectToJson(argv[1])); - if (argv.Length() > 2) { - TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "users", - TRI_ObjectToJson(argv[2])); - } - } - string jsonstr = JsonHelper::toString(json); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - return jsonstr; } static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& argv) { @@ -8392,102 +8350,13 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& int ourerrno = TRI_ERROR_NO_ERROR; - ourerrno = CreateDatabaseInAgency("Target",name,0); - if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Target - vector DBServers; - // We will get the list of DBServers whilst holding the lock to - // modify "/Plan/Collections". Therefore, everybody who is on the - // list will be told, everybody who is starting later will see the - // entry in "/Plan/Collections/..." and will create the database on - // startup. - ourerrno = CreateDatabaseInAgency("Plan",name,&DBServers); - if (ourerrno == TRI_ERROR_NO_ERROR) { - vector::iterator it; - // build request to be sent to all servers - - string jsonstr = CreateDatabaseBuildJsonBody(argv); - if (jsonstr.empty()) { - ourerrno = TRI_ERROR_INTERNAL; - } - else { - ClusterCommResult* res; - CoordTransactionID coordTransactionID = TRI_NewTickServer(); - for (it = DBServers.begin(); it != DBServers.end(); ++it) { - res = cc->asyncRequest("CreateDB", coordTransactionID, - "server:"+*it, - triagens::rest::HttpRequest::HTTP_REQUEST_POST, - "/_api/database", jsonstr.c_str(), - jsonstr.size(), new map, 0, 0.0); - delete res; - } - unsigned int done = 0; - while (done < DBServers.size()) { - res = cc->wait("", coordTransactionID, 0, "", 0.0); - if (res->status == CL_COMM_RECEIVED) { - if (res->answer_code == triagens::rest::HttpResponse::OK) { - done++; - delete res; - } - else if (res->answer_code == triagens::rest::HttpResponse::CONFLICT) { - ourerrno = TRI_ERROR_ARANGO_DUPLICATE_NAME; - delete res; - break; - } - else { - ourerrno = TRI_ERROR_INTERNAL; - delete res; - break; - } - } - else { - delete res; - break; - } - } - if (done == DBServers.size()) { - ourerrno = CreateDatabaseInAgency("Current",name,0); - if (ourerrno == TRI_ERROR_NO_ERROR) { - return scope.Close(v8::True()); - } - } - cc->drop( "CreateDatabase", coordTransactionID, 0, "" ); - for (it = DBServers.begin(); it != DBServers.end(); ++it) { - res = cc->asyncRequest("CreateDB", coordTransactionID, - "server:"+*it, - triagens::rest::HttpRequest::HTTP_REQUEST_DELETE, - "/_api/database/"+name, "", 0, - new map, 0, 0.0); - delete res; - } - done = 0; - while (done < DBServers.size()) { - res = cc->wait("", coordTransactionID, 0, "", 0.0); - delete res; - done++; - } - } - - { - AgencyCommLocker locker("Plan","WRITE"); - - // TODO: what should we do if locking fails? - if (locker.successful()) { - ac.removeValues("Plan/Collections/"+name,true); - } - } - } - { - AgencyCommLocker locker("Target","WRITE"); - - // TODO: what should we do if locking fails? - if (locker.successful()) { - ac.removeValues("Target/Collections/"+name,true); - } - } + ourerrno = CreateDatabaseInAgency("Plan",name); + if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Plan + // FIXME: Now wait for the directory under Current/Collections + return scope.Close(v8::True()); } TRI_V8_EXCEPTION(scope, ourerrno); - return scope.Close(v8::True()); } #endif From 2495b6e720ce08fd07d3f1f04b90c980d2093143 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 15 Jan 2014 09:52:47 +0100 Subject: [PATCH 02/13] removed unused variables --- arangod/V8Server/v8-vocbase.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 3a40e6811e..d80153c96e 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -8344,10 +8344,6 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& const string name = TRI_ObjectToString(argv[0]); - //ClusterInfo* ci = ClusterInfo::instance(); - ClusterComm* cc = ClusterComm::instance(); - AgencyComm ac; - int ourerrno = TRI_ERROR_NO_ERROR; ourerrno = CreateDatabaseInAgency("Plan",name); From e094cc534268f915f3ea73e2fa5b810efec97bcc Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 15 Jan 2014 10:17:27 +0100 Subject: [PATCH 03/13] less redundancy of URLs --- arangod/Cluster/ClusterInfo.cpp | 40 ++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index e3ba56489f..7e69c52fc3 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -443,20 +443,22 @@ vector ClusterInfo::listDatabases () { //////////////////////////////////////////////////////////////////////////////// void ClusterInfo::loadCollections () { + static const std::string prefix = "Current/Collections"; + AgencyCommResult result; { AgencyCommLocker locker("Current", "READ"); if (locker.successful()) { - result = _agency.getValues("Current/Collections", true); + result = _agency.getValues(prefix, true); } } if (result.successful()) { std::map collections; - if (result.flattenJson(collections, "Current/Collections/", false)) { + if (result.flattenJson(collections, prefix + "/", false)) { LOG_TRACE("Current/Collections loaded successfully"); WRITE_LOCKER(_lock); @@ -516,7 +518,7 @@ void ClusterInfo::loadCollections () { } } - LOG_TRACE("Error while loading Current/Collections"); + LOG_TRACE("Error while loading %s", prefix.c_str()); _collectionsValid = false; } @@ -635,21 +637,23 @@ const std::vector ClusterInfo::getCollections (DatabaseID const& //////////////////////////////////////////////////////////////////////////////// void ClusterInfo::loadServers () { + static const std::string prefix = "Current/ServersRegistered"; + AgencyCommResult result; { AgencyCommLocker locker("Current", "READ"); if (locker.successful()) { - result = _agency.getValues("Current/ServersRegistered", true); + result = _agency.getValues(prefix, true); } } if (result.successful()) { std::map servers; - if (result.flattenJson(servers, "Current/ServersRegistered/", false)) { - LOG_TRACE("Current/ServersRegistered loaded successfully"); + if (result.flattenJson(servers, prefix + "/", false)) { + LOG_TRACE("%s loaded successfully", prefix.c_str()); WRITE_LOCKER(_lock); _servers.clear(); @@ -665,7 +669,7 @@ void ClusterInfo::loadServers () { } } - LOG_TRACE("Error while loading Current/ServersRegistered"); + LOG_TRACE("Error while loading %s", prefix.c_str()); _serversValid = false; @@ -679,13 +683,13 @@ void ClusterInfo::loadServers () { //////////////////////////////////////////////////////////////////////////////// std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) { + int tries = 0; if (! _serversValid) { loadServers(); + tries++; } - int tries = 0; - while (++tries <= 2) { { READ_LOCKER(_lock); @@ -709,21 +713,23 @@ std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) { //////////////////////////////////////////////////////////////////////////////// void ClusterInfo::loadDBServers () { + static const std::string prefix = "Current/DBServers"; + AgencyCommResult result; { AgencyCommLocker locker("Current", "READ"); if (locker.successful()) { - result = _agency.getValues("Current/DBServers", true); + result = _agency.getValues(prefix, true); } } if (result.successful()) { std::map servers; - if (result.flattenJson(servers, "Current/DBServers/", false)) { - LOG_TRACE("Current/DBServers loaded successfully"); + if (result.flattenJson(servers, prefix + "/", false)) { + LOG_TRACE("%s loaded successfully", prefix.c_str()); WRITE_LOCKER(_lock); _DBServers.clear(); @@ -739,7 +745,7 @@ void ClusterInfo::loadDBServers () { } } - LOG_TRACE("Error while loading Current/DBServers"); + LOG_TRACE("Error while loading %s", prefix.c_str()); _DBServersValid = false; @@ -771,6 +777,8 @@ std::vector ClusterInfo::getDBServers () { //////////////////////////////////////////////////////////////////////////////// std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) { + static const std::string prefix = "Target/MapIDToEndpoint/"; + AgencyCommResult result; // fetch value at Target/MapIDToEndpoint @@ -778,15 +786,15 @@ std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) { AgencyCommLocker locker("Target", "READ"); if (locker.successful()) { - result = _agency.getValues("Target/MapIDToEndpoint/" + serverID, false); + result = _agency.getValues(prefix + serverID, false); } } if (result.successful()) { std::map out; - if (! result.flattenJson(out, "Target/MapIDToEndpoint/", false)) { - LOG_FATAL_AND_EXIT("Got an invalid JSON response for Target/MapIDToEndpoint"); + if (! result.flattenJson(out, prefix, false)) { + LOG_FATAL_AND_EXIT("Got an invalid JSON response for %s", prefix.c_str()); } // check if we can find ourselves in the list returned by the agency From e02b0b52e1f829a295d1f270ca24f74f8745a381 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 15 Jan 2014 10:52:56 +0100 Subject: [PATCH 04/13] renamed methods --- arangod/Cluster/ClusterInfo.cpp | 43 +++++++++++++++------------------ arangod/Cluster/ClusterInfo.h | 6 ++--- arangod/Cluster/v8-cluster.cpp | 4 +-- arangod/V8Server/v8-vocbase.cpp | 8 +++--- js/server/tests/cluster.js | 2 -- 5 files changed, 29 insertions(+), 34 deletions(-) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 7e69c52fc3..357d9c3565 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -329,10 +329,7 @@ ClusterInfo::ClusterInfo () _DBServersValid(false) { _uniqid._currentValue = _uniqid._upperValue = 0ULL; - // Actual loading is postponed until necessary: - // loadServers(); - // loadDBServers(); - // loadCollections(); + // Actual loading into caches is postponed until necessary } //////////////////////////////////////////////////////////////////////////////// @@ -397,7 +394,7 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID) { int tries = 0; if (! _collectionsValid) { - loadCollections(); + loadCurrentCollections(); ++tries; } @@ -412,8 +409,8 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID) { } } - // must call loadCollections outside the lock - loadCollections(); + // must load collections outside the lock + loadCurrentCollections(); } return false; @@ -427,7 +424,7 @@ vector ClusterInfo::listDatabases () { vector res; if (! _collectionsValid) { - loadCollections(); + loadCurrentCollections(); } AllCollections::const_iterator it; @@ -442,7 +439,7 @@ vector ClusterInfo::listDatabases () { /// Usually one does not have to call this directly. //////////////////////////////////////////////////////////////////////////////// -void ClusterInfo::loadCollections () { +void ClusterInfo::loadCurrentCollections () { static const std::string prefix = "Current/Collections"; AgencyCommResult result; @@ -529,12 +526,13 @@ void ClusterInfo::loadCollections () { CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID, CollectionID const& collectionID) { - if (! _collectionsValid) { - loadCollections(); - } - int tries = 0; + if (! _collectionsValid) { + loadCurrentCollections(); + ++tries; + } + while (++tries <= 2) { { READ_LOCKER(_lock); @@ -551,8 +549,8 @@ CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID, } } - // must call loadCollections outside the lock - loadCollections(); + // must load collections outside the lock + loadCurrentCollections(); } return CollectionInfo(); @@ -605,7 +603,7 @@ const std::vector ClusterInfo::getCollections (DatabaseID const& std::vector result; // always reload - loadCollections(); + loadCurrentCollections(); READ_LOCKER(_lock); // look up database by id @@ -712,7 +710,7 @@ std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) { /// Usually one does not have to call this directly. //////////////////////////////////////////////////////////////////////////////// -void ClusterInfo::loadDBServers () { +void ClusterInfo::loadCurrentDBServers () { static const std::string prefix = "Current/DBServers"; AgencyCommResult result; @@ -757,9 +755,9 @@ void ClusterInfo::loadDBServers () { /// currently registered //////////////////////////////////////////////////////////////////////////////// -std::vector ClusterInfo::getDBServers () { +std::vector ClusterInfo::getCurrentDBServers () { if (! _DBServersValid) { - loadDBServers(); + loadCurrentDBServers(); } std::vector res; @@ -770,7 +768,6 @@ std::vector ClusterInfo::getDBServers () { return res; } - //////////////////////////////////////////////////////////////////////////////// /// @brief lookup the server's endpoint by scanning Target/MapIDToEnpdoint for /// our id @@ -819,7 +816,7 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) { int tries = 0; if (! _collectionsValid) { - loadCollections(); + loadCurrentCollections(); tries++; } @@ -833,8 +830,8 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) { } } - // must call loadCollections outside the lock - loadCollections(); + // must load collections outside the lock + loadCurrentCollections(); } return ServerID(""); diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index ffbf799faf..20b60c0c29 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -298,7 +298,7 @@ namespace triagens { /// Usually one does not have to call this directly. //////////////////////////////////////////////////////////////////////////////// - void loadCollections (); + void loadCurrentCollections (); //////////////////////////////////////////////////////////////////////////////// /// @brief ask about a collection @@ -332,14 +332,14 @@ namespace triagens { /// Usually one does not have to call this directly. //////////////////////////////////////////////////////////////////////////////// - void loadDBServers (); + void loadCurrentDBServers (); //////////////////////////////////////////////////////////////////////////////// /// @brief return a list of all DBServers in the cluster that have /// currently registered //////////////////////////////////////////////////////////////////////////////// - std::vector getDBServers (); + std::vector getCurrentDBServers (); //////////////////////////////////////////////////////////////////////////////// /// @brief (re-)load the information about servers from the agency diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index 8ac2518aad..0854d24931 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -795,7 +795,7 @@ static v8::Handle JS_GetDBServers (v8::Arguments const& argv) { TRI_V8_EXCEPTION_USAGE(scope, "DBServers()"); } - std::vector DBServers = ClusterInfo::instance()->getDBServers(); + std::vector DBServers = ClusterInfo::instance()->getCurrentDBServers(); v8::Handle l = v8::Array::New(); @@ -819,7 +819,7 @@ static v8::Handle JS_ReloadDBServers (v8::Arguments const& argv) { TRI_V8_EXCEPTION_USAGE(scope, "reloadDBServers()"); } - ClusterInfo::instance()->loadDBServers(); + ClusterInfo::instance()->loadCurrentDBServers(); return scope.Close(v8::Undefined()); } diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index d80153c96e..77bbdaec0f 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1850,7 +1850,7 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a const std::string cid = StringUtils::itoa(id); // fetch list of available servers in cluster, and shuffle them randomly - std::vector dbServers = ClusterInfo::instance()->getDBServers(); + std::vector dbServers = ClusterInfo::instance()->getCurrentDBServers(); if (dbServers.empty()) { TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no database servers found in cluster"); @@ -8179,7 +8179,7 @@ static v8::Handle JS_ListDatabases_Coordinator ClusterInfo* ci = ClusterInfo::instance(); if (argv.Length() == 0) { - ci->loadCollections(); + ci->loadCurrentCollections(); vector list = ci->listDatabases(); v8::Handle result = v8::Array::New(); for (size_t i = 0; i < list.size(); ++i) { @@ -8193,7 +8193,7 @@ static v8::Handle JS_ListDatabases_Coordinator int tries = 0; vector DBServers; while (++tries <= 2) { - DBServers = ci->getDBServers(); + DBServers = ci->getCurrentDBServers(); if (DBServers.size() != 0) { ServerID sid = DBServers[0]; ClusterComm* cc = ClusterComm::instance(); @@ -8227,7 +8227,7 @@ static v8::Handle JS_ListDatabases_Coordinator delete res; } } - ci->loadDBServers(); // just in case some new have arrived + ci->loadCurrentDBServers(); // just in case some new have arrived } // Give up: return scope.Close(v8::Undefined()); diff --git a/js/server/tests/cluster.js b/js/server/tests/cluster.js index 54ffe3577d..85cb04b733 100644 --- a/js/server/tests/cluster.js +++ b/js/server/tests/cluster.js @@ -92,8 +92,6 @@ function ClusterEnabledSuite () { catch (err) { } }); - - agency.set("Sync/LatestID", "0"); }; return { From b5a448ffdbaef80a8630a1b27e8fdef94b71aa45 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 15 Jan 2014 11:03:01 +0100 Subject: [PATCH 05/13] added loadPlannedDatabases method --- arangod/Cluster/ClusterInfo.cpp | 44 ++++++++++++++++++++++++++++++++- arangod/Cluster/ClusterInfo.h | 8 ++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 357d9c3565..25f98bca96 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -434,6 +434,48 @@ vector ClusterInfo::listDatabases () { return res; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about planned databases +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::loadPlannedDatabases () { + static const std::string prefix = "Plan/Databases"; + + AgencyCommResult result; + + { + AgencyCommLocker locker("Plan", "READ"); + + if (locker.successful()) { + result = _agency.getValues(prefix, true); + } + } + + if (result.successful()) { + std::map databases; + + if (result.flattenJson(databases, prefix + "/", false)) { + LOG_TRACE("%s loaded successfully", prefix.c_str()); + + WRITE_LOCKER(_lock); + _plannedDatabases.clear(); + + std::map::const_iterator it; + for (it = databases.begin(); it != databases.end(); ++it) { + const std::string& name = (*it).first; + TRI_json_t* options = JsonHelper::fromString((*it).second); + + _plannedDatabases.insert(std::make_pair(name, options)); + } + + return; + } + } + + LOG_TRACE("Error while loading %s", prefix.c_str()); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief (re-)load the information about collections from the agency /// Usually one does not have to call this directly. @@ -456,7 +498,7 @@ void ClusterInfo::loadCurrentCollections () { std::map collections; if (result.flattenJson(collections, prefix + "/", false)) { - LOG_TRACE("Current/Collections loaded successfully"); + LOG_TRACE("%s loaded successfully", prefix.c_str()); WRITE_LOCKER(_lock); _collections.clear(); diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 20b60c0c29..2fa93c5a28 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -300,6 +300,13 @@ namespace triagens { void loadCurrentCollections (); +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about planned databases +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + + void loadPlannedDatabases (); + //////////////////////////////////////////////////////////////////////////////// /// @brief ask about a collection /// If it is not found in the cache, the cache is reloaded once. @@ -391,6 +398,7 @@ namespace triagens { _uniqid; // Cached data from the agency, we reload whenever necessary: + std::map _plannedDatabases; // from Plan/Databases AllCollections _collections; // from Current/Collections/ bool _collectionsValid; std::map _servers; // from Current/ServersRegistered From 666b996a152ba5a0d9962e520e90e12ac2084da2 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 15 Jan 2014 11:08:51 +0100 Subject: [PATCH 06/13] Fix arangom for the future. --- utils/arangom.in | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/utils/arangom.in b/utils/arangom.in index 4a4953bdf5..cbe5861c33 100755 --- a/utils/arangom.in +++ b/utils/arangom.in @@ -43,31 +43,27 @@ if [ "$1" == "init" ] ; then set Target/MapIDToEndpoint set Target/Version 1 - + set Target/Lock UNLOCKED set Target/DBServers set Target/Coordinators - set Target/Collections + set Target/Databases/@Usystem "{}" set Target/Collections/@Usystem - set Target/Collections/@Usystem/Version 1 - set Target/Collections/@Usystem/Lock UNLOCKED set Plan/Version 1 + set Plan/Lock UNLOCKED set Plan/DBServers set Plan/Coordinators - set Plan/Collections + set Plan/Databases/@Usystem "{}" set Plan/Collections/@Usystem - set Plan/Collections/@Usystem/Version 1 - set Plan/Collections/@Usystem/Lock UNLOCKED set Current/Version 1 - set Current/ServersRegistered + set Current/Lock UNLOCKED set Current/DBServers set Current/Coordinators - set Current/Collections + set Current/Databases/@Usystem set Current/Collections/@Usystem - set Current/Collections/@Usystem/Version 1 - set Current/Collections/@Usystem/Lock UNLOCKED + set Current/ServersRegistered set Current/ShardsCopied set Sync/ServerStates From bd5f58ff8bdbaa5e57001ed57b90c820a8e207fb Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 15 Jan 2014 12:46:32 +0100 Subject: [PATCH 07/13] Temporary version of Create DB and Drop DB. --- arangod/V8Server/v8-vocbase.cpp | 149 ++++++++++++++++++-------------- 1 file changed, 83 insertions(+), 66 deletions(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 77bbdaec0f..3c53c909ce 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -8316,43 +8316,69 @@ static v8::Handle JS_ListDatabases (v8::Arguments const& argv) { /// name. //////////////////////////////////////////////////////////////////////////////// -static int CreateDatabaseInAgency(string const& place, string const& name) { - AgencyComm ac; - AgencyCommResult res; - AgencyCommLocker locker(place, "WRITE"); - - if (! locker.successful()) { - return TRI_ERROR_INTERNAL; - } - - res = ac.createDirectory(place+"/Collections/"+name); - if (res.successful()) { - return TRI_ERROR_NO_ERROR; - } - else if (res.httpCode() == 403) { - return TRI_ERROR_ARANGO_DUPLICATE_NAME; - } - else { - return TRI_ERROR_INTERNAL; - } -} - static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& argv) { v8::HandleScope scope; - - // Arguments are already checked, there are 1 to 3. + // First work with the arguments to create a JSON entry: const string name = TRI_ObjectToString(argv[0]); - - int ourerrno = TRI_ERROR_NO_ERROR; - - ourerrno = CreateDatabaseInAgency("Plan",name); - if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Plan - // FIXME: Now wait for the directory under Current/Collections - return scope.Close(v8::True()); + TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); + if (0 == json) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "name", + TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, + TRI_ObjectToString(argv[0]).c_str())); + if (argv.Length() > 1) { + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "options", + TRI_ObjectToJson(argv[1])); + if (argv.Length() > 2) { + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "users", + TRI_ObjectToJson(argv[2])); + } } - TRI_V8_EXCEPTION(scope, ourerrno); + AgencyComm ac; + AgencyCommResult res; + + { + AgencyCommLocker locker("Plan", "WRITE"); + if (! locker.successful()) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + + res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + if (!res.successful()) { + if (res._statusCode == 403) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); + } + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + } + + ClusterInfo* ci = ClusterInfo::instance(); + vector DBServers = ci->getCurrentDBServers(); + + res = ac.getValues("Current/Version", false); + if (!res.successful()) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } + uint64_t version = 1; // FIXME: füll mich aus dem Result + uint64_t index = res._index; // FIXME: dito + while (true) { + map done; + res = ac.getValues("Current/Databases/"+name, true); + if (res.successful()) { + if (res.flattenJson(done, "Current/Databases/"+name+"/",false)) { + if (done.size() >= DBServers.size()) { + return scope.Close(v8::True()); + } + } + } + res = ac.watchValue("Current/Version", index, 1.0, false); + index = res._index; + } } #endif @@ -8517,52 +8543,43 @@ static v8::Handle JS_DropDatabase_Coordinator (v8::Arguments const& a const string name = TRI_ObjectToString(argv[0]); AgencyComm ac; - AgencyCommResult acres; + AgencyCommResult res; { - AgencyCommLocker locker("Target", "WRITE"); + AgencyCommLocker locker("Plan", "WRITE"); + if (! locker.successful()) { + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + } - // check that locking worked! - if (locker.successful()) { - // Now nobody can create or remove a database, so we can check that - // the one we want to drop does indeed exist: - acres = ac.getValues("Current/Collections/"+name+"/Lock", false); - - if (! acres.successful()) { + res = ac.removeValues("Plan/Databases/"+name, false); + if (!res.successful()) { + if (res._statusCode == 403) { TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - } - else { - TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not acquire agency lock"); + TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); } } - // Now let's lock it. - // We cannot use a locker here, because we want to remove all of - // Current/Collections/ before we are done and we must not - // unlock the Lock after that. - if (! ac.lockWrite("Current/Collections/"+name, 24*3600.0, 24*3600.0)) { + res = ac.getValues("Current/Version", false); + if (!res.successful()) { TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); } - // res = ac.getValues("Current/Collections/"+name+"/Lock, false); - - // If this fails or the DB does not exist, return an error - // Remove entry Plan/Collections/ using Plan/Lock - // get list of DBServers during the lock - // (from now on new DBServers will no longer create a database) - // this is the point of no return - // tell all DBServers to drop database - // note errors, but there is nothing we can do about it if things go wrong - // only count and reports the servers with errors - // Remove entry Target/Collections/, use Target/Lock - // Remove entry Current/Collections/ using Current/Lock - // (from now on coordinators will understand that the database is gone - // Release Plan/Lock - // Report error - - return scope.Close(v8::True()); + uint64_t version = 1; // FIXME: füll mich aus dem Result + uint64_t index = res._index; // FIXME: dito + while (true) { + map done; + res = ac.getValues("Current/Databases/"+name, true); + if (res.successful()) { + if (res.flattenJson(done, "Current/Databases/"+name+"/",false)) { + if (done.size() > 0) { + return scope.Close(v8::True()); + } + } + } + res = ac.watchValue("Current/Version", index, 1.0, false); + index = res._index; + } } - #endif //////////////////////////////////////////////////////////////////////////////// From 6a427af0521dd00d7d7b5a44b0886988a3d8315d Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 15 Jan 2014 13:36:04 +0100 Subject: [PATCH 08/13] Nearly finished with CreateColl and DropDatabase for coordinator. --- arangod/V8Server/v8-vocbase.cpp | 89 ++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 28 deletions(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 3c53c909ce..f70115f7e7 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1789,16 +1789,17 @@ static v8::Handle RemoveVocbaseCol (const bool useCollection, #ifdef TRI_ENABLE_CLUSTER -static v8::Handle CreateCollectionCoordinator (v8::Arguments const& argv, - TRI_col_type_e collectionType, - std::string const& databaseName, - TRI_col_info_t& parameter) { +static v8::Handle CreateCollectionCoordinator ( + v8::Arguments const& argv, + TRI_col_type_e collectionType, + std::string const& databaseName, + TRI_col_info_t& parameter) { v8::HandleScope scope; const string name = TRI_ObjectToString(argv[0]); uint64_t numberOfShards = 1; - std::vector shardKeys; + vector shardKeys; // default shard key shardKeys.push_back("_key"); @@ -1847,25 +1848,25 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a uint64_t id = ClusterInfo::instance()->uniqid(1 + numberOfShards); // collection id is the first unique id we got - const std::string cid = StringUtils::itoa(id); + const string cid = StringUtils::itoa(id); // fetch list of available servers in cluster, and shuffle them randomly - std::vector dbServers = ClusterInfo::instance()->getCurrentDBServers(); + vector dbServers = ClusterInfo::instance()->getCurrentDBServers(); if (dbServers.empty()) { TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no database servers found in cluster"); } - std::random_shuffle(dbServers.begin(), dbServers.end()); + random_shuffle(dbServers.begin(), dbServers.end()); // now create the shards std::map shards; for (uint64_t i = 0; i < numberOfShards; ++i) { // determine responsible server - const std::string serverId = dbServers[i % dbServers.size()]; + const string serverId = dbServers[i % dbServers.size()]; // determine shard id - const std::string shardId = "s" + StringUtils::itoa(id + 1 + i); + const string shardId = "s" + StringUtils::itoa(id + 1 + i); shards.insert(std::make_pair(shardId, serverId)); } @@ -1892,6 +1893,7 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shardKeys", JsonHelper::stringList(TRI_UNKNOWN_MEM_ZONE, shardKeys)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shards", JsonHelper::stringObject(TRI_UNKNOWN_MEM_ZONE, shards)); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "nrShards", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, numberOfShards)); AgencyComm agency; @@ -1903,25 +1905,50 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not lock plan in agency"); } - if (! agency.exists("Plan/Collections/" + databaseName)) { + if (! agency.exists("Plan/Databases/" + databaseName)) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "didn't find database entry in agency"); } - { - if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); - } - - AgencyCommResult result = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, JsonHelper::toString(json), 0.0); + if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); } + + AgencyCommResult result + = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, + json, 0.0); + if (!result.successful()) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not create entry for collection in plan in agency"); + } + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } - v8::Handle result = v8::Object::New(); - // TODO: wait for the creation of the collection - return scope.Close(result); + // Now wait for it to appear and be complete: + AgencyCommResult res = agency.getValues("Current/Version", false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + uint64_t index = res._index; + while (true) { + res = agency.getValues("Current/Collections/" + databaseName + "/" + cid, + true); + if (res.successful()) { + // FIXME + // Now extract JSON, look into length of "shards" entry, if equal to + // numberOfShards then we are done. + return scope.Close(v8::True()); + } + res = agency.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } + index = res._index; + } } #endif @@ -2033,6 +2060,7 @@ static v8::Handle CreateVocBase (v8::Arguments const& argv, if (ServerState::instance()->isCoordinator()) { char const* originalDatabase = GetCurrentDatabaseName(); if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { + TRI_FreeCollectionInfoOptions(¶meter); TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); } @@ -8344,7 +8372,8 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& AgencyCommLocker locker("Plan", "WRITE"); if (! locker.successful()) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not lock plan in agency"); } res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0); @@ -8353,7 +8382,8 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& if (res._statusCode == 403) { TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); } - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not create entry in plan in agency"); } } @@ -8362,10 +8392,10 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& res = ac.getValues("Current/Version", false); if (!res.successful()) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); } - uint64_t version = 1; // FIXME: füll mich aus dem Result - uint64_t index = res._index; // FIXME: dito + uint64_t index = res._index; while (true) { map done; res = ac.getValues("Current/Databases/"+name, true); @@ -8377,6 +8407,10 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& } } res = ac.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } index = res._index; } } @@ -8564,8 +8598,7 @@ static v8::Handle JS_DropDatabase_Coordinator (v8::Arguments const& a if (!res.successful()) { TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); } - uint64_t version = 1; // FIXME: füll mich aus dem Result - uint64_t index = res._index; // FIXME: dito + uint64_t index = res._index; while (true) { map done; res = ac.getValues("Current/Databases/"+name, true); From 4a805062bb81343d631a182659a3e861d3b48024 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 15 Jan 2014 13:36:38 +0100 Subject: [PATCH 09/13] changed internal APIs --- arangod/Cluster/AgencyComm.cpp | 375 +++++++++++++++++++------ arangod/Cluster/AgencyComm.h | 61 ++-- arangod/Cluster/ApplicationCluster.cpp | 47 +++- arangod/Cluster/ClusterInfo.cpp | 187 ++++++------ arangod/Cluster/ClusterInfo.h | 2 + arangod/Cluster/HeartbeatThread.cpp | 73 ++--- arangod/Cluster/HeartbeatThread.h | 2 +- arangod/Cluster/ServerState.cpp | 31 +- arangod/Cluster/v8-cluster.cpp | 104 ++++--- arangod/V8Server/v8-vocbase.cpp | 2 +- init-cluster.sh | 12 +- lib/Basics/JsonHelper.cpp | 58 ++++ lib/Basics/JsonHelper.h | 27 ++ utils/arangom.in | 8 +- 14 files changed, 658 insertions(+), 331 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 6b6593a690..98324e35d9 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -26,6 +26,7 @@ //////////////////////////////////////////////////////////////////////////////// #include "Cluster/AgencyComm.h" +#include "Basics/JsonHelper.h" #include "Basics/ReadLocker.h" #include "Basics/StringUtils.h" #include "Basics/WriteLocker.h" @@ -89,6 +90,7 @@ AgencyCommResult::AgencyCommResult () : _location(), _message(), _body(), + _values(), _index(0), _statusCode(0), _connected(false) { @@ -99,6 +101,15 @@ AgencyCommResult::AgencyCommResult () //////////////////////////////////////////////////////////////////////////////// AgencyCommResult::~AgencyCommResult () { + // free all JSON data + std::map::iterator it = _values.begin(); + + while (it != _values.end()) { + if ((*it).second._json != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, (*it).second._json); + } + ++it; + } } // ----------------------------------------------------------------------------- @@ -232,6 +243,7 @@ std::string AgencyCommResult::errorDetails () const { return _message + " (" + errorMessage + ")"; } +/* //////////////////////////////////////////////////////////////////////////////// /// @brief recursively flatten the JSON response into a map /// @@ -455,6 +467,124 @@ bool AgencyCommResult::flattenJson (std::map& out, return result; } +*/ +//////////////////////////////////////////////////////////////////////////////// +/// @brief recursively flatten the JSON response into a map +/// +/// stripKeyPrefix is decoded, as is the _globalPrefix +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyCommResult::parseJsonNode (TRI_json_t const* node, + std::string const& stripKeyPrefix, + bool withDirs) { + if (! TRI_IsArrayJson(node)) { + return true; + } + + // get "key" attribute + TRI_json_t const* key = TRI_LookupArrayJson(node, "key"); + + if (! TRI_IsStringJson(key)) { + return false; + } + + std::string keydecoded + = AgencyComm::decodeKey(std::string(key->_value._string.data, + key->_value._string.length-1)); + + // make sure we don't strip more bytes than the key is long + const size_t offset = AgencyComm::_globalPrefix.size() + stripKeyPrefix.size(); + const size_t length = keydecoded.size(); + + std::string prefix; + if (offset >= length) { + prefix = ""; + } + else { + prefix = keydecoded.substr(offset); + } + + // get "dir" attribute + TRI_json_t const* dir = TRI_LookupArrayJson(node, "dir"); + bool isDir = (TRI_IsBooleanJson(dir) && dir->_value._boolean); + + if (isDir) { + if (withDirs) { + AgencyCommResultEntry entry; + + entry._index = 0; + entry._json = 0; + entry._isDir = true; + _values.insert(std::make_pair(prefix, entry)); + } + + // is a directory, so there may be a "nodes" attribute + TRI_json_t const* nodes = TRI_LookupArrayJson(node, "nodes"); + + if (! TRI_IsListJson(nodes)) { + // if directory is empty... + return true; + } + + const size_t n = TRI_LengthVector(&nodes->_value._objects); + + for (size_t i = 0; i < n; ++i) { + if (! parseJsonNode((TRI_json_t const*) TRI_AtVector(&nodes->_value._objects, i), + stripKeyPrefix, + withDirs)) { + return false; + } + } + } + else { + // not a directory + + // get "value" attribute + TRI_json_t const* value = TRI_LookupArrayJson(node, "value"); + + if (TRI_IsStringJson(value)) { + if (! prefix.empty()) { + AgencyCommResultEntry entry; + + // get "modifiedIndex" + entry._index = triagens::basics::JsonHelper::stringUInt64(node, "modifiedIndex"); + entry._json = triagens::basics::JsonHelper::fromString(value->_value._string.data, value->_value._string.length - 1); + entry._isDir = false; + + _values.insert(std::make_pair(prefix, entry)); + } + } + } + + return true; +} + +//////////////////////////////////////////////////////////////////////////////// +/// parse an agency result +/// note that stripKeyPrefix is a decoded, normal key! +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyCommResult::parse (std::string const& stripKeyPrefix, + bool withDirs) { + TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str()); + + if (! TRI_IsArrayJson(json)) { + if (json != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + } + return false; + } + + _values.clear(); + + // get "node" attribute + TRI_json_t const* node = TRI_LookupArrayJson(json, "node"); + + const bool result = parseJsonNode(node, stripKeyPrefix, withDirs); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + + return result; +} // ----------------------------------------------------------------------------- // --SECTION-- AgencyComm @@ -516,29 +646,19 @@ AgencyCommLocker::AgencyCommLocker (std::string const& key, double ttl) : _key(key), _type(type), + _json(0), _version(0), _isLocked(false) { AgencyComm comm; - if (comm.lock(key, ttl, 0.0, type)) { - fetchVersion(comm); - _isLocked = true; + + _json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, type.c_str(), type.size()); + + if (_json == 0) { + return; } -} -//////////////////////////////////////////////////////////////////////////////// -/// @brief constructs an agency comm locker with default timeout -//////////////////////////////////////////////////////////////////////////////// - -AgencyCommLocker::AgencyCommLocker (std::string const& key, - std::string const& type) - : _key(key), - _type(type), - _version(0), - _isLocked(false) { - - AgencyComm comm; - if (comm.lock(key, AgencyComm::_globalConnectionOptions._lockTimeout, 0.0, type)) { + if (comm.lock(key, ttl, 0.0, _json)) { fetchVersion(comm); _isLocked = true; } @@ -550,6 +670,10 @@ AgencyCommLocker::AgencyCommLocker (std::string const& key, AgencyCommLocker::~AgencyCommLocker () { unlock(); + + if (_json != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, _json); + } } // ----------------------------------------------------------------------------- @@ -565,7 +689,7 @@ void AgencyCommLocker::unlock () { AgencyComm comm; updateVersion(comm); - if (comm.unlock(_key, _type, 0.0)) { + if (comm.unlock(_key, _json, 0.0)) { _isLocked = false; } } @@ -586,22 +710,21 @@ bool AgencyCommLocker::fetchVersion (AgencyComm& comm) { AgencyCommResult result = comm.getValues(_key + "/Version", false); if (! result.successful()) { - if (result.httpCode() != 404) { + if (result.httpCode() != (int) triagens::rest::HttpResponse::NOT_FOUND) { return false; } return true; } - - std::map out; - result.flattenJson(out, "", false); - std::map::const_iterator it = out.begin(); + + result.parse("", false); + std::map::const_iterator it = result._values.begin(); - if (it == out.end()) { + if (it == result._values.end()) { return false; } - _version = triagens::basics::StringUtils::uint64((*it).second); + _version = triagens::basics::JsonHelper::stringUInt64((*it).second._json); return true; } @@ -617,20 +740,44 @@ bool AgencyCommLocker::updateVersion (AgencyComm& comm) { AgencyCommResult result; if (_version == 0) { + TRI_json_t* json = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, 1); + + if (json == 0) { + return false; + } + // no Version key found, now set it result = comm.casValue(_key + "/Version", - "1", + json, false, 0.0, 0.0); + + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } else { // Version key found, now update it + TRI_json_t* oldJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, _version); + + if (oldJson == 0) { + return false; + } + + TRI_json_t* newJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, _version + 1); + + if (newJson == 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); + return false; + } + result = comm.casValue(_key + "/Version", - triagens::basics::StringUtils::itoa(_version), - triagens::basics::StringUtils::itoa(_version + 1), + oldJson, + newJson, 0.0, 0.0); + + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); } return result.successful(); @@ -963,19 +1110,6 @@ std::string AgencyComm::generateStamp () { return std::string(buffer, len); } -//////////////////////////////////////////////////////////////////////////////// -/// @brief validates the lock type -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyComm::checkLockType (std::string const& key, - std::string const& value) { - if (value != "READ" && value != "WRITE") { - return false; - } - - return true; -} - // ----------------------------------------------------------------------------- // --SECTION-- private static methods // ----------------------------------------------------------------------------- @@ -1018,13 +1152,21 @@ AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpe bool AgencyComm::sendServerState () { // construct JSON value { "status": "...", "time": "..." } - std::string value("{\"status\":\""); - value.append(ServerState::stateToString(ServerState::instance()->getState())); - value.append("\",\"time\":\""); - value.append(AgencyComm::generateStamp()); - value.append("\"}"); - - AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), value, 0.0)); + TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); + + if (json == 0) { + return false; + } + + const std::string status = ServerState::stateToString(ServerState::instance()->getState()); + const std::string stamp = AgencyComm::generateStamp(); + + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "status", TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, status.c_str(), status.size())); + TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "time", TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, stamp.c_str(), stamp.size())); + + AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), json, 0.0)); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + return result.successful(); } @@ -1071,7 +1213,7 @@ AgencyCommResult AgencyComm::createDirectory (std::string const& key) { //////////////////////////////////////////////////////////////////////////////// AgencyCommResult AgencyComm::setValue (std::string const& key, - std::string const& value, + TRI_json_t const* json, double ttl) { AgencyCommResult result; @@ -1079,7 +1221,7 @@ AgencyCommResult AgencyComm::setValue (std::string const& key, _globalConnectionOptions._requestTimeout, result, buildUrl(key) + ttlParam(ttl, true), - "value=" + triagens::basics::StringUtils::urlEncode(value), + "value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(json)), false); return result; @@ -1165,7 +1307,7 @@ AgencyCommResult AgencyComm::removeValues (std::string const& key, //////////////////////////////////////////////////////////////////////////////// AgencyCommResult AgencyComm::casValue (std::string const& key, - std::string const& value, + TRI_json_t const* json, bool prevExist, double ttl, double timeout) { @@ -1176,7 +1318,7 @@ AgencyCommResult AgencyComm::casValue (std::string const& key, result, buildUrl(key) + "?prevExist=" + (prevExist ? "true" : "false") + ttlParam(ttl, false), - "value=" + triagens::basics::StringUtils::urlEncode(value), + "value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(json)), false); return result; @@ -1189,8 +1331,8 @@ AgencyCommResult AgencyComm::casValue (std::string const& key, //////////////////////////////////////////////////////////////////////////////// AgencyCommResult AgencyComm::casValue (std::string const& key, - std::string const& oldValue, - std::string const& newValue, + TRI_json_t const* oldJson, + TRI_json_t const* newJson, double ttl, double timeout) { AgencyCommResult result; @@ -1199,9 +1341,9 @@ AgencyCommResult AgencyComm::casValue (std::string const& key, timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, result, buildUrl(key) + "?prevValue=" - + triagens::basics::StringUtils::urlEncode(oldValue) + + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(oldJson)) + ttlParam(ttl, false), - "value=" + triagens::basics::StringUtils::urlEncode(newValue), + "value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(newJson)), false); return result; @@ -1245,7 +1387,15 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key, bool AgencyComm::lockRead (std::string const& key, double ttl, double timeout) { - return lock(key, ttl, timeout, "READ"); + TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", 4); + + if (json == 0) { + return false; + } + + bool result = lock(key, ttl, timeout, json); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -1255,7 +1405,15 @@ bool AgencyComm::lockRead (std::string const& key, bool AgencyComm::lockWrite (std::string const& key, double ttl, double timeout) { - return lock(key, ttl, timeout, "WRITE"); + TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", 5); + + if (json == 0) { + return false; + } + + bool result = lock(key, ttl, timeout, json); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -1264,7 +1422,15 @@ bool AgencyComm::lockWrite (std::string const& key, bool AgencyComm::unlockRead (std::string const& key, double timeout) { - return unlock(key, "READ", timeout); + TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", 4); + + if (json == 0) { + return false; + } + + bool result = unlock(key, json, timeout); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -1273,7 +1439,15 @@ bool AgencyComm::unlockRead (std::string const& key, bool AgencyComm::unlockWrite (std::string const& key, double timeout) { - return unlock(key, "WRITE", timeout); + TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", 5); + + if (json == 0) { + return false; + } + + bool result = unlock(key, json, timeout); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -1295,24 +1469,40 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key, return result; } - std::map out; - result.flattenJson(out, "", false); - std::map::const_iterator it = out.begin(); + result.parse("", false); + + TRI_json_t* oldJson = 0; - std::string oldValue; - if (it != out.end()) { - oldValue = (*it).second; + std::map::iterator it = result._values.begin(); + + if (it != result._values.end()) { + // steal the json + oldJson = (*it).second._json; + (*it).second._json = 0; } else { - oldValue = "0"; - } - - uint64_t newValue = triagens::basics::StringUtils::int64(oldValue) + count; + oldJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "0", 1); + } - result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue), 0.0, timeout); + if (oldJson == 0) { + return AgencyCommResult(); + } + + const uint64_t oldValue = triagens::basics::JsonHelper::stringUInt64(oldJson) + count; + const uint64_t newValue = oldValue + count; + TRI_json_t* newJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, newValue); + + if (newJson == 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); + return AgencyCommResult(); + } + + result = casValue(key, oldJson, newJson, 0.0, timeout); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); if (result.successful()) { - result._index = triagens::basics::StringUtils::int64(oldValue) + 1; + result._index = oldValue + 1; break; } } @@ -1344,11 +1534,7 @@ std::string AgencyComm::ttlParam (double ttl, bool AgencyComm::lock (std::string const& key, double ttl, double timeout, - std::string const& value) { - if (! checkLockType(key, value)) { - return false; - } - + TRI_json_t const* json) { if (ttl == 0.0) { ttl = _globalConnectionOptions._lockTimeout; } @@ -1360,16 +1546,25 @@ bool AgencyComm::lock (std::string const& key, const double end = TRI_microtime() + timeout; while (true) { + TRI_json_t* oldJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", 8); + + if (oldJson == 0) { + return false; + } + AgencyCommResult result = casValue(key + "/Lock", - "UNLOCKED", - value, + oldJson, + json, ttl, timeout); - if (! result.successful() && result.httpCode() == 404) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); + + if (! result.successful() && + result.httpCode() == (int) triagens::rest::HttpResponse::NOT_FOUND) { // key does not yet exist. create it now result = casValue(key + "/Lock", - value, + json, false, ttl, timeout); @@ -1397,12 +1592,8 @@ bool AgencyComm::lock (std::string const& key, //////////////////////////////////////////////////////////////////////////////// bool AgencyComm::unlock (std::string const& key, - std::string const& value, + TRI_json_t const* json, double timeout) { - if (! checkLockType(key, value)) { - return false; - } - if (timeout == 0.0) { timeout = _globalConnectionOptions._lockTimeout; } @@ -1410,12 +1601,20 @@ bool AgencyComm::unlock (std::string const& key, const double end = TRI_microtime() + timeout; while (true) { + TRI_json_t* newJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", 8); + + if (newJson == 0) { + return false; + } + AgencyCommResult result = casValue(key + "/Lock", - value, - std::string("UNLOCKED"), + json, + newJson, 0.0, timeout); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson); + if (result.successful()) { return true; } diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index 976f283832..cb607c6be9 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -107,6 +107,16 @@ namespace triagens { size_t _connectRetries; }; +// ----------------------------------------------------------------------------- +// --SECTION-- AgencyCommResultEntry +// ----------------------------------------------------------------------------- + + struct AgencyCommResultEntry { + uint64_t _index; + TRI_json_t* _json; + bool _isDir; + }; + // ----------------------------------------------------------------------------- // --SECTION-- AgencyCommResult // ----------------------------------------------------------------------------- @@ -194,7 +204,7 @@ namespace triagens { const std::string body () const { return _body; } - +/* //////////////////////////////////////////////////////////////////////////////// /// @brief recursively flatten the JSON response into a map //////////////////////////////////////////////////////////////////////////////// @@ -226,6 +236,24 @@ namespace triagens { bool flattenJson (std::map&, std::string const&, bool) const; +*/ +//////////////////////////////////////////////////////////////////////////////// +/// @brief recursively flatten the JSON response into a map +/// +/// stripKeyPrefix is decoded, as is the _globalPrefix +//////////////////////////////////////////////////////////////////////////////// + + bool parseJsonNode (TRI_json_t const*, + std::string const&, + bool); + +//////////////////////////////////////////////////////////////////////////////// +/// parse an agency result +/// note that stripKeyPrefix is a decoded, normal key! +//////////////////////////////////////////////////////////////////////////////// + + bool parse (std::string const&, + bool); // ----------------------------------------------------------------------------- // --SECTION-- public variables @@ -236,6 +264,8 @@ namespace triagens { std::string _location; std::string _message; std::string _body; + + std::map _values; uint64_t _index; int _statusCode; bool _connected; @@ -261,14 +291,7 @@ namespace triagens { AgencyCommLocker (std::string const&, std::string const&, - double); - -//////////////////////////////////////////////////////////////////////////////// -/// @brief constructs an agency comm locker with default timeout -//////////////////////////////////////////////////////////////////////////////// - - AgencyCommLocker (std::string const&, - std::string const&); + double = 0.0); //////////////////////////////////////////////////////////////////////////////// /// @brief destroys an agency comm locker @@ -322,6 +345,7 @@ namespace triagens { const std::string _key; const std::string _type; + TRI_json_t* _json; uint64_t _version; bool _isLocked; @@ -425,13 +449,6 @@ namespace triagens { static std::string generateStamp (); -//////////////////////////////////////////////////////////////////////////////// -/// @brief validates the lock type -//////////////////////////////////////////////////////////////////////////////// - - static bool checkLockType (std::string const&, - std::string const&); - // ----------------------------------------------------------------------------- // --SECTION-- private static methods // ----------------------------------------------------------------------------- @@ -469,7 +486,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// AgencyCommResult setValue (std::string const&, - std::string const&, + TRI_json_t const*, double); //////////////////////////////////////////////////////////////////////////////// @@ -498,7 +515,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// AgencyCommResult casValue (std::string const&, - std::string const&, + TRI_json_t const*, bool, double, double); @@ -510,8 +527,8 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// AgencyCommResult casValue (std::string const&, - std::string const&, - std::string const&, + TRI_json_t const*, + TRI_json_t const*, double, double); @@ -610,14 +627,14 @@ namespace triagens { bool lock (std::string const&, double, double, - std::string const&); + TRI_json_t const*); //////////////////////////////////////////////////////////////////////////////// /// @brief release a lock //////////////////////////////////////////////////////////////////////////////// bool unlock (std::string const&, - std::string const&, + TRI_json_t const*, double); //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index 45ca2ef0ad..c10ce36762 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -27,6 +27,7 @@ #include "ApplicationCluster.h" #include "Rest/Endpoint.h" +#include "Basics/JsonHelper.h" #include "SimpleHttpClient/ConnectionManager.h" #include "Cluster/HeartbeatThread.h" #include "Cluster/ServerState.h" @@ -226,17 +227,15 @@ bool ApplicationCluster::start () { AgencyCommResult result = comm.getValues("Sync/HeartbeatIntervalMs", false); if (result.successful()) { - std::map value; + result.parse("", false); - if (result.flattenJson(value, "", false)) { - std::map::const_iterator it = value.begin(); + std::map::const_iterator it = result._values.begin(); - if (it != value.end()) { - _heartbeatInterval = triagens::basics::StringUtils::uint64((*it).second); + if (it != result._values.end()) { + _heartbeatInterval = triagens::basics::JsonHelper::stringUInt64((*it).second._json); - LOG_INFO("using heartbeat interval value '%llu ms' from agency", - (unsigned long long) _heartbeatInterval); - } + LOG_INFO("using heartbeat interval value '%llu ms' from agency", + (unsigned long long) _heartbeatInterval); } } @@ -287,7 +286,15 @@ bool ApplicationCluster::open () { AgencyCommLocker locker("Current", "WRITE"); if (locker.successful()) { - result = comm.setValue("Current/ServersRegistered/" + _myId, _myAddress, 0.0); + TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, _myAddress.c_str(), _myAddress.size()); + + if (json == 0) { + locker.unlock(); + LOG_FATAL_AND_EXIT("out of memory"); + } + + result = comm.setValue("Current/ServersRegistered/" + _myId, json, 0.0); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } if (! result.successful()) { @@ -296,20 +303,38 @@ bool ApplicationCluster::open () { } if (role == ServerState::ROLE_COORDINATOR) { + TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "none", 4); + + if (json == 0) { + locker.unlock(); + LOG_FATAL_AND_EXIT("out of memory"); + } + ServerState::instance()->setState(ServerState::STATE_SERVING); // register coordinator - AgencyCommResult result = comm.setValue("Current/Coordinators/" + _myId, "none", 0.0); + AgencyCommResult result = comm.setValue("Current/Coordinators/" + _myId, json, 0.0); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + if (! result.successful()) { locker.unlock(); LOG_FATAL_AND_EXIT("unable to register coordinator in agency"); } } else if (role == ServerState::ROLE_PRIMARY) { + TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "none", 4); + + if (json == 0) { + locker.unlock(); + LOG_FATAL_AND_EXIT("out of memory"); + } + ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC); // register server - AgencyCommResult result = comm.setValue("Current/DBServers/" + _myId, "none", 0.0); + AgencyCommResult result = comm.setValue("Current/DBServers/" + _myId, json, 0.0); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + if (! result.successful()) { locker.unlock(); LOG_FATAL_AND_EXIT("unable to register db server in agency"); diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 25f98bca96..cb7f394867 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -84,6 +84,22 @@ CollectionInfo::CollectionInfo (std::string const& data) { } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a collection info object from json +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfo::CollectionInfo (TRI_json_t* json) { + if (json != 0) { + if (JsonHelper::isArray(json)) { + if (! createFromJson(json)) { + invalidate(); + } + } + + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + } +} + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a collection info object from another //////////////////////////////////////////////////////////////////////////////// @@ -453,24 +469,23 @@ void ClusterInfo::loadPlannedDatabases () { } if (result.successful()) { - std::map databases; + result.parse(prefix + "/", false); - if (result.flattenJson(databases, prefix + "/", false)) { - LOG_TRACE("%s loaded successfully", prefix.c_str()); - - WRITE_LOCKER(_lock); - _plannedDatabases.clear(); + WRITE_LOCKER(_lock); + _plannedDatabases.clear(); - std::map::const_iterator it; - for (it = databases.begin(); it != databases.end(); ++it) { - const std::string& name = (*it).first; - TRI_json_t* options = JsonHelper::fromString((*it).second); + std::map::iterator it = result._values.begin(); - _plannedDatabases.insert(std::make_pair(name, options)); - } + while (it != result._values.end()) { + const std::string& name = (*it).first; + TRI_json_t* options = (*it).second._json; - return; + // steal the json + (*it).second._json = 0; + _plannedDatabases.insert(std::make_pair(name, options)); } + + return; } LOG_TRACE("Error while loading %s", prefix.c_str()); @@ -495,61 +510,59 @@ void ClusterInfo::loadCurrentCollections () { } if (result.successful()) { - std::map collections; - - if (result.flattenJson(collections, prefix + "/", false)) { - LOG_TRACE("%s loaded successfully", prefix.c_str()); + result.parse(prefix + "/", false); - WRITE_LOCKER(_lock); - _collections.clear(); - _shardIds.clear(); + WRITE_LOCKER(_lock); + _collections.clear(); + _shardIds.clear(); - std::map::const_iterator it; - for (it = collections.begin(); it != collections.end(); ++it) { - const std::string& key = (*it).first; + std::map::iterator it = result._values.begin(); - // each entry consists of a database id and a collection id, separated by '/' - std::vector parts = triagens::basics::StringUtils::split(key, '/'); + for (; it != result._values.end(); ++it) { + const std::string& key = (*it).first; + + // each entry consists of a database id and a collection id, separated by '/' + std::vector parts = triagens::basics::StringUtils::split(key, '/'); - if (parts.size() != 2) { - // invalid entry - LOG_WARNING("found invalid collection key in agency: '%s'", key.c_str()); - continue; - } + if (parts.size() != 2) { + // invalid entry + LOG_WARNING("found invalid collection key in agency: '%s'", key.c_str()); + continue; + } - const std::string& database = parts[0]; - const std::string& collection = parts[1]; + const std::string& database = parts[0]; + const std::string& collection = parts[1]; - // check whether we have created an entry for the database already - AllCollections::iterator it2 = _collections.find(database); - if (it2 == _collections.end()) { - // not yet, so create an entry for the database - DatabaseCollections empty; - _collections.insert(std::make_pair(database, empty)); - it2 = _collections.find(database); - } + // check whether we have created an entry for the database already + AllCollections::iterator it2 = _collections.find(database); - if (collection == "Lock" || collection == "Version") { - continue; - } + if (it2 == _collections.end()) { + // not yet, so create an entry for the database + DatabaseCollections empty; + _collections.insert(std::make_pair(database, empty)); + it2 = _collections.find(database); + } - const CollectionInfo collectionData((*it).second); + TRI_json_t* json = (*it).second._json; + // steal the json + (*it).second._json = 0; - // insert the collection into the existing map - - (*it2).second.insert(std::make_pair(collection, collectionData)); - (*it2).second.insert(std::make_pair(collectionData.name(), collectionData)); - - std::map shards = collectionData.shardIds(); - std::map::const_iterator it3 = shards.begin(); + const CollectionInfo collectionData(json); - while (it3 != shards.end()) { - const std::string shardId = (*it3).first; - const std::string serverId = (*it3).second; + // insert the collection into the existing map - _shardIds.insert(std::make_pair(shardId, serverId)); - ++it3; - } + (*it2).second.insert(std::make_pair(collection, collectionData)); + (*it2).second.insert(std::make_pair(collectionData.name(), collectionData)); + + std::map shards = collectionData.shardIds(); + std::map::const_iterator it3 = shards.begin(); + + while (it3 != shards.end()) { + const std::string shardId = (*it3).first; + const std::string serverId = (*it3).second; + + _shardIds.insert(std::make_pair(shardId, serverId)); + ++it3; } _collectionsValid = true; @@ -690,23 +703,23 @@ void ClusterInfo::loadServers () { } if (result.successful()) { - std::map servers; - - if (result.flattenJson(servers, prefix + "/", false)) { - LOG_TRACE("%s loaded successfully", prefix.c_str()); + result.parse(prefix + "/", false); - WRITE_LOCKER(_lock); - _servers.clear(); + WRITE_LOCKER(_lock); + _servers.clear(); - std::map::const_iterator it; - for (it = servers.begin(); it != servers.end(); ++it) { - _servers.insert(std::make_pair((*it).first, (*it).second)); - } + std::map::const_iterator it = result._values.begin(); - _serversValid = true; - - return; + while (it != result._values.end()) { + const std::string server = triagens::basics::JsonHelper::getStringValue((*it).second._json, ""); + + _servers.insert(std::make_pair((*it).first, server)); + ++it; } + + _serversValid = true; + + return; } LOG_TRACE("Error while loading %s", prefix.c_str()); @@ -766,23 +779,19 @@ void ClusterInfo::loadCurrentDBServers () { } if (result.successful()) { - std::map servers; + result.parse(prefix + "/", false); - if (result.flattenJson(servers, prefix + "/", false)) { - LOG_TRACE("%s loaded successfully", prefix.c_str()); - - WRITE_LOCKER(_lock); - _DBServers.clear(); + WRITE_LOCKER(_lock); + _DBServers.clear(); - std::map::const_iterator it; - for (it = servers.begin(); it != servers.end(); ++it) { - _DBServers.insert(std::make_pair - ((*it).first, (*it).second)); - } + std::map::const_iterator it = result._values.begin(); - _DBServersValid = true; - return; + for (; it != result._values.end(); ++it) { + _DBServers.insert(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, ""))); } + + _DBServersValid = true; + return; } LOG_TRACE("Error while loading %s", prefix.c_str()); @@ -830,17 +839,13 @@ std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) { } if (result.successful()) { - std::map out; - - if (! result.flattenJson(out, prefix, false)) { - LOG_FATAL_AND_EXIT("Got an invalid JSON response for %s", prefix.c_str()); - } + result.parse(prefix, false); // check if we can find ourselves in the list returned by the agency - std::map::const_iterator it = out.find(serverID); + std::map::const_iterator it = result._values.find(serverID); - if (it != out.end()) { - return (*it).second; + if (it != result._values.end()) { + return triagens::basics::JsonHelper::getStringValue((*it).second._json, ""); } } diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 2fa93c5a28..c4054e572b 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -70,6 +70,8 @@ namespace triagens { CollectionInfo (); CollectionInfo (std::string const&); + + CollectionInfo (struct TRI_json_s*); CollectionInfo (CollectionInfo const&); diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index e79239cef5..d9e98066ff 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -27,6 +27,7 @@ #include "HeartbeatThread.h" #include "Basics/ConditionLocker.h" +#include "Basics/JsonHelper.h" #include "BasicsC/logging.h" #include "Cluster/ServerState.h" @@ -113,18 +114,16 @@ void HeartbeatThread::run () { AgencyCommResult result = _agency.getValues("Plan/Version", false); if (result.successful()) { - std::map out; - - if (result.flattenJson(out, "", false)) { - std::map::const_iterator it = out.begin(); + result.parse("", false); - if (it != out.end()) { - // there is a plan version - uint64_t planVersion = triagens::basics::StringUtils::uint64((*it).second); + std::map::iterator it = result._values.begin(); - if (planVersion > lastPlanVersion) { - handlePlanChange(planVersion, lastPlanVersion); - } + if (it != result._values.end()) { + // there is a plan version + uint64_t planVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json); + + if (planVersion > lastPlanVersion) { + handlePlanChange(planVersion, lastPlanVersion); } } } @@ -230,17 +229,14 @@ uint64_t HeartbeatThread::getLastCommandIndex () { AgencyCommResult result = _agency.getValues("Sync/Commands/" + _myId, false); if (result.successful()) { - std::map out; + result.parse("Sync/Commands/", false); + + std::map::iterator it = result._values.find(_myId); - if (result.flattenJson(out, "Sync/Commands/", true)) { - // check if we can find ourselves in the list returned by the agency - std::map::const_iterator it = out.find(_myId); - - if (it != out.end()) { - // found something - LOG_TRACE("last command index was: '%s'", (*it).second.c_str()); - return triagens::basics::StringUtils::uint64((*it).second); - } + if (it != result._values.end()) { + // found something + LOG_TRACE("last command index was: '%llu'", (unsigned long long) (*it).second._index); + return (*it).second._index; } } @@ -277,35 +273,22 @@ bool HeartbeatThread::handlePlanChange (uint64_t currentPlanVersion, /// notified about this particular change again). //////////////////////////////////////////////////////////////////////////////// -bool HeartbeatThread::handleStateChange (AgencyCommResult const& result, +bool HeartbeatThread::handleStateChange (AgencyCommResult& result, uint64_t& lastCommandIndex) { - std::map out; + result.parse("Sync/Commands/", false); - if (result.flattenJson(out, "Sync/Commands/", true)) { - // get the new value of "modifiedIndex" - std::map::const_iterator it = out.find(_myId); + std::map::const_iterator it = result._values.find(_myId); - if (it != out.end()) { - lastCommandIndex = triagens::basics::StringUtils::uint64((*it).second); - } - } + if (it != result._values.end()) { + lastCommandIndex = (*it).second._index; - out.clear(); - - if (result.flattenJson(out, "Sync/Commands/", false)) { - // get the new value! - std::map::const_iterator it = out.find(_myId); - - if (it != out.end()) { - const std::string command = (*it).second; - - ServerState::StateEnum newState = ServerState::stringToState(command); - - if (newState != ServerState::STATE_UNDEFINED) { - // state change. - ServerState::instance()->setState(newState); - return true; - } + const std::string command = triagens::basics::JsonHelper::getStringValue((*it).second._json, ""); + ServerState::StateEnum newState = ServerState::stringToState(command); + + if (newState != ServerState::STATE_UNDEFINED) { + // state change. + ServerState::instance()->setState(newState); + return true; } } diff --git a/arangod/Cluster/HeartbeatThread.h b/arangod/Cluster/HeartbeatThread.h index b032b47c7e..14f4855812 100644 --- a/arangod/Cluster/HeartbeatThread.h +++ b/arangod/Cluster/HeartbeatThread.h @@ -131,7 +131,7 @@ namespace triagens { /// @brief handles a state change //////////////////////////////////////////////////////////////////////////////// - bool handleStateChange (AgencyCommResult const&, + bool handleStateChange (AgencyCommResult&, uint64_t&); //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index 47de046844..b95e5b8856 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -26,6 +26,7 @@ //////////////////////////////////////////////////////////////////////////////// #include "ServerState.h" +#include "Basics/JsonHelper.h" #include "Basics/ReadLocker.h" #include "Basics/WriteLocker.h" #include "BasicsC/logging.h" @@ -502,18 +503,17 @@ ServerState::RoleEnum ServerState::checkCoordinatorsList (std::string const& id) return ServerState::ROLE_UNDEFINED; } - - std::map out; - if (! result.flattenJson(out, "Plan/Coordinators/", false)) { + + if (! result.parse("Plan/Coordinators/", false)) { LOG_TRACE("Got an invalid JSON response for Plan/Coordinators"); return ServerState::ROLE_UNDEFINED; } // check if we can find ourselves in the list returned by the agency - std::map::const_iterator it = out.find(id); + std::map::const_iterator it = result._values.find(id); - if (it != out.end()) { + if (it != result._values.end()) { // we are in the list. this means we are a primary server return ServerState::ROLE_COORDINATOR; } @@ -554,30 +554,25 @@ ServerState::RoleEnum ServerState::checkServersList (std::string const& id) { return ServerState::ROLE_UNDEFINED; } - - std::map out; - if (! result.flattenJson(out, "Plan/DBServers/", false)) { - LOG_TRACE("Got an invalid JSON response for Plan/DBServers"); - - return ServerState::ROLE_UNDEFINED; - } ServerState::RoleEnum role = ServerState::ROLE_UNDEFINED; // check if we can find ourselves in the list returned by the agency - std::map::const_iterator it = out.find(id); + result.parse("Plan/DBServers/", false); + std::map::const_iterator it = result._values.find(id); - if (it != out.end()) { + if (it != result._values.end()) { // we are in the list. this means we are a primary server role = ServerState::ROLE_PRIMARY; } else { // check if we are a secondary... - it = out.begin(); + it = result._values.begin(); - while (it != out.end()) { - const std::string value = (*it).second; - if (value == id) { + while (it != result._values.end()) { + const std::string name = triagens::basics::JsonHelper::getStringValue((*it).second._json, ""); + + if (name == id) { role = ServerState::ROLE_SECONDARY; break; } diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index 0854d24931..e5b3d34f25 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -85,8 +85,18 @@ static v8::Handle JS_CasAgency (v8::Arguments const& argv) { } const std::string key = TRI_ObjectToString(argv[0]); - const std::string oldValue = TRI_ObjectToString(argv[1]); - const std::string newValue = TRI_ObjectToString(argv[2]); + + TRI_json_t* oldJson = TRI_ObjectToJson(argv[1]); + + if (oldJson == 0) { + TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert to JSON"); + } + + TRI_json_t* newJson = TRI_ObjectToJson(argv[2]); + if (newJson == 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); + TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert to JSON"); + } double ttl = 0.0; if (argv.Length() > 3) { @@ -104,7 +114,10 @@ static v8::Handle JS_CasAgency (v8::Arguments const& argv) { } AgencyComm comm; - AgencyCommResult result = comm.casValue(key, oldValue, newValue, ttl, timeout); + AgencyCommResult result = comm.casValue(key, oldJson, newJson, ttl, timeout); + + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson); if (! result.successful()) { if (! shouldThrow) { @@ -184,50 +197,43 @@ static v8::Handle JS_GetAgency (v8::Arguments const& argv) { if (! result.successful()) { return scope.Close(v8::ThrowException(CreateAgencyException(result))); } + + result.parse("", false); v8::Handle l = v8::Object::New(); if (withIndexes) { - // return an object for each key - std::map outValues; - std::map outIndexes; - - result.flattenJson(outValues, "", false); - result.flattenJson(outIndexes, "", true); + std::map::const_iterator it = result._values.begin(); - assert(outValues.size() == outIndexes.size()); - - std::map::const_iterator it = outValues.begin(); - std::map::const_iterator it2 = outIndexes.begin(); - - while (it != outValues.end()) { + while (it != result._values.end()) { const std::string key = (*it).first; - const std::string value = (*it).second; - const std::string idx = (*it2).second; + TRI_json_t const* json = (*it).second._json; + const std::string idx = StringUtils::itoa((*it).second._index); - v8::Handle sub = v8::Object::New(); + if (json != 0) { + v8::Handle sub = v8::Object::New(); - sub->Set(v8::String::New("value"), v8::String::New(value.c_str(), value.size())); - sub->Set(v8::String::New("index"), v8::String::New(idx.c_str(), idx.size())); + sub->Set(v8::String::New("value"), TRI_ObjectJson(json)); + sub->Set(v8::String::New("index"), v8::String::New(idx.c_str(), idx.size())); - l->Set(v8::String::New(key.c_str(), key.size()), sub); + l->Set(v8::String::New(key.c_str(), key.size()), sub); + } ++it; - ++it2; } } else { // return just the value for each key - std::map out; - - result.flattenJson(out, "", false); - std::map::const_iterator it = out.begin(); + std::map::const_iterator it = result._values.begin(); - while (it != out.end()) { + while (it != result._values.end()) { const std::string key = (*it).first; - const std::string value = (*it).second; + TRI_json_t const* json = (*it).second._json; + + if (json != 0) { + l->Set(v8::String::New(key.c_str(), key.size()), TRI_ObjectJson(json)); + } - l->Set(v8::String::New(key.c_str(), key.size()), v8::String::New(value.c_str(), value.size())); ++it; } } @@ -266,18 +272,19 @@ static v8::Handle JS_ListAgency (v8::Arguments const& argv) { } // return just the value for each key - std::map out; - result.flattenJson(out, ""); - std::map::const_iterator it = out.begin(); + result.parse("", true); + std::map::const_iterator it = result._values.begin(); // skip first entry - ++it; + if (it != result._values.end()) { + ++it; + } if (flat) { v8::Handle l = v8::Array::New(); uint32_t i = 0; - while (it != out.end()) { + while (it != result._values.end()) { const std::string key = (*it).first; l->Set(i++, v8::String::New(key.c_str(), key.size())); @@ -289,9 +296,9 @@ static v8::Handle JS_ListAgency (v8::Arguments const& argv) { else { v8::Handle l = v8::Object::New(); - while (it != out.end()) { + while (it != result._values.end()) { const std::string key = (*it).first; - const bool isDirectory = (*it).second; + const bool isDirectory = (*it).second._isDir; l->Set(v8::String::New(key.c_str(), key.size()), v8::Boolean::New(isDirectory)); ++it; @@ -454,7 +461,12 @@ static v8::Handle JS_SetAgency (v8::Arguments const& argv) { } const std::string key = TRI_ObjectToString(argv[0]); - const std::string value = TRI_ObjectToString(argv[1]); + + TRI_json_t* json = TRI_ObjectToJson(argv[1]); + + if (json == 0) { + TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert to JSON"); + } double ttl = 0.0; if (argv.Length() > 2) { @@ -462,7 +474,9 @@ static v8::Handle JS_SetAgency (v8::Arguments const& argv) { } AgencyComm comm; - AgencyCommResult result = comm.setValue(key, value, ttl); + AgencyCommResult result = comm.setValue(key, json, ttl); + + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); if (! result.successful()) { return scope.Close(v8::ThrowException(CreateAgencyException(result))); @@ -509,17 +523,19 @@ static v8::Handle JS_WatchAgency (v8::Arguments const& argv) { return scope.Close(v8::ThrowException(CreateAgencyException(result))); } - std::map out; - result.flattenJson(out, "", false); - std::map::const_iterator it = out.begin(); + result.parse("", false); + std::map::const_iterator it = result._values.begin(); v8::Handle l = v8::Object::New(); - while (it != out.end()) { + while (it != result._values.end()) { const std::string key = (*it).first; - const std::string value = (*it).second; + TRI_json_t* json = (*it).second._json; + + if (json != 0) { + l->Set(v8::String::New(key.c_str(), key.size()), TRI_ObjectJson(json)); + } - l->Set(v8::String::New(key.c_str(), key.size()), v8::String::New(value.c_str(), value.size())); ++it; } diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index 77bbdaec0f..159853871d 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1914,7 +1914,7 @@ static v8::Handle CreateCollectionCoordinator (v8::Arguments const& a TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME); } - AgencyCommResult result = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, JsonHelper::toString(json), 0.0); + AgencyCommResult result = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, json, 0.0); TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); } } diff --git a/init-cluster.sh b/init-cluster.sh index 1bed98c720..bba83d95a7 100755 --- a/init-cluster.sh +++ b/init-cluster.sh @@ -5,12 +5,12 @@ NAME="meier" ETCD="http://127.0.0.1:4001" echo "initialising cluster $NAME" bin/arangom -a "$ETCD" -p "/$NAME/" init -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Pavel" -d "value=none" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Pavel" -d "value=none" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Perry" -d "value=none" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Perry" -d "value=none" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/Coordinators/Claus" -d "value=none" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/Coordinators/Claus" -d "value=none" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Pavel" -d "value=\"none\"" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Pavel" -d "value=\"none\"" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Perry" -d "value=\"none\"" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Perry" -d "value=\"none\"" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/Coordinators/Claus" -d "value=\"none\"" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/Coordinators/Claus" -d "value=\"none\"" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Pavel" -d "value=tcp://127.0.0.1:8530" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=tcp://127.0.0.1:8531" || exit 1 diff --git a/lib/Basics/JsonHelper.cpp b/lib/Basics/JsonHelper.cpp index 7c51cf28c8..d104eb45b5 100644 --- a/lib/Basics/JsonHelper.cpp +++ b/lib/Basics/JsonHelper.cpp @@ -27,6 +27,7 @@ #include "Basics/JsonHelper.h" +#include "BasicsC/conversions.h" #include "BasicsC/string-buffer.h" using namespace triagens::basics; @@ -39,6 +40,52 @@ using namespace triagens::basics; // --SECTION-- public static methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief convert a uint64 into a JSON string +//////////////////////////////////////////////////////////////////////////////// + +TRI_json_t* JsonHelper::uint64String (TRI_memory_zone_t* zone, + uint64_t value) { + char buffer[21]; + size_t len; + + len = TRI_StringUInt64InPlace(value, (char*) &buffer); + + return TRI_CreateString2CopyJson(zone, buffer, len); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief convert a uint64 into a JSON string +//////////////////////////////////////////////////////////////////////////////// + +uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json) { + if (json != 0) { + if (json->_type == TRI_JSON_STRING) { + return TRI_UInt64String2(json->_value._string.data, json->_value._string.length - 1); + } + else if (json->_type == TRI_JSON_NUMBER) { + return (uint64_t) json->_value._number; + } + } + + return 0; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief convert a uint64 into a JSON string +//////////////////////////////////////////////////////////////////////////////// + +uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json, + char const* name) { + + if (json == 0) { + return 0; + } + + TRI_json_t const* element = TRI_LookupArrayJson(json, name); + return stringUInt64(element); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a JSON key/value object from a list of strings //////////////////////////////////////////////////////////////////////////////// @@ -139,6 +186,17 @@ TRI_json_t* JsonHelper::fromString (std::string const& data) { return json; } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create JSON from string +//////////////////////////////////////////////////////////////////////////////// + +TRI_json_t* JsonHelper::fromString (char const* data, + size_t length) { + TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data); + + return json; +} //////////////////////////////////////////////////////////////////////////////// /// @brief stringify json diff --git a/lib/Basics/JsonHelper.h b/lib/Basics/JsonHelper.h index fb7a1ee03d..e96dc96197 100644 --- a/lib/Basics/JsonHelper.h +++ b/lib/Basics/JsonHelper.h @@ -55,6 +55,26 @@ namespace triagens { public: +//////////////////////////////////////////////////////////////////////////////// +/// @brief convert a uint64 into a JSON string +//////////////////////////////////////////////////////////////////////////////// + + static TRI_json_t* uint64String (TRI_memory_zone_t*, + uint64_t); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief convert a uint64 into a JSON string +//////////////////////////////////////////////////////////////////////////////// + + static uint64_t stringUInt64 (TRI_json_t const*); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief convert a uint64 into a JSON string +//////////////////////////////////////////////////////////////////////////////// + + static uint64_t stringUInt64 (TRI_json_t const*, + char const*); + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a JSON object from a key/value object of strings //////////////////////////////////////////////////////////////////////////////// @@ -87,6 +107,13 @@ namespace triagens { static TRI_json_t* fromString (std::string const&); +//////////////////////////////////////////////////////////////////////////////// +/// @brief create JSON from string +//////////////////////////////////////////////////////////////////////////////// + + static TRI_json_t* fromString (char const*, + size_t); + //////////////////////////////////////////////////////////////////////////////// /// @brief stringify json //////////////////////////////////////////////////////////////////////////////// diff --git a/utils/arangom.in b/utils/arangom.in index cbe5861c33..632d425604 100755 --- a/utils/arangom.in +++ b/utils/arangom.in @@ -43,21 +43,21 @@ if [ "$1" == "init" ] ; then set Target/MapIDToEndpoint set Target/Version 1 - set Target/Lock UNLOCKED + set Target/Lock "\"UNLOCKED\"" set Target/DBServers set Target/Coordinators set Target/Databases/@Usystem "{}" set Target/Collections/@Usystem set Plan/Version 1 - set Plan/Lock UNLOCKED + set Plan/Lock "\"UNLOCKED\"" set Plan/DBServers set Plan/Coordinators set Plan/Databases/@Usystem "{}" set Plan/Collections/@Usystem set Current/Version 1 - set Current/Lock UNLOCKED + set Current/Lock "\"UNLOCKED\"" set Current/DBServers set Current/Coordinators set Current/Databases/@Usystem @@ -68,7 +68,7 @@ if [ "$1" == "init" ] ; then set Sync/ServerStates set Sync/Problems - set Sync/ClusterManager none + set Sync/ClusterManager "\"none\"" set Sync/LatestID 0 set Sync/Commands set Sync/HeartbeatIntervalMs 1000 From 85967c3786b66de9959fa52fb3b9cd8dcc40a213 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 15 Jan 2014 13:38:13 +0100 Subject: [PATCH 10/13] fixed invalid JSON value --- init-cluster.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/init-cluster.sh b/init-cluster.sh index bba83d95a7..0902b2d50b 100755 --- a/init-cluster.sh +++ b/init-cluster.sh @@ -12,9 +12,9 @@ curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Perry" -d curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/Coordinators/Claus" -d "value=\"none\"" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/Coordinators/Claus" -d "value=\"none\"" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Pavel" -d "value=tcp://127.0.0.1:8530" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=tcp://127.0.0.1:8531" || exit 1 -curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=tcp://127.0.0.1:8529" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Pavel" -d "value=\"tcp://127.0.0.1:8530\"" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=\"tcp://127.0.0.1:8531\"" || exit 1 +curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=\"tcp://127.0.0.1:8529\"" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1 From 97476ccf088285a20307c415dffb821519c103ef Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 15 Jan 2014 13:59:41 +0100 Subject: [PATCH 11/13] Create Collection completed. Use new AgencyComm. --- arangod/V8Server/v8-vocbase.cpp | 38 +++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index f70115f7e7..b728315a9e 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -1937,10 +1937,18 @@ static v8::Handle CreateCollectionCoordinator ( res = agency.getValues("Current/Collections/" + databaseName + "/" + cid, true); if (res.successful()) { - // FIXME - // Now extract JSON, look into length of "shards" entry, if equal to - // numberOfShards then we are done. - return scope.Close(v8::True()); + res.parse("", false); + map::iterator it = res._values.begin(); + if (it != res._values.end()) { + TRI_json_t const* json = (*it).second._json; + TRI_json_t const* shards = TRI_LookupArrayJson(json, "shards"); + if (TRI_IsArrayJson(shards)) { + size_t len = shards->_value._objects._length / 2; + if (len == numberOfShards) { + return scope.Close(v8::True()); + } + } + } } res = agency.watchValue("Current/Version", index, 1.0, false); if (!res.successful()) { @@ -8397,13 +8405,11 @@ static v8::Handle JS_CreateDatabase_Coordinator (v8::Arguments const& } uint64_t index = res._index; while (true) { - map done; res = ac.getValues("Current/Databases/"+name, true); if (res.successful()) { - if (res.flattenJson(done, "Current/Databases/"+name+"/",false)) { - if (done.size() >= DBServers.size()) { - return scope.Close(v8::True()); - } + res.parse("Current/Databases/"+name+"/", false); + if (res._values.size() >= DBServers.size()) { + return scope.Close(v8::True()); } } res = ac.watchValue("Current/Version", index, 1.0, false); @@ -8596,20 +8602,24 @@ static v8::Handle JS_DropDatabase_Coordinator (v8::Arguments const& a res = ac.getValues("Current/Version", false); if (!res.successful()) { - TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); } uint64_t index = res._index; while (true) { map done; res = ac.getValues("Current/Databases/"+name, true); if (res.successful()) { - if (res.flattenJson(done, "Current/Databases/"+name+"/",false)) { - if (done.size() > 0) { - return scope.Close(v8::True()); - } + res.parse("Current/Databases/"+name+"/", false); + if (res._values.size() == 0) { + return scope.Close(v8::True()); } } res = ac.watchValue("Current/Version", index, 1.0, false); + if (!res.successful()) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, + "could not read version of current in agency"); + } index = res._index; } } From fcb545d268e617ca2a5433346e4b4feb07af6a04 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 15 Jan 2014 14:00:42 +0100 Subject: [PATCH 12/13] removed unused methods --- arangod/Cluster/AgencyComm.cpp | 238 ++------------------------------- arangod/Cluster/AgencyComm.h | 32 ----- js/server/tests/cluster.js | 10 +- 3 files changed, 18 insertions(+), 262 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 98324e35d9..48df9177de 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -243,231 +243,6 @@ std::string AgencyCommResult::errorDetails () const { return _message + " (" + errorMessage + ")"; } -/* -//////////////////////////////////////////////////////////////////////////////// -/// @brief recursively flatten the JSON response into a map -/// -/// stripKeyPrefix is decoded, as is the _globalPrefix -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyCommResult::processJsonNode (TRI_json_t const* node, - std::map& out, - std::string const& stripKeyPrefix) const { - if (! TRI_IsArrayJson(node)) { - return true; - } - - // get "key" attribute - TRI_json_t const* key = TRI_LookupArrayJson(node, "key"); - - if (! TRI_IsStringJson(key)) { - return false; - } - - std::string keydecoded - = AgencyComm::decodeKey(string(key->_value._string.data, - key->_value._string.length-1)); - - // make sure we don't strip more bytes than the key is long - const size_t offset = AgencyComm::_globalPrefix.size() + stripKeyPrefix.size(); - const size_t length = keydecoded.size(); - - std::string prefix; - if (offset >= length) { - prefix = ""; - } - else { - prefix = keydecoded.substr(offset); - } - - // get "dir" attribute - TRI_json_t const* dir = TRI_LookupArrayJson(node, "dir"); - bool isDir = (TRI_IsBooleanJson(dir) && dir->_value._boolean); - - if (isDir) { - out.insert(std::make_pair(prefix, true)); - - // is a directory, so there may be a "nodes" attribute - TRI_json_t const* nodes = TRI_LookupArrayJson(node, "nodes"); - - if (! TRI_IsListJson(nodes)) { - // if directory is empty... - return true; - } - - const size_t n = TRI_LengthVector(&nodes->_value._objects); - - for (size_t i = 0; i < n; ++i) { - if (! processJsonNode((TRI_json_t const*) TRI_AtVector(&nodes->_value._objects, i), - out, - stripKeyPrefix)) { - return false; - } - } - } - else { - // not a directory - - // get "value" attribute - TRI_json_t const* value = TRI_LookupArrayJson(node, "value"); - - if (TRI_IsStringJson(value)) { - if (! prefix.empty()) { - // otherwise return value - out.insert(std::make_pair(prefix, false)); - } - } - } - - return true; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief recursively flatten the JSON response into a map -/// -/// stripKeyPrefix is decoded, as is the _globalPrefix -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyCommResult::processJsonNode (TRI_json_t const* node, - std::map& out, - std::string const& stripKeyPrefix, - bool returnIndex) const { - if (! TRI_IsArrayJson(node)) { - return true; - } - - // get "key" attribute - TRI_json_t const* key = TRI_LookupArrayJson(node, "key"); - - if (! TRI_IsStringJson(key)) { - return false; - } - - std::string keydecoded - = AgencyComm::decodeKey(string(key->_value._string.data, - key->_value._string.length-1)); - - // make sure we don't strip more bytes than the key is long - const size_t offset = AgencyComm::_globalPrefix.size() + stripKeyPrefix.size(); - const size_t length = keydecoded.size(); - - std::string prefix; - if (offset >= length) { - prefix = ""; - } - else { - prefix = keydecoded.substr(offset); - } - - // get "dir" attribute - TRI_json_t const* dir = TRI_LookupArrayJson(node, "dir"); - bool isDir = (TRI_IsBooleanJson(dir) && dir->_value._boolean); - - if (isDir) { - // is a directory, so there may be a "nodes" attribute - TRI_json_t const* nodes = TRI_LookupArrayJson(node, "nodes"); - - if (! TRI_IsListJson(nodes)) { - // if directory is empty... - return true; - } - - const size_t n = TRI_LengthVector(&nodes->_value._objects); - - for (size_t i = 0; i < n; ++i) { - if (! processJsonNode((TRI_json_t const*) TRI_AtVector(&nodes->_value._objects, i), - out, - stripKeyPrefix, - returnIndex)) { - return false; - } - } - } - else { - // not a directory - - // get "value" attribute - TRI_json_t const* value = TRI_LookupArrayJson(node, "value"); - - if (TRI_IsStringJson(value)) { - if (! prefix.empty()) { - if (returnIndex) { - // return "modifiedIndex" - TRI_json_t const* modifiedIndex = TRI_LookupArrayJson(node, "modifiedIndex"); - - if (! TRI_IsNumberJson(modifiedIndex)) { - return false; - } - // convert the number to an integer - out.insert(std::make_pair(prefix, - triagens::basics::StringUtils::itoa((uint64_t) modifiedIndex->_value._number))); - } - else { - // otherwise return value - out.insert(std::make_pair(prefix, - std::string(value->_value._string.data, value->_value._string.length - 1))); - } - - } - } - } - - return true; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief turn a result into a map -/// -/// note that stripKeyPrefix is a decoded, normal key! -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyCommResult::flattenJson (std::map& out, - std::string const& stripKeyPrefix) const { - TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str()); - - if (! TRI_IsArrayJson(json)) { - if (json != 0) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - } - return false; - } - - // get "node" attribute - TRI_json_t const* node = TRI_LookupArrayJson(json, "node"); - - const bool result = processJsonNode(node, out, stripKeyPrefix); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - - return result; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief turn a result into a map -/// -/// note that stripKeyPrefix is a decoded, normal key! -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyCommResult::flattenJson (std::map& out, - std::string const& stripKeyPrefix, - bool returnIndex) const { - TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str()); - - if (! TRI_IsArrayJson(json)) { - if (json != 0) { - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - } - return false; - } - - // get "node" attribute - TRI_json_t const* node = TRI_LookupArrayJson(json, "node"); - - const bool result = processJsonNode(node, out, stripKeyPrefix, returnIndex); - TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); - - return result; -} -*/ //////////////////////////////////////////////////////////////////////////////// /// @brief recursively flatten the JSON response into a map /// @@ -1464,6 +1239,19 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key, while (tries++ < maxTries) { result = getValues(key, false); + + if (result.httpCode() == (int) triagens::rest::HttpResponse::NOT_FOUND) { + TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "0", 1); + + if (json != 0) { + // create the key on the fly + setValue(key, json, 0.0); + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + tries--; + + continue; + } + } if (! result.successful()) { return result; diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index cb607c6be9..6ad7bc98ed 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -204,39 +204,7 @@ namespace triagens { const std::string body () const { return _body; } -/* -//////////////////////////////////////////////////////////////////////////////// -/// @brief recursively flatten the JSON response into a map -//////////////////////////////////////////////////////////////////////////////// - bool processJsonNode (TRI_json_t const*, - std::map&, - std::string const&) const; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief recursively flatten the JSON response into a map -//////////////////////////////////////////////////////////////////////////////// - - bool processJsonNode (TRI_json_t const*, - std::map&, - std::string const&, - bool) const; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief turn a result into a map -//////////////////////////////////////////////////////////////////////////////// - - bool flattenJson (std::map&, - std::string const&) const; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief turn a result into a map -//////////////////////////////////////////////////////////////////////////////// - - bool flattenJson (std::map&, - std::string const&, - bool) const; -*/ //////////////////////////////////////////////////////////////////////////////// /// @brief recursively flatten the JSON response into a map /// diff --git a/js/server/tests/cluster.js b/js/server/tests/cluster.js index 85cb04b733..f67a17abf7 100644 --- a/js/server/tests/cluster.js +++ b/js/server/tests/cluster.js @@ -232,7 +232,7 @@ function ClusterEnabledSuite () { shardKeys: [ "_key" ], shards: { "s1" : "myself", "s2" : "other" } }; - assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); + assertTrue(agency.set("Current/Collections/test/" + collection.id, collection)); assertTrue(ci.doesDatabaseExist("test")); assertFalse(ci.doesDatabaseExist("UnitTestsAgencyNonExisting")); @@ -251,7 +251,7 @@ function ClusterEnabledSuite () { shardKeys: [ "_key" ], shards: { "s1" : "myself", "s2" : "other" } }; - assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); + assertTrue(agency.set("Current/Collections/test/" + collection.id, collection)); var data = ci.getCollectionInfo("test", collection.id); @@ -277,7 +277,7 @@ function ClusterEnabledSuite () { shards: { "s1" : "myself", "s2" : "other", "s3" : "foo", "s4" : "bar" } }; - assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); + assertTrue(agency.set("Current/Collections/test/" + collection.id, collection)); var data = ci.getCollectionInfo("test", collection.id); @@ -303,14 +303,14 @@ function ClusterEnabledSuite () { shards: { "s1" : "myself" } }; - assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); + assertTrue(agency.set("Current/Collections/test/" + collection.id, collection)); ci.flush(); assertEqual("myself", ci.getResponsibleServer("s1")); assertEqual("", ci.getResponsibleServer("s9999")); collection.shards = { s1: "other", s2: "myself" }; - assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); + assertTrue(agency.set("Current/Collections/test/" + collection.id, collection)); ci.flush(); assertEqual("other", ci.getResponsibleServer("s1")); From 85c75926e3c91ae0e0318d71811a1405a9744562 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 15 Jan 2014 14:14:57 +0100 Subject: [PATCH 13/13] moved script --- utils/arangom.in => arangom | 0 init-cluster.sh | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename utils/arangom.in => arangom (100%) diff --git a/utils/arangom.in b/arangom similarity index 100% rename from utils/arangom.in rename to arangom diff --git a/init-cluster.sh b/init-cluster.sh index 0902b2d50b..a74617eb1b 100755 --- a/init-cluster.sh +++ b/init-cluster.sh @@ -4,7 +4,7 @@ mkdir -p data-pavel data-perry data-claus NAME="meier" ETCD="http://127.0.0.1:4001" echo "initialising cluster $NAME" -bin/arangom -a "$ETCD" -p "/$NAME/" init +./arangom -a "$ETCD" -p "/$NAME/" init curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Pavel" -d "value=\"none\"" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Pavel" -d "value=\"none\"" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Perry" -d "value=\"none\"" || exit 1