diff --git a/arangod/Cluster/ClusterInfoCurrent.cpp b/arangod/Cluster/ClusterInfoCurrent.cpp new file mode 100644 index 0000000000..6c5d34475b --- /dev/null +++ b/arangod/Cluster/ClusterInfoCurrent.cpp @@ -0,0 +1,1183 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief Class to get and cache information about the cluster state +/// +/// @file ClusterInfo.cpp +/// +/// DISCLAIMER +/// +/// Copyright 2010-2013 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Max Neunhoeffer +/// @author Jan Steemann +/// @author Copyright 2013, triagens GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#include "Cluster/ClusterInfo.h" + +#include "BasicsC/conversions.h" +#include "BasicsC/json.h" +#include "BasicsC/logging.h" +#include "Basics/JsonHelper.h" +#include "Basics/ReadLocker.h" +#include "Basics/WriteLocker.h" +#include "Basics/StringUtils.h" + +using namespace triagens::arango; +using triagens::basics::JsonHelper; + +// ----------------------------------------------------------------------------- +// --SECTION-- CollectionInfo class +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors / destructors +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates an empty collection info object +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfo::CollectionInfo () { +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a collection info object from json +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfo::CollectionInfo (ShardID const& shardID, TRI_json_t* json) { + _jsons.insert(make_pair(shardID, json)); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a collection info object from another +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfo::CollectionInfo (CollectionInfo const& other) : + _jsons(other._jsons) { + copyAllJsons(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a collection info object from json +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfo& CollectionInfo::operator= (CollectionInfo const& other) { + if (this == &other) { + return *this; + } + freeAllJsons(); + _jsons = other._jsons; + copyAllJsons(); + return *this; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destroys a collection info object +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfo::~CollectionInfo () { + freeAllJsons(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief free all pointers to TRI_json_t in the map _jsons +//////////////////////////////////////////////////////////////////////////////// + +void CollectionInfo::freeAllJsons () { + map::iterator it; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + if (it->second != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, it->second); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief copy TRI_json_t behind the pointers in the map _jsons +//////////////////////////////////////////////////////////////////////////////// + +void CollectionInfo::copyAllJsons () { + map::iterator it; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + if (0 != it->second) { + it->second = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, it->second); + } + } +} + +// ----------------------------------------------------------------------------- +// --SECTION-- private methods +// ----------------------------------------------------------------------------- + +ClusterInfo* ClusterInfo::_theinstance = 0; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns an instance of the cluster info class +//////////////////////////////////////////////////////////////////////////////// + +ClusterInfo* ClusterInfo::instance () { + // This does not have to be thread-safe, because we guarantee that + // this is called very early in the startup phase when there is still + // a single thread. + if (0 == _theinstance) { + _theinstance = new ClusterInfo(); // this now happens exactly once + } + return _theinstance; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a cluster info object +//////////////////////////////////////////////////////////////////////////////// + +ClusterInfo::ClusterInfo () + : _agency(), + _uniqid(), + _plannedDatabases(), + _currentDatabases(), + _collectionsValid(false), + _serversValid(false), + _DBServersValid(false) { + _uniqid._currentValue = _uniqid._upperValue = 0ULL; + + // Actual loading into caches is postponed until necessary +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destroys a cluster info object +//////////////////////////////////////////////////////////////////////////////// + +ClusterInfo::~ClusterInfo () { + clearPlannedDatabases(); + clearCurrentDatabases(); +} + +// ----------------------------------------------------------------------------- +// --SECTION-- public methods +// ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask whether a cluster database exists +//////////////////////////////////////////////////////////////////////////////// + +uint64_t ClusterInfo::uniqid (uint64_t count) { + WRITE_LOCKER(_lock); + + if (_uniqid._currentValue >= _uniqid._upperValue) { + uint64_t fetch = count; + if (fetch < MinIdsPerBatch) { + fetch = MinIdsPerBatch; + } + + AgencyCommResult result = _agency.uniqid("Sync/LatestID", fetch, 0.0); + + if (! result.successful() || result._index == 0) { + return 0; + } + + _uniqid._currentValue = result._index; + _uniqid._upperValue = _uniqid._currentValue + fetch - 1; + + return _uniqid._currentValue++; + } + + return ++_uniqid._currentValue; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief flush the caches (used for testing) +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::flush () { + WRITE_LOCKER(_lock); + + _collectionsValid = false; + _serversValid = false; + _DBServersValid = false; + + _collections.clear(); + _servers.clear(); + _shardIds.clear(); + + clearPlannedDatabases(); + clearCurrentDatabases(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask whether a cluster database exists +//////////////////////////////////////////////////////////////////////////////// + +bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID, + bool reload) { + int tries = 0; + + if (reload) { + loadPlannedDatabases(); + loadCurrentDatabases(); + loadCurrentDBServers(); + ++tries; + } + + while (++tries <= 2) { + { + READ_LOCKER(_lock); + const size_t expectedSize = _DBServers.size(); + + // look up database by name + + std::map::const_iterator it = _plannedDatabases.find(databaseID); + + if (it != _plannedDatabases.end()) { + // found the database in Plan + std::map >::const_iterator it2 = _currentDatabases.find(databaseID); + + if (it2 != _currentDatabases.end()) { + // found the database in Current + + return ((*it2).second.size() >= expectedSize); + } + } + } + + loadPlannedDatabases(); + loadCurrentDatabases(); + loadCurrentDBServers(); + } + + return false; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get list of databases in the cluster +//////////////////////////////////////////////////////////////////////////////// + +vector ClusterInfo::listDatabases (bool reload) { + vector result; + + if (reload) { + loadPlannedDatabases(); + loadCurrentDatabases(); + loadCurrentDBServers(); + } + + READ_LOCKER(_lock); + const size_t expectedSize = _DBServers.size(); + + std::map::const_iterator it = _plannedDatabases.begin(); + + while (it != _plannedDatabases.end()) { + std::map >::const_iterator it2 = _currentDatabases.find((*it).first); + + if (it2 != _currentDatabases.end()) { + if ((*it2).second.size() >= expectedSize) { + result.push_back((*it).first); + } + } + + ++it; + } + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief flushes the list of planned databases +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::clearPlannedDatabases () { + std::map::iterator it = _plannedDatabases.begin(); + + while (it != _plannedDatabases.end()) { + TRI_json_t* json = (*it).second; + + if (json != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + } + ++it; + } + + _plannedDatabases.clear(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief flushes the list of current databases +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::clearCurrentDatabases () { + std::map >::iterator it = _currentDatabases.begin(); + + while (it != _currentDatabases.end()) { + std::map::iterator it2 = (*it).second.begin(); + + while (it2 != (*it).second.end()) { + TRI_json_t* json = (*it2).second; + + if (json != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); + } + + ++it2; + } + ++it; + } + + _currentDatabases.clear(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about planned databases +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::loadPlannedDatabases () { + static const std::string prefix = "Plan/Databases"; + + AgencyCommResult result; + + { + AgencyCommLocker locker("Plan", "READ"); + + if (locker.successful()) { + result = _agency.getValues(prefix, true); + } + } + + if (result.successful()) { + result.parse(prefix + "/", false); + + WRITE_LOCKER(_lock); + clearPlannedDatabases(); + + std::map::iterator it = result._values.begin(); + + while (it != result._values.end()) { + const std::string& name = (*it).first; + TRI_json_t* options = (*it).second._json; + + // steal the json + (*it).second._json = 0; + _plannedDatabases.insert(std::make_pair(name, options)); + + ++it; + } + + return; + } + + LOG_TRACE("Error while loading %s", prefix.c_str()); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about current databases +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::loadCurrentDatabases () { + static const std::string prefix = "Current/Databases"; + + AgencyCommResult result; + + { + AgencyCommLocker locker("Plan", "READ"); + + if (locker.successful()) { + result = _agency.getValues(prefix, true); + } + } + + if (result.successful()) { + result.parse(prefix + "/", false); + + WRITE_LOCKER(_lock); + clearCurrentDatabases(); + + std::map::iterator it = result._values.begin(); + + while (it != result._values.end()) { + const std::string key = (*it).first; + + // each entry consists of a database id and a collection id, separated by '/' + std::vector parts = triagens::basics::StringUtils::split(key, '/'); + + if (parts.empty()) { + ++it; + continue; + } + const std::string database = parts[0]; + + std::map >::iterator it2 = _currentDatabases.find(database); + + if (it2 == _currentDatabases.end()) { + // insert an empty list for this database + std::map empty; + it2 = _currentDatabases.insert(std::make_pair >(database, empty)).first; + } + + if (parts.size() == 2) { + // got a server name + TRI_json_t* json = (*it).second._json; + // steal the JSON + (*it).second._json = 0; + (*it2).second.insert(std::make_pair(parts[1], json)); + } + + ++it; + } + + return; + } + + LOG_TRACE("Error while loading %s", prefix.c_str()); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about collections from the agency +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::loadCurrentCollections () { + static const std::string prefix = "Current/Collections"; + + AgencyCommResult result; + + { + AgencyCommLocker locker("Current", "READ"); + + if (locker.successful()) { + result = _agency.getValues(prefix, true); + } + } + + if (result.successful()) { + result.parse(prefix + "/", false); + + WRITE_LOCKER(_lock); + _collections.clear(); + _shardIds.clear(); + + std::map::iterator it = result._values.begin(); + + for (; it != result._values.end(); ++it) { + const std::string key = (*it).first; + + // each entry consists of a database id and a collection id, separated by '/' + std::vector parts = triagens::basics::StringUtils::split(key, '/'); + + if (parts.size() != 3) { + // invalid entry + LOG_WARNING("found invalid collection key in agency: '%s'", key.c_str()); + continue; + } + + const std::string database = parts[0]; + const std::string collection = parts[1]; + const ShardID shardID = parts[2]; + + // check whether we have created an entry for the database already + AllCollections::iterator it2 = _collections.find(database); + + if (it2 == _collections.end()) { + // not yet, so create an entry for the database + DatabaseCollections empty; + _collections.insert(std::make_pair(database, empty)); + it2 = _collections.find(database); + } + + DatabaseCollections dbcolls = it2->second; + + TRI_json_t* json = (*it).second._json; + // steal the json + (*it).second._json = 0; + + CollectionInfo* collectionData; + + // check whether we have an entry for this collection already + DatabaseCollections::iterator it4 = dbcolls.find(collection); + if (it4 != dbcolls.end()) { + collectionData = it4->second; + if (!collectionData->add(shardID, json)) { + TRI_FreeJson(json); + } + } + else { + collectionData = new CollectionInfo(shardID, json); + } + + // insert the collection into the existing map + + (*it2).second.insert(std::make_pair(collection, collectionData)); + (*it2).second.insert(std::make_pair(collectionData.name(), collectionData)); + + std::map shards + = collectionData->shardIdsPlanned(); + std::map::const_iterator it3 = shards.begin(); + + while (it3 != shards.end()) { + const std::string shardId = (*it3).first; + const std::string serverId = (*it3).second; + + _shardIds.insert(std::make_pair(shardId, serverId)); + ++it3; + } + + _collectionsValid = true; + return; + } + } + + LOG_TRACE("Error while loading %s", prefix.c_str()); + _collectionsValid = false; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask about a collection +/// If it is not found in the cache, the cache is reloaded once +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID, + CollectionID const& collectionID) { + int tries = 0; + + if (! _collectionsValid) { + loadCurrentCollections(); + ++tries; + } + + while (++tries <= 2) { + { + READ_LOCKER(_lock); + // look up database by id + AllCollections::const_iterator it = _collections.find(databaseID); + + if (it != _collections.end()) { + // look up collection by id + DatabaseCollections::const_iterator it2 = (*it).second.find(collectionID); + + if (it2 != (*it).second.end()) { + return *((*it2).second); + } + } + } + + // must load collections outside the lock + loadCurrentCollections(); + } + + return CollectionInfo(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get properties of a collection +//////////////////////////////////////////////////////////////////////////////// + +TRI_col_info_t ClusterInfo::getCollectionProperties (CollectionInfo const& collection) { + TRI_col_info_t info; + + info._type = collection.type(); + info._cid = collection.id(); + info._revision = 0; // TODO + info._maximalSize = collection.maximalSize(); + + const std::string name = collection.name(); + memcpy(info._name, name.c_str(), name.size()); + info._deleted = collection.deleted(); + info._doCompact = collection.doCompact(); + info._isSystem = collection.isSystem(); + info._isVolatile = collection.isVolatile(); + info._waitForSync = collection.waitForSync(); + info._keyOptions = collection.keyOptions(); + + return info; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get properties of a collection +//////////////////////////////////////////////////////////////////////////////// + +TRI_col_info_t ClusterInfo::getCollectionProperties (DatabaseID const& databaseID, + CollectionID const& collectionID) { + CollectionInfo ci = getCollection(databaseID, collectionID); + + return getCollectionProperties(ci); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask about all collections +//////////////////////////////////////////////////////////////////////////////// + +const std::vector ClusterInfo::getCollections (DatabaseID const& databaseID) { + std::vector result; + + // always reload + loadCurrentCollections(); + + READ_LOCKER(_lock); + // look up database by id + AllCollections::const_iterator it = _collections.find(databaseID); + + if (it == _collections.end()) { + return result; + } + + // iterate over all collections + DatabaseCollections::const_iterator it2 = (*it).second.begin(); + while (it2 != (*it).second.end()) { + char c = (*it2).first[0]; + + if (c < '0' || c > '9') { + // skip collections indexed by id + result.push_back((*it2).second); + } + + ++it2; + } + + return result; +} + +// A local helper to report errors and messages: + +static inline int set_errormsg(int ourerrno, string& errorMsg) { + errorMsg = TRI_errno_string(ourerrno); + return ourerrno; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create database in coordinator, the return value is an ArangoDB +/// error code and the errorMsg is set accordingly. One possible error +/// is a timeout, a timeout of 0.0 means no timeout. +//////////////////////////////////////////////////////////////////////////////// + +int ClusterInfo::createDatabaseCoordinator (string const& name, + TRI_json_t const* json, + string errorMsg, double timeout) { + AgencyComm ac; + AgencyCommResult res; + + { + AgencyCommLocker locker("Plan", "WRITE"); + + if (! locker.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg); + } + + res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0); + if (!res.successful()) { + if (res._statusCode == 412) { + return set_errormsg(TRI_ERROR_CLUSTER_DATABASE_NAME_EXISTS, errorMsg); + } + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN, + errorMsg); + } + } + + // Now wait for it to appear and be complete: + res = ac.getValues("Current/Version", false); + if (!res.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION, + errorMsg); + } + uint64_t index = res._index; + double endtime = TRI_microtime(); + endtime += timeout == 0.0 ? 1e50 : timeout; + + vector DBServers = getCurrentDBServers(); + int count = 0; // this counts, when we have to reload the DBServers + + string where = "Current/Databases/" + name; + while (TRI_microtime() <= endtime) { + res = ac.getValues(where, true); + if (res.successful() && res.parse(where+"/", false)) { + if (res._values.size() == DBServers.size()) { + map::iterator it; + string tmpMsg = ""; + bool tmpHaveError = false; + for (it = res._values.begin(); it != res._values.end(); ++it) { + TRI_json_t const* json = (*it).second._json; + TRI_json_t const* error = TRI_LookupArrayJson(json, "error"); + if (TRI_IsBooleanJson(error) && error->_value._boolean) { + tmpHaveError = true; + tmpMsg += " DBServer:"+it->first+":"; + TRI_json_t const* errorMessage + = TRI_LookupArrayJson(json, "errorMessage"); + if (TRI_IsStringJson(errorMessage)) { + tmpMsg += string(errorMessage->_value._string.data, + errorMessage->_value._string.length); + } + TRI_json_t const* errorNum = TRI_LookupArrayJson(json, "errorNum"); + if (TRI_IsNumberJson(errorNum)) { + tmpMsg += " (errNum="; + tmpMsg += basics::StringUtils::itoa(static_cast( + errorNum->_value._number)); + tmpMsg += ")"; + } + } + } + if (tmpHaveError) { + errorMsg = "Error in creation of database:" + tmpMsg; + return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE; + } + return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg); + } + } + res = ac.watchValue("Current/Version", index, 5.0, false); + index = res._index; + if (++count >= 12) { + // We update the list of DBServers every minute in case one of them + // was taken away since we last looked. This also helps (slightly) + // if a new DBServer was added. However, in this case we report + // success a bit too early, which is not too bad. + loadCurrentDBServers(); + DBServers = getCurrentDBServers(); + count = 0; + } + } + return set_errormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop database in coordinator, the return value is an ArangoDB +/// error code and the errorMsg is set accordingly. One possible error +/// is a timeout, a timeout of 0.0 means no timeout. +//////////////////////////////////////////////////////////////////////////////// + +int ClusterInfo::dropDatabaseCoordinator (string const& name, string& errorMsg, + double timeout) { + AgencyComm ac; + AgencyCommResult res; + + { + AgencyCommLocker locker("Plan", "WRITE"); + + if (! locker.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg); + } + + if (! ac.exists("Plan/Databases/" + name)) { + return set_errormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg); + } + + res = ac.removeValues("Plan/Databases/"+name, false); + if (!res.successful()) { + if (res._statusCode == rest::HttpResponse::NOT_FOUND) { + return set_errormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg); + } + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN, + errorMsg); + } + } + + // Now wait for it to appear and be complete: + res = ac.getValues("Current/Version", false); + if (!res.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION, + errorMsg); + } + uint64_t index = res._index; + double endtime = TRI_microtime(); + endtime += timeout == 0.0 ? 1e50 : timeout; + + string where = "Current/Databases/" + name; + while (TRI_microtime() <= endtime) { + res = ac.getValues(where, true); + if (res.successful() && res.parse(where+"/", false)) { + if (res._values.size() == 0) { + AgencyCommLocker locker("Current", "WRITE"); + if (locker.successful()) { + res = ac.removeValues(where, true); + if (res.successful()) { + return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg); + } + return set_errormsg( + TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_CURRENT, errorMsg); + } + return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg); + } + } + res = ac.watchValue("Current/Version", index, 5.0, false); + index = res._index; + } + return set_errormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create collection in coordinator, the return value is an ArangoDB +/// error code and the errorMsg is set accordingly. One possible error +/// is a timeout, a timeout of 0.0 means no timeout. +//////////////////////////////////////////////////////////////////////////////// + +int ClusterInfo::createCollectionCoordinator (string const& databaseName, + string const& collectionID, + uint64_t numberOfShards, + TRI_json_t const* json, + string errorMsg, double timeout) { + AgencyComm ac; + + { + AgencyCommLocker locker("Plan", "WRITE"); + + if (! locker.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg); + } + + if (! ac.exists("Plan/Databases/" + databaseName)) { + return set_errormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg); + } + + if (ac.exists("Plan/Collections/" + databaseName + "/"+collectionID)) { + return set_errormsg(TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS, errorMsg); + } + + AgencyCommResult result + = ac.setValue("Plan/Collections/" + databaseName + "/"+collectionID, + json, 0.0); + if (!result.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN, + errorMsg); + } + } + + // Now wait for it to appear and be complete: + AgencyCommResult res = ac.getValues("Current/Version", false); + if (!res.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION, + errorMsg); + } + uint64_t index = res._index; + double endtime = TRI_microtime(); + endtime += timeout == 0.0 ? 1e50 : timeout; + + string where = "Current/Collections/" + databaseName + "/" + collectionID; + while (TRI_microtime() <= endtime) { + res = ac.getValues(where, true); + if (res.successful() && res.parse(where+"/", false)) { + cout << "Seeing " << res._values.size() << "shards." << endl; + if (res._values.size() == numberOfShards) { + map::iterator it; + string tmpMsg = ""; + bool tmpHaveError = false; + for (it = res._values.begin(); it != res._values.end(); ++it) { + TRI_json_t const* json = (*it).second._json; + TRI_json_t const* error = TRI_LookupArrayJson(json, "error"); + if (TRI_IsBooleanJson(error) && error->_value._boolean) { + tmpHaveError = true; + tmpMsg += " shardID:"+it->first+":"; + TRI_json_t const* errorMessage + = TRI_LookupArrayJson(json, "errorMessage"); + if (TRI_IsStringJson(errorMessage)) { + tmpMsg += string(errorMessage->_value._string.data, + errorMessage->_value._string.length); + } + TRI_json_t const* errorNum = TRI_LookupArrayJson(json, "errorNum"); + if (TRI_IsNumberJson(errorNum)) { + tmpMsg += " (errNum="; + tmpMsg += basics::StringUtils::itoa(static_cast( + errorNum->_value._number)); + tmpMsg += ")"; + } + } + } + if (tmpHaveError) { + errorMsg = "Error in creation of collection:" + tmpMsg; + return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; + } + return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg); + } + } + res = ac.watchValue("Current/Version", index, 5.0, false); + index = res._index; + } + return set_errormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); +} + + +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop collection in coordinator, the return value is an ArangoDB +/// error code and the errorMsg is set accordingly. One possible error +/// is a timeout, a timeout of 0.0 means no timeout. +//////////////////////////////////////////////////////////////////////////////// + +int ClusterInfo::dropCollectionCoordinator (string const& databaseName, + string const& collectionID, + string& errorMsg, + double timeout) { + AgencyComm ac; + AgencyCommResult res; + + { + AgencyCommLocker locker("Plan", "WRITE"); + + if (! locker.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg); + } + + if (! ac.exists("Plan/Databases/" + databaseName)) { + return set_errormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg); + } + + res = ac.removeValues("Plan/Collections/"+databaseName+"/"+collectionID, + false); + if (!res.successful()) { + if (res._statusCode == rest::HttpResponse::NOT_FOUND) { + return set_errormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg); + } + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_PLAN, + errorMsg); + } + } + + // Now wait for it to appear and be complete: + res = ac.getValues("Current/Version", false); + if (!res.successful()) { + return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION, + errorMsg); + } + uint64_t index = res._index; + double endtime = TRI_microtime(); + endtime += timeout == 0.0 ? 1e50 : timeout; + + string where = "Current/Collections/" + databaseName + "/" + collectionID; + while (TRI_microtime() <= endtime) { + res = ac.getValues(where, true); + if (res.successful() && res.parse(where+"/", false)) { + if (res._values.size() == 0) { + AgencyCommLocker locker("Current", "WRITE"); + if (locker.successful()) { + res = ac.removeValues("Current/Collections/"+databaseName+"/"+ + collectionID, true); + if (res.successful()) { + return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg); + } + return set_errormsg( + TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT, errorMsg); + } + return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg); + } + } + res = ac.watchValue("Current/Version", index, 5.0, false); + index = res._index; + } + return set_errormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about servers from the agency +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::loadServers () { + static const std::string prefix = "Current/ServersRegistered"; + + AgencyCommResult result; + + { + AgencyCommLocker locker("Current", "READ"); + + if (locker.successful()) { + result = _agency.getValues(prefix, true); + } + } + + if (result.successful()) { + result.parse(prefix + "/", false); + + WRITE_LOCKER(_lock); + _servers.clear(); + + std::map::const_iterator it = result._values.begin(); + + while (it != result._values.end()) { + const std::string server = triagens::basics::JsonHelper::getStringValue((*it).second._json, ""); + + _servers.insert(std::make_pair((*it).first, server)); + ++it; + } + + _serversValid = true; + + return; + } + + LOG_TRACE("Error while loading %s", prefix.c_str()); + + _serversValid = false; + + return; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief find the endpoint of a server from its ID. +/// If it is not found in the cache, the cache is reloaded once, if +/// it is still not there an empty string is returned as an error. +//////////////////////////////////////////////////////////////////////////////// + +std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) { + int tries = 0; + + if (! _serversValid) { + loadServers(); + tries++; + } + + while (++tries <= 2) { + { + READ_LOCKER(_lock); + std::map::const_iterator it = _servers.find(serverID); + + if (it != _servers.end()) { + return (*it).second; + } + } + + // must call loadServers outside the lock + loadServers(); + } + + return std::string(""); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about all DBservers from the agency +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::loadCurrentDBServers () { + static const std::string prefix = "Current/DBServers"; + + AgencyCommResult result; + + { + AgencyCommLocker locker("Current", "READ"); + + if (locker.successful()) { + result = _agency.getValues(prefix, true); + } + } + + if (result.successful()) { + result.parse(prefix + "/", false); + + WRITE_LOCKER(_lock); + _DBServers.clear(); + + std::map::const_iterator it = result._values.begin(); + + for (; it != result._values.end(); ++it) { + _DBServers.insert(std::make_pair((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, ""))); + } + + _DBServersValid = true; + return; + } + + LOG_TRACE("Error while loading %s", prefix.c_str()); + + _DBServersValid = false; + + return; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief return a list of all DBServers in the cluster that have +/// currently registered +//////////////////////////////////////////////////////////////////////////////// + +std::vector ClusterInfo::getCurrentDBServers () { + if (! _DBServersValid) { + loadCurrentDBServers(); + } + + std::vector result; + + READ_LOCKER(_lock); + std::map::iterator it = _DBServers.begin(); + + while (it != _DBServers.end()) { + result.push_back((*it).first); + it++; + } + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief lookup the server's endpoint by scanning Target/MapIDToEnpdoint for +/// our id +//////////////////////////////////////////////////////////////////////////////// + +std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) { + static const std::string prefix = "Target/MapIDToEndpoint/"; + + AgencyCommResult result; + + // fetch value at Target/MapIDToEndpoint + { + AgencyCommLocker locker("Target", "READ"); + + if (locker.successful()) { + result = _agency.getValues(prefix + serverID, false); + } + } + + if (result.successful()) { + result.parse(prefix, false); + + // check if we can find ourselves in the list returned by the agency + std::map::const_iterator it = result._values.find(serverID); + + if (it != result._values.end()) { + return triagens::basics::JsonHelper::getStringValue((*it).second._json, ""); + } + } + + // not found + return ""; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief find the server who is responsible for a shard +/// If it is not found in the cache, the cache is reloaded once, if +/// it is still not there an empty string is returned as an error. +//////////////////////////////////////////////////////////////////////////////// + +ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) { + int tries = 0; + + if (! _collectionsValid) { + loadCurrentCollections(); + tries++; + } + + while (++tries <= 2) { + { + READ_LOCKER(_lock); + std::map::const_iterator it = _shardIds.find(shardID); + + if (it != _shardIds.end()) { + return (*it).second; + } + } + + // must load collections outside the lock + loadCurrentCollections(); + } + + return ServerID(""); +} + +// Local Variables: +// mode: outline-minor +// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)" +// End: diff --git a/arangod/Cluster/ClusterInfoCurrent.h b/arangod/Cluster/ClusterInfoCurrent.h new file mode 100644 index 0000000000..0dc463d514 --- /dev/null +++ b/arangod/Cluster/ClusterInfoCurrent.h @@ -0,0 +1,748 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief Class to get and cache information about the cluster state +/// +/// @file ClusterInfo.h +/// +/// DISCLAIMER +/// +/// Copyright 2010-2013 triagens GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Max Neunhoeffer +/// @author Jan Steemann +/// @author Copyright 2013, triagens GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#ifndef TRIAGENS_CLUSTER_CLUSTER_INFO_H +#define TRIAGENS_CLUSTER_CLUSTER_INFO_H 1 + +#include "Basics/Common.h" +#include "Basics/JsonHelper.h" +#include "Cluster/AgencyComm.h" +#include "VocBase/collection.h" +#include "VocBase/voc-types.h" +#include "VocBase/vocbase.h" + +#ifdef __cplusplus +extern "C" { + struct TRI_json_s; + struct TRI_memory_zone_s; +#endif + +namespace triagens { + namespace arango { + class ClusterInfo; + +// ----------------------------------------------------------------------------- +// --SECTION-- some types for ClusterInfo +// ----------------------------------------------------------------------------- + + typedef std::string ServerID; // ID of a server + typedef std::string DatabaseID; // ID/name of a database + typedef std::string CollectionID; // ID of a collection + typedef std::string ShardID; // ID of a shard + +// ----------------------------------------------------------------------------- +// --SECTION-- class CollectionInfo +// ----------------------------------------------------------------------------- + + class CollectionInfo { + friend class ClusterInfo; + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors / destructors +// ----------------------------------------------------------------------------- + + public: + + CollectionInfo (); + + CollectionInfo (ShardID const&, struct TRI_json_s*); + + CollectionInfo (CollectionInfo const&); + + CollectionInfo& operator= (CollectionInfo const&); + + ~CollectionInfo (); + + private: + + void freeAllJsons (); + + void copyAllJsons (); + +// ----------------------------------------------------------------------------- +// --SECTION-- public methods +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief add a new shardID and JSON pair, returns true if OK and false +/// if the shardID already exists. In the latter case nothing happens. +/// The CollectionInfo object takes ownership of the TRI_json_t*. +//////////////////////////////////////////////////////////////////////////////// + + bool add (ShardID& shardID, TRI_json_t* json) { + map::iterator it = _jsons.find(shardID); + if (it == _jsons.end()) { + _jsons.insert(make_pair(shardID, json)); + return true; + } + return false; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection id +//////////////////////////////////////////////////////////////////////////////// + + TRI_voc_cid_t id () const { + // The id will always be the same in every shard + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + return triagens::basics::JsonHelper::stringUInt64(_json, "id"); + } + else { + return 0; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection name for one shardID +//////////////////////////////////////////////////////////////////////////////// + + string name (ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getStringValue + (_json, "name", ""); + } + return string(""); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection name for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map name () const { + map m; + map::const_iterator it; + string n; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + TRI_json_t* _json = it->second; + n = triagens::basics::JsonHelper::getStringValue(_json, "name", ""); + m.insert(make_pair(it->first,n)); + } + return m; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns a global name of the cluster collection +//////////////////////////////////////////////////////////////////////////////// + + string globalName () const { + // FIXME: do it + return "Hans"; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection type +//////////////////////////////////////////////////////////////////////////////// + + TRI_col_type_e type () const { + // The type will always be the same in every shard + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + return triagens::basics::JsonHelper::getNumericValue + (_json, "type", TRI_COL_TYPE_UNKNOWN); + } + else { + return TRI_COL_TYPE_UNKNOWN; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection status for one shardID +//////////////////////////////////////////////////////////////////////////////// + + TRI_vocbase_col_status_e status (ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getNumericValue + + (_json, "status", TRI_VOC_COL_STATUS_CORRUPTED); + } + return TRI_VOC_COL_STATUS_CORRUPTED; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection status for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map status () const { + map m; + map::const_iterator it; + TRI_vocbase_col_status_e s; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + TRI_json_t* _json = it->second; + s = triagens::basics::JsonHelper::getNumericValue + + (_json, "status", TRI_VOC_COL_STATUS_CORRUPTED); + m.insert(make_pair(it->first,s)); + } + return m; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns a global status of the cluster collection +//////////////////////////////////////////////////////////////////////////////// + + TRI_vocbase_col_status_e globalStatus () const { + // FIXME: do it + return TRI_VOC_COL_STATUS_CORRUPTED; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief local helper to return boolean flags +//////////////////////////////////////////////////////////////////////////////// + + private: + + bool getFlag (char const* name, ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getBooleanValue(_json, + name, false); + } + return false; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief local helper to return a map to boolean +//////////////////////////////////////////////////////////////////////////////// + + map getFlag (char const* name ) const { + map m; + map::const_iterator it; + bool b; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + TRI_json_t* _json = it->second; + b = triagens::basics::JsonHelper::getBooleanValue(_json, + name, false); + m.insert(make_pair(it->first,b)); + } + return m; + } + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the deleted flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool deleted (ShardID const& shardID) const { + return getFlag("deleted", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the deleted flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map deleted () const { + return getFlag("deleted"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the doCompact flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool doCompact (ShardID const& shardID) const { + return getFlag("doCompact", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the doCompact flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map doCompact () const { + return getFlag("doCompact"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the isSystem flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool isSystem (ShardID const& shardID) const { + return getFlag("isSystem", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the isSystem flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map isSystem () const { + return getFlag("isSystem"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the isVolatile flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool isVolatile (ShardID const& shardID) const { + return getFlag("isVolatile", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the isVolatile flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map isVolatile () const { + return getFlag("isVolatile"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the waitForSync flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool waitForSync (ShardID const& shardID) const { + return getFlag("waitForSync", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the waitForSync flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map waitForSync () const { + return getFlag("waitForSync"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns a copy of the key options +/// the caller is responsible for freeing it +//////////////////////////////////////////////////////////////////////////////// + + TRI_json_t* keyOptions () const { + // The id will always be the same in every shard + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + TRI_json_t const* keyOptions + = triagens::basics::JsonHelper::getArrayElement + (_json, "keyOptions"); + + if (keyOptions != 0) { + return TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, keyOptions); + } + + return 0; + } + else { + return 0; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the maximal journal size for one shardID +//////////////////////////////////////////////////////////////////////////////// + + TRI_voc_size_t journalSize (ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getNumericValue + (_json, "journalSize", 0); + } + return 0; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the maximal journal size for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map journalSize () const { + map m; + map::const_iterator it; + TRI_voc_size_t s; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + TRI_json_t* _json = it->second; + s = triagens::basics::JsonHelper::getNumericValue + (_json, "journalSize", 0); + m.insert(make_pair(it->first,s)); + } + return m; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the shard keys +//////////////////////////////////////////////////////////////////////////////// + + vector shardKeys () const { + // The shardKeys will always be the same in every shard + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + TRI_json_t* const node + = triagens::basics::JsonHelper::getArrayElement + (_json, "shardKeys"); + return triagens::basics::JsonHelper::stringList(node); + } + else { + vector result; + return result; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the shard ids that should be in the collection +//////////////////////////////////////////////////////////////////////////////// + + map shardIdsPlanned () const { + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + TRI_json_t* const node + = triagens::basics::JsonHelper::getArrayElement(_json, "shards"); + return triagens::basics::JsonHelper::stringObject(node); + } + else { + map result; + return result; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the shard ids that are currently in the collection +//////////////////////////////////////////////////////////////////////////////// + + vector shardIDs () const { + vector v; + map::const_iterator it; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + v.push_back(it->first); + } + return v; + } + +// ----------------------------------------------------------------------------- +// --SECTION-- private methods +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- +// --SECTION-- private variables +// ----------------------------------------------------------------------------- + + private: + + map _jsons; + }; + + +// ----------------------------------------------------------------------------- +// --SECTION-- class ClusterInfo +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- +// --SECTION-- typedefs +// ----------------------------------------------------------------------------- + + class ClusterInfo { + private: + + typedef std::map DatabaseCollections; + typedef std::map AllCollections; + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors and destructors +// ----------------------------------------------------------------------------- + + private: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialises library +/// We are a singleton class, therefore nobody is allowed to create +/// new instances or copy them, except we ourselves. +//////////////////////////////////////////////////////////////////////////////// + + ClusterInfo (); + ClusterInfo (ClusterInfo const&); // not implemented + void operator= (ClusterInfo const&); // not implemented + +//////////////////////////////////////////////////////////////////////////////// +/// @brief shuts down library +//////////////////////////////////////////////////////////////////////////////// + + public: + + ~ClusterInfo (); + +// ----------------------------------------------------------------------------- +// --SECTION-- public static methods +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the unique instance +//////////////////////////////////////////////////////////////////////////////// + + static ClusterInfo* instance (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief cleanup function to call once when shutting down +//////////////////////////////////////////////////////////////////////////////// + + static void cleanup () { + delete _theinstance; + _theinstance = 0; + } + +// ----------------------------------------------------------------------------- +// --SECTION-- public methods +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get a number of cluster-wide unique IDs, returns the first +/// one and guarantees that are reserved for the caller. +//////////////////////////////////////////////////////////////////////////////// + + uint64_t uniqid (uint64_t = 1); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief flush the caches (used for testing only) +//////////////////////////////////////////////////////////////////////////////// + + void flush (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask whether a cluster database exists +//////////////////////////////////////////////////////////////////////////////// + + bool doesDatabaseExist (DatabaseID const&, + bool = false); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get list of databases in the cluster +//////////////////////////////////////////////////////////////////////////////// + + vector listDatabases (bool = false); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about collections from the agency +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + + void loadCurrentCollections (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief flushes the list of planned databases +//////////////////////////////////////////////////////////////////////////////// + + void clearPlannedDatabases (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief flushes the list of current databases +//////////////////////////////////////////////////////////////////////////////// + + void clearCurrentDatabases (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about planned databases +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + + void loadPlannedDatabases (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about current databases +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + + void loadCurrentDatabases (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask about a collection +/// If it is not found in the cache, the cache is reloaded once. +//////////////////////////////////////////////////////////////////////////////// + + CollectionInfo getCollection (DatabaseID const&, + CollectionID const&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get properties of a collection +//////////////////////////////////////////////////////////////////////////////// + + TRI_col_info_t getCollectionProperties (CollectionInfo const&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief get properties of a collection +//////////////////////////////////////////////////////////////////////////////// + + TRI_col_info_t getCollectionProperties (DatabaseID const&, + CollectionID const&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask about all collections +//////////////////////////////////////////////////////////////////////////////// + + const std::vector getCollections (DatabaseID const&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create database in coordinator +//////////////////////////////////////////////////////////////////////////////// + + int createDatabaseCoordinator (string const& name, + TRI_json_t const* json, + string errorMsg, double timeout); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop database in coordinator +//////////////////////////////////////////////////////////////////////////////// + + int dropDatabaseCoordinator (string const& name, string& errorMsg, + double timeout); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create collection in coordinator +//////////////////////////////////////////////////////////////////////////////// + + int createCollectionCoordinator (string const& databaseName, + string const& collectionID, + uint64_t numberOfShards, + TRI_json_t const* json, + string errorMsg, double timeout); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop collection in coordinator +//////////////////////////////////////////////////////////////////////////////// + + int dropCollectionCoordinator (string const& databaseName, + string const& collectionID, + string& errorMsg, + double timeout); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about all DBservers from the agency +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + + void loadCurrentDBServers (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief return a list of all DBServers in the cluster that have +/// currently registered +//////////////////////////////////////////////////////////////////////////////// + + std::vector getCurrentDBServers (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about servers from the agency +/// Usually one does not have to call this directly. +//////////////////////////////////////////////////////////////////////////////// + + void loadServers (); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief find the endpoint of a server from its ID. +/// If it is not found in the cache, the cache is reloaded once, if +/// it is still not there an empty string is returned as an error. +//////////////////////////////////////////////////////////////////////////////// + + std::string getServerEndpoint (ServerID const&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief lookup the server's endpoint by scanning Target/MapIDToEnpdoint for +/// our id +//////////////////////////////////////////////////////////////////////////////// + + std::string getTargetServerEndpoint (ServerID const&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief find the server who is responsible for a shard +/// If it is not found in the cache, the cache is reloaded once, if +/// it is still not there an empty string is returned as an error. +//////////////////////////////////////////////////////////////////////////////// + + ServerID getResponsibleServer (ShardID const&); + +// ----------------------------------------------------------------------------- +// --SECTION-- private variables +// ----------------------------------------------------------------------------- + + private: + + AgencyComm _agency; + triagens::basics::ReadWriteLock _lock; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief uniqid sequence +//////////////////////////////////////////////////////////////////////////////// + + struct { + uint64_t _currentValue; + uint64_t _upperValue; + } + _uniqid; + + // Cached data from the agency, we reload whenever necessary: + std::map _plannedDatabases; // from Plan/Databases + std::map > _currentDatabases; // from Current/Databases + + AllCollections _collections; // from Current/Collections/ + bool _collectionsValid; + std::map _servers; // from Current/ServersRegistered + bool _serversValid; + std::map _DBServers; // from Current/DBServers + bool _DBServersValid; + std::map _shardIds; // from Current/ShardLocation + +// ----------------------------------------------------------------------------- +// --SECTION-- private static variables +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief the sole instance +//////////////////////////////////////////////////////////////////////////////// + + static ClusterInfo* _theinstance; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief how big a batch is for unique ids +//////////////////////////////////////////////////////////////////////////////// + + static const uint64_t MinIdsPerBatch = 100; + + }; + + } // end namespace arango +} // end namespace triagens + +#ifdef __cplusplus +} +#endif + +#endif + +// Local Variables: +// mode: outline-minor +// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)" +// End: