diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index b78ca024a6..eddfbf17aa 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -113,6 +113,86 @@ CollectionInfo::~CollectionInfo () { } } +// ----------------------------------------------------------------------------- +// --SECTION-- CollectionInfoCurrent class +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors / destructors +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates an empty collection info object +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfoCurrent::CollectionInfoCurrent () { +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a collection info object from json +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfoCurrent::CollectionInfoCurrent (ShardID const& shardID, TRI_json_t* json) { + _jsons.insert(make_pair(shardID, json)); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a collection info object from another +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfoCurrent::CollectionInfoCurrent (CollectionInfoCurrent const& other) : + _jsons(other._jsons) { + copyAllJsons(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a collection info object from json +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfoCurrent& CollectionInfoCurrent::operator= (CollectionInfoCurrent const& other) { + if (this == &other) { + return *this; + } + freeAllJsons(); + _jsons = other._jsons; + copyAllJsons(); + return *this; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destroys a collection info object +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfoCurrent::~CollectionInfoCurrent () { + freeAllJsons(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief free all pointers to TRI_json_t in the map _jsons +//////////////////////////////////////////////////////////////////////////////// + +void CollectionInfoCurrent::freeAllJsons () { + map::iterator it; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + if (it->second != 0) { + TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, it->second); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief copy TRI_json_t behind the pointers in the map _jsons +//////////////////////////////////////////////////////////////////////////////// + +void CollectionInfoCurrent::copyAllJsons () { + map::iterator it; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + if (0 != it->second) { + it->second = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, it->second); + } + } +} + // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- @@ -202,10 +282,12 @@ void ClusterInfo::flush () { WRITE_LOCKER(_lock); _collectionsValid = false; + _collectionsCurrentValid = false; _serversValid = false; _DBServersValid = false; _collections.clear(); + _collectionsCurrent.clear(); _servers.clear(); _shardIds.clear(); @@ -468,7 +550,6 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) { WRITE_LOCKER(_lock); _collections.clear(); - _shardIds.clear(); std::map::iterator it = result._values.begin(); @@ -508,17 +589,6 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) { (*it2).second.insert(std::make_pair(collection, collectionData)); (*it2).second.insert(std::make_pair(collectionData.name(), collectionData)); - std::map shards = collectionData.shardIds(); - std::map::const_iterator it3 = shards.begin(); - - while (it3 != shards.end()) { - const std::string shardId = (*it3).first; - const std::string serverId = (*it3).second; - - _shardIds.insert(std::make_pair(shardId, serverId)); - ++it3; - } - } _collectionsValid = true; return; @@ -575,7 +645,7 @@ TRI_col_info_t ClusterInfo::getCollectionProperties (CollectionInfo const& colle info._type = collection.type(); info._cid = collection.id(); info._revision = 0; // TODO - info._maximalSize = collection.maximalSize(); + info._maximalSize = collection.journalSize(); const std::string name = collection.name(); memcpy(info._name, name.c_str(), name.size()); @@ -634,6 +704,147 @@ const std::vector ClusterInfo::getCollections (DatabaseID const& return result; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about current collections from the agency +/// Usually one does not have to call this directly. Note that this is +/// necessarily complicated, since here we have to consider information +/// about all shards of a collection. +//////////////////////////////////////////////////////////////////////////////// + +void ClusterInfo::loadCurrentCollections (bool acquireLock) { + static const std::string prefix = "Current/Collections"; + + AgencyCommResult result; + + { + if (acquireLock) { + AgencyCommLocker locker("Current", "READ"); + + if (locker.successful()) { + result = _agency.getValues(prefix, true); + } + } + else { + result = _agency.getValues(prefix, true); + } + } + + if (result.successful()) { + result.parse(prefix + "/", false); + + WRITE_LOCKER(_lock); + _collectionsCurrent.clear(); + _shardIds.clear(); + + std::map::iterator it = result._values.begin(); + + for (; it != result._values.end(); ++it) { + const std::string key = (*it).first; + + // each entry consists of a database id, a collection id, and a shardID, + // separated by '/' + std::vector parts = triagens::basics::StringUtils::split(key, '/'); + + if (parts.size() != 3) { + // invalid entry + LOG_WARNING("found invalid collection key in current in agency: '%s'", key.c_str()); + continue; + } + + const std::string database = parts[0]; + const std::string collection = parts[1]; + const std::string shardID = parts[2]; + + // check whether we have created an entry for the database already + AllCollectionsCurrent::iterator it2 = _collectionsCurrent.find(database); + + if (it2 == _collectionsCurrent.end()) { + // not yet, so create an entry for the database + DatabaseCollectionsCurrent empty; + _collectionsCurrent.insert(std::make_pair(database, empty)); + it2 = _collectionsCurrent.find(database); + } + + TRI_json_t* json = (*it).second._json; + // steal the json + (*it).second._json = 0; + + // check whether we already have a CollectionInfoCurrent: + DatabaseCollectionsCurrent::iterator it3; + it3 = it2->second.find(collection); + if (it3 == it2->second.end()) { + const CollectionInfoCurrent collectionDataCurrent(shardID, json); + it2->second.insert(make_pair + (collection, collectionDataCurrent)); + it3 = it2->second.find(collection); + } + else { + it3->second.add(shardID, json); + } + + // Note that we have only inserted the CollectionInfoCurrent under + // the collection ID and not under the name! It is not possible + // to query the current collection info by name. This is because + // the correct place to hold the current name is in the plan. + // Thus: Look there and get the collection ID from there. Then + // ask about the current collection info. + + // Now take note of this shard and its responsible server: + std::string DBserver = triagens::basics::JsonHelper::getStringValue + (json, "DBserver", ""); + if (DBserver != "") { + _shardIds.insert(make_pair(shardID, DBserver)); + } + } + _collectionsCurrentValid = true; + return; + } + + LOG_TRACE("Error while loading %s", prefix.c_str()); + _collectionsCurrentValid = false; + +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask about a collection in current. This returns information about +/// all shards in the collection. +/// If it is not found in the cache, the cache is reloaded once. +//////////////////////////////////////////////////////////////////////////////// + +CollectionInfoCurrent ClusterInfo::getCollectionCurrent + (DatabaseID const& databaseID, + CollectionID const& collectionID) { + int tries = 0; + + if (! _collectionsCurrentValid) { + loadCurrentCollections(true); + ++tries; + } + + while (++tries <= 2) { + { + READ_LOCKER(_lock); + // look up database by id + AllCollectionsCurrent::const_iterator it = _collectionsCurrent.find(databaseID); + + if (it != _collectionsCurrent.end()) { + // look up collection by id + DatabaseCollectionsCurrent::const_iterator it2 = (*it).second.find(collectionID); + + if (it2 != (*it).second.end()) { + return (*it2).second; + } + } + } + + // must load collections outside the lock + loadCurrentCollections(true); + } + + return CollectionInfoCurrent(); +} + + //////////////////////////////////////////////////////////////////////////////// /// @brief create database in coordinator, the return value is an ArangoDB /// error code and the errorMsg is set accordingly. One possible error @@ -1191,7 +1402,7 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) { } // must load collections outside the lock - loadPlannedCollections(true); + loadCurrentCollections(true); } return ServerID(""); diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 488ef38ec5..377b6ff0fc 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -183,8 +183,8 @@ namespace triagens { /// @brief returns the maximal journal size //////////////////////////////////////////////////////////////////////////////// - TRI_voc_size_t maximalSize () const { - return triagens::basics::JsonHelper::getNumericValue(_json, "maximalSize", 0); + TRI_voc_size_t journalSize () const { + return triagens::basics::JsonHelper::getNumericValue(_json, "journalSize", 0); } //////////////////////////////////////////////////////////////////////////////// @@ -219,6 +219,415 @@ namespace triagens { }; +// ----------------------------------------------------------------------------- +// --SECTION-- class CollectionInfoCurrent +// ----------------------------------------------------------------------------- + + class CollectionInfoCurrent { + friend class ClusterInfo; + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors / destructors +// ----------------------------------------------------------------------------- + + public: + + CollectionInfoCurrent (); + + CollectionInfoCurrent (ShardID const&, struct TRI_json_s*); + + CollectionInfoCurrent (CollectionInfoCurrent const&); + + CollectionInfoCurrent& operator= (CollectionInfoCurrent const&); + + ~CollectionInfoCurrent (); + + private: + + void freeAllJsons (); + + void copyAllJsons (); + +// ----------------------------------------------------------------------------- +// --SECTION-- public methods +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief add a new shardID and JSON pair, returns true if OK and false +/// if the shardID already exists. In the latter case nothing happens. +/// The CollectionInfoCurrent object takes ownership of the TRI_json_t*. +//////////////////////////////////////////////////////////////////////////////// + + bool add (ShardID const& shardID, TRI_json_t* json) { + map::iterator it = _jsons.find(shardID); + if (it == _jsons.end()) { + _jsons.insert(make_pair(shardID, json)); + return true; + } + return false; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection id +//////////////////////////////////////////////////////////////////////////////// + + TRI_voc_cid_t id () const { + // The id will always be the same in every shard + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + return triagens::basics::JsonHelper::stringUInt64(_json, "id"); + } + else { + return 0; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection type +//////////////////////////////////////////////////////////////////////////////// + + TRI_col_type_e type () const { + // The type will always be the same in every shard + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + return triagens::basics::JsonHelper::getNumericValue + (_json, "type", TRI_COL_TYPE_UNKNOWN); + } + else { + return TRI_COL_TYPE_UNKNOWN; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection status for one shardID +//////////////////////////////////////////////////////////////////////////////// + + TRI_vocbase_col_status_e status (ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getNumericValue + + (_json, "status", TRI_VOC_COL_STATUS_CORRUPTED); + } + return TRI_VOC_COL_STATUS_CORRUPTED; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the collection status for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map status () const { + map m; + map::const_iterator it; + TRI_vocbase_col_status_e s; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + TRI_json_t* _json = it->second; + s = triagens::basics::JsonHelper::getNumericValue + + (_json, "status", TRI_VOC_COL_STATUS_CORRUPTED); + m.insert(make_pair(it->first,s)); + } + return m; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief local helper to return boolean flags +//////////////////////////////////////////////////////////////////////////////// + + private: + + bool getFlag (char const* name, ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getBooleanValue(_json, + name, false); + } + return false; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief local helper to return a map to boolean +//////////////////////////////////////////////////////////////////////////////// + + map getFlag (char const* name ) const { + map m; + map::const_iterator it; + bool b; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + TRI_json_t* _json = it->second; + b = triagens::basics::JsonHelper::getBooleanValue(_json, + name, false); + m.insert(make_pair(it->first,b)); + } + return m; + } + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the deleted flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool deleted (ShardID const& shardID) const { + return getFlag("deleted", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the deleted flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map deleted () const { + return getFlag("deleted"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the doCompact flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool doCompact (ShardID const& shardID) const { + return getFlag("doCompact", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the doCompact flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map doCompact () const { + return getFlag("doCompact"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the isSystem flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool isSystem (ShardID const& shardID) const { + return getFlag("isSystem", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the isSystem flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map isSystem () const { + return getFlag("isSystem"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the isVolatile flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool isVolatile (ShardID const& shardID) const { + return getFlag("isVolatile", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the isVolatile flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map isVolatile () const { + return getFlag("isVolatile"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the error flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool error (ShardID const& shardID) const { + return getFlag("error", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the error flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map error () const { + return getFlag("error"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the waitForSync flag for a shardID +//////////////////////////////////////////////////////////////////////////////// + + bool waitForSync (ShardID const& shardID) const { + return getFlag("waitForSync", shardID); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the waitForSync flag for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map waitForSync () const { + return getFlag("waitForSync"); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns a copy of the key options +/// the caller is responsible for freeing it +//////////////////////////////////////////////////////////////////////////////// + + TRI_json_t* keyOptions () const { + // The id will always be the same in every shard + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + TRI_json_t const* keyOptions + = triagens::basics::JsonHelper::getArrayElement + (_json, "keyOptions"); + + if (keyOptions != 0) { + return TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, keyOptions); + } + + return 0; + } + else { + return 0; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the maximal journal size for one shardID +//////////////////////////////////////////////////////////////////////////////// + + TRI_voc_size_t journalSize (ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getNumericValue + (_json, "journalSize", 0); + } + return 0; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the maximal journal size for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map journalSize () const { + map m; + map::const_iterator it; + TRI_voc_size_t s; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + TRI_json_t* _json = it->second; + s = triagens::basics::JsonHelper::getNumericValue + (_json, "journalSize", 0); + m.insert(make_pair(it->first,s)); + } + return m; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the errorNum for one shardID +//////////////////////////////////////////////////////////////////////////////// + + int errorNum (ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getNumericValue + (_json, "errorNum", 0); + } + return 0; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the errorNum for all shardIDs +//////////////////////////////////////////////////////////////////////////////// + + map errorNum () const { + map m; + map::const_iterator it; + TRI_voc_size_t s; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + TRI_json_t* _json = it->second; + s = triagens::basics::JsonHelper::getNumericValue + (_json, "errorNum", 0); + m.insert(make_pair(it->first,s)); + } + return m; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the shard keys +//////////////////////////////////////////////////////////////////////////////// + + vector shardKeys () const { + // The shardKeys will always be the same in every shard + map::const_iterator it = _jsons.begin(); + if (it != _jsons.end()) { + TRI_json_t* _json = it->second; + TRI_json_t* const node + = triagens::basics::JsonHelper::getArrayElement + (_json, "shardKeys"); + return triagens::basics::JsonHelper::stringList(node); + } + else { + vector result; + return result; + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the shard ids that are currently in the collection +//////////////////////////////////////////////////////////////////////////////// + + vector shardIDs () const { + vector v; + map::const_iterator it; + for (it = _jsons.begin(); it != _jsons.end(); ++it) { + v.push_back(it->first); + } + return v; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the responsible server for one shardID +//////////////////////////////////////////////////////////////////////////////// + + string responsibleServer (ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getStringValue + (_json, "DBserver", ""); + } + return string(""); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief returns the errorMessage entry for one shardID +//////////////////////////////////////////////////////////////////////////////// + + string errorMessage (ShardID const& shardID) const { + map::const_iterator it = _jsons.find(shardID); + if (it != _jsons.end()) { + TRI_json_t* _json = _jsons.begin()->second; + return triagens::basics::JsonHelper::getStringValue + (_json, "errorMessage", ""); + } + return string(""); + } + +// ----------------------------------------------------------------------------- +// --SECTION-- private methods +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- +// --SECTION-- private variables +// ----------------------------------------------------------------------------- + + private: + + map _jsons; + }; + + // ----------------------------------------------------------------------------- // --SECTION-- class ClusterInfo // ----------------------------------------------------------------------------- @@ -230,8 +639,14 @@ namespace triagens { class ClusterInfo { private: - typedef std::map DatabaseCollections; - typedef std::map AllCollections; + typedef std::map + DatabaseCollections; + typedef std::map + AllCollections; + typedef std::map + DatabaseCollectionsCurrent; + typedef std::map + AllCollectionsCurrent; // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors @@ -370,6 +785,24 @@ namespace triagens { const std::vector getCollections (DatabaseID const&); +//////////////////////////////////////////////////////////////////////////////// +/// @brief (re-)load the information about current collections from the agency +/// Usually one does not have to call this directly. Note that this is +/// necessarily complicated, since here we have to consider information +/// about all shards of a collection. +//////////////////////////////////////////////////////////////////////////////// + + void loadCurrentCollections (bool = true); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ask about a collection in current. This returns information about +/// all shards in the collection. +/// If it is not found in the cache, the cache is reloaded once. +//////////////////////////////////////////////////////////////////////////////// + + CollectionInfoCurrent getCollectionCurrent (DatabaseID const&, + CollectionID const&); + //////////////////////////////////////////////////////////////////////////////// /// @brief create database in coordinator //////////////////////////////////////////////////////////////////////////////// @@ -497,16 +930,25 @@ namespace triagens { _uniqid; // Cached data from the agency, we reload whenever necessary: - std::map _plannedDatabases; // from Plan/Databases - std::map > _currentDatabases; // from Current/Databases + std::map _plannedDatabases; + // from Plan/Databases + std::map > + _currentDatabases; // from Current/Databases - AllCollections _collections; // from Current/Collections/ - bool _collectionsValid; - std::map _servers; // from Current/ServersRegistered - bool _serversValid; - std::map _DBServers; // from Current/DBServers - bool _DBServersValid; - std::map _shardIds; // from Current/ShardLocation + AllCollections _collections; + // from Plan/Collections/ + bool _collectionsValid; + AllCollectionsCurrent _collectionsCurrent; + // from Current/Collections/ + bool _collectionsCurrentValid; + std::map _servers; + // from Current/ServersRegistered + bool _serversValid; + std::map _DBServers; + // from Current/DBServers + bool _DBServersValid; + std::map _shardIds; + // from Plan/Collections/ ??? // ----------------------------------------------------------------------------- // --SECTION-- private static variables diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index 5df18380f3..f38a5bae8e 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -745,7 +745,7 @@ static v8::Handle JS_FlushClusterInfo (v8::Arguments const& argv) { } //////////////////////////////////////////////////////////////////////////////// -/// @brief get the responsible server +/// @brief get the info about a collection in Plan //////////////////////////////////////////////////////////////////////////////// static v8::Handle JS_GetCollectionInfoClusterInfo (v8::Arguments const& argv) { @@ -766,6 +766,17 @@ static v8::Handle JS_GetCollectionInfoClusterInfo (v8::Arguments cons result->Set(v8::String::New("type"), v8::Number::New((int) ci.type())); result->Set(v8::String::New("status"), v8::Number::New((int) ci.status())); + const string statusString = ci.statusString(); + result->Set(v8::String::New("statusString"), + v8::String::New(statusString.c_str(), statusString.size())); + + result->Set(v8::String::New("deleted"), v8::Boolean::New(ci.deleted())); + result->Set(v8::String::New("doCompact"), v8::Boolean::New(ci.doCompact())); + result->Set(v8::String::New("isSystem"), v8::Boolean::New(ci.isSystem())); + result->Set(v8::String::New("isVolatile"), v8::Boolean::New(ci.isVolatile())); + result->Set(v8::String::New("waitForSync"), v8::Boolean::New(ci.waitForSync())); + result->Set(v8::String::New("journalSize"), v8::Number::New(ci.journalSize())); + const std::vector& sks = ci.shardKeys(); v8::Handle shardKeys = v8::Array::New(sks.size()); for (uint32_t i = 0, n = sks.size(); i < n; ++i) { @@ -789,6 +800,71 @@ static v8::Handle JS_GetCollectionInfoClusterInfo (v8::Arguments cons return scope.Close(result); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the info about a collection in Current +//////////////////////////////////////////////////////////////////////////////// + +static v8::Handle JS_GetCollectionInfoCurrentClusterInfo (v8::Arguments const& argv) { + v8::HandleScope scope; + + if (argv.Length() != 3) { + TRI_V8_EXCEPTION_USAGE(scope, "getCollectionInfoCurrent(, , )"); + } + + ShardID shardID = TRI_ObjectToString(argv[2]); + + CollectionInfo ci = ClusterInfo::instance()->getCollection( + TRI_ObjectToString(argv[0]), + TRI_ObjectToString(argv[1])); + + v8::Handle result = v8::Object::New(); + // First some stuff from Plan for which Current does not make sense: + const std::string cid = triagens::basics::StringUtils::itoa(ci.id()); + const std::string& name = ci.name(); + result->Set(v8::String::New("id"), v8::String::New(cid.c_str(), cid.size())); + result->Set(v8::String::New("name"), v8::String::New(name.c_str(), name.size())); + + CollectionInfoCurrent cic = ClusterInfo::instance()->getCollectionCurrent( + TRI_ObjectToString(argv[0]), cid); + + result->Set(v8::String::New("type"), v8::Number::New((int) ci.type())); + // Now the Current information, if we actually got it: + TRI_vocbase_col_status_e s = cic.status(shardID); + result->Set(v8::String::New("status"), v8::Number::New((int) cic.status(shardID))); + if (s == TRI_VOC_COL_STATUS_CORRUPTED) { + return scope.Close(result); + } + const string statusString = TRI_GetStatusStringCollectionVocBase(s); + result->Set(v8::String::New("statusString"), + v8::String::New(statusString.c_str(), statusString.size())); + + result->Set(v8::String::New("deleted"), v8::Boolean::New(cic.deleted(shardID))); + result->Set(v8::String::New("doCompact"), v8::Boolean::New(cic.doCompact(shardID))); + result->Set(v8::String::New("isSystem"), v8::Boolean::New(cic.isSystem(shardID))); + result->Set(v8::String::New("isVolatile"), v8::Boolean::New(cic.isVolatile(shardID))); + result->Set(v8::String::New("waitForSync"), v8::Boolean::New(cic.waitForSync(shardID))); + result->Set(v8::String::New("journalSize"), v8::Number::New(cic.journalSize(shardID))); + const std::string serverID = cic.responsibleServer(shardID); + result->Set(v8::String::New("responsibleServer"), + v8::String::New(serverID.c_str(), serverID.size())); + + // TODO: fill "indexes" + v8::Handle indexes = v8::Array::New(); + result->Set(v8::String::New("indexes"), indexes); + + // Finally, report any possible error: + bool error = cic.error(shardID); + result->Set(v8::String::New("error"), v8::Boolean::New(error)); + if (error) { + result->Set(v8::String::New("errorNum"), v8::Number::New(cic.errorNum(shardID))); + const string errorMessage = cic.errorMessage(shardID); + result->Set(v8::String::New("errorMessage"), + v8::String::New(errorMessage.c_str(), errorMessage.size())); + } + + return scope.Close(result); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief get the responsible server //////////////////////////////////////////////////////////////////////////////// @@ -1595,6 +1671,7 @@ void TRI_InitV8Cluster (v8::Handle context) { TRI_AddMethodVocbase(rt, "listDatabases", JS_ListDatabases); TRI_AddMethodVocbase(rt, "flush", JS_FlushClusterInfo, true); TRI_AddMethodVocbase(rt, "getCollectionInfo", JS_GetCollectionInfoClusterInfo); + TRI_AddMethodVocbase(rt, "getCollectionInfoCurrent", JS_GetCollectionInfoCurrentClusterInfo); TRI_AddMethodVocbase(rt, "getResponsibleServer", JS_GetResponsibleServerClusterInfo); TRI_AddMethodVocbase(rt, "getServerEndpoint", JS_GetServerEndpointClusterInfo); TRI_AddMethodVocbase(rt, "getDBServers", JS_GetDBServers); diff --git a/js/apps/system/aardvark/cluster.js b/js/apps/system/aardvark/cluster.js index 5f209fa9fa..739825550a 100644 --- a/js/apps/system/aardvark/cluster.js +++ b/js/apps/system/aardvark/cluster.js @@ -28,70 +28,75 @@ /// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany //////////////////////////////////////////////////////////////////////////////// +(function() { -"use strict"; - -// Initialise a new FoxxController called controller under the urlPrefix: "cluster". -var FoxxController = require("org/arangodb/foxx").Controller, - controller = new FoxxController(applicationContext), - _ = require("underscore"), - Communication = require("org/arangodb/sharding/agency-communication"), - comm = new Communication.Communication(); - -/** Get all DBServers - * - * Get a list of all running and expected DBServers - * within the cluster - */ -controller.get("/DBServers", function(req, res) { - var list = { - Pavel: { - role: "primary", - secondary: "Sally", - address: "tcp://192.168.0.1:1337" - }, - Pancho: { - role: "primary", - secondary: "none", - address: "tcp://192.168.0.2:1337" - }, - Pablo: { - role: "primary", - secondary: "Sandy", - address: "tcp://192.168.0.5:1337" - }, - Sally: { - role: "secondary", - address: "tcp://192.168.1.1:1337" - }, - Sandy: { - role: "secondary", - address: "tcp://192.168.1.5:1337" - } - }, - noBeat = ["Sandy"], - serving = ["Pancho", "Pavel"], + "use strict"; + // Initialise a new FoxxController called controller under the urlPrefix: "cluster". + var FoxxController = require("org/arangodb/foxx").Controller, + controller = new FoxxController(applicationContext), + _ = require("underscore"), + Communication = require("org/arangodb/sharding/agency-communication"), + comm = new Communication.Communication(), beats = comm.sync.Heartbeats(), - resList = []; - - list = comm.current.DBServers().getList(); - noBeat = beats.noBeat(); - serving = beats.getServing(); + servers = comm.current.DBServers(), + dbs = comm.current.Databases(), + coords = comm.current.Coordinators(); + /** Get all DBServers + * + * Get a list of all running and expected DBServers + * within the cluster + */ + controller.get("/DBServers", function(req, res) { + var resList = [], + list = servers.getList(), + noBeat = beats.noBeat(), + serving = beats.getServing(); - _.each(list, function(v, k) { - v.name = k; - resList.push(v); - if (_.contains(noBeat, k)) { - v.status = "critical"; - return; - } - if (v.role === "primary" && !_.contains(serving, k)) { - v.status = "warning"; - return; - } - v.status = "ok"; + _.each(list, function(v, k) { + v.name = k; + resList.push(v); + if (_.contains(noBeat, k)) { + v.status = "critical"; + return; + } + if (v.role === "primary" && !_.contains(serving, k)) { + v.status = "warning"; + return; + } + v.status = "ok"; + }); + res.json(resList); }); - res.json(resList); -}); + + controller.get("/Databases", function(req, res) { + var list = dbs.getList(); + res.json(_.map(list, function(d) { + return {name: d}; + })); + }); + + controller.get("/:dbname/Collections", function(req, res) { + var dbname = req.params("dbname"), + selected = dbs.select(dbname); + res.json(_.map(selected.getCollections(), + function(c) { + return {name: c}; + }) + ); + }); + + controller.get("/:dbname/:colname/Shards/:servername", function(req, res) { + var dbname = req.params("dbname"), + colname = req.params("colname"), + servername = req.params("servername"), + selected = dbs.select(dbname).collection(colname); + res.json(_.map(selected.getShardsForServer(servername), + function(c) { + return {id: c}; + }) + ); + }); + +}()); diff --git a/js/apps/system/aardvark/frontend/js/collections/clusterCollections.js b/js/apps/system/aardvark/frontend/js/collections/clusterCollections.js index d413cf8164..600606a7e9 100644 --- a/js/apps/system/aardvark/frontend/js/collections/clusterCollections.js +++ b/js/apps/system/aardvark/frontend/js/collections/clusterCollections.js @@ -5,9 +5,14 @@ window.ClusterCollections = Backbone.Collection.extend({ model: window.ClusterCollection, - url: "/_admin/aardvark/cluster/Collections", + url: function() { + return "/_admin/aardvark/cluster/" + + this.dbname + "/" + + "Collections"; + }, - getList: function() { + getList: function(db) { + this.dbname = db; this.fetch({ async: false }); diff --git a/js/apps/system/aardvark/frontend/js/collections/clusterShards.js b/js/apps/system/aardvark/frontend/js/collections/clusterShards.js index 94a9cc9e4b..9b69c316cf 100644 --- a/js/apps/system/aardvark/frontend/js/collections/clusterShards.js +++ b/js/apps/system/aardvark/frontend/js/collections/clusterShards.js @@ -8,9 +8,18 @@ model: window.ClusterShard, - url: "/_admin/aardvark/cluster/Shards", + url: function() { + return "/_admin/aardvark/cluster/" + + this.dbname + "/" + + this.colname + "/" + + "Shards/" + + this.server; + }, - getList: function() { + getList: function(dbname, colname, server) { + this.dbname = dbname; + this.colname = colname; + this.server = server; this.fetch({ async: false }); diff --git a/js/apps/system/aardvark/frontend/js/templates/clusterShardsView.ejs b/js/apps/system/aardvark/frontend/js/templates/clusterShardsView.ejs index d7d75e691d..0071475141 100644 --- a/js/apps/system/aardvark/frontend/js/templates/clusterShardsView.ejs +++ b/js/apps/system/aardvark/frontend/js/templates/clusterShardsView.ejs @@ -13,7 +13,7 @@
    <% _.each(shards, function(v) { %>
  • - +
  • <% }); %>
diff --git a/js/apps/system/aardvark/frontend/js/views/clusterCollectionView.js b/js/apps/system/aardvark/frontend/js/views/clusterCollectionView.js index 891f7d47c4..7404ee220b 100644 --- a/js/apps/system/aardvark/frontend/js/views/clusterCollectionView.js +++ b/js/apps/system/aardvark/frontend/js/views/clusterCollectionView.js @@ -22,9 +22,11 @@ loadCollection: function(e) { var id = e.currentTarget.id; - this.shardsView.render({ - name: id - }); + this.shardsView.render( + this.db, + id, + this.server + ); }, unrender: function() { @@ -32,9 +34,11 @@ this.shardsView.unrender(); }, - render: function() { + render: function(db, server) { + this.db = db; + this.server = server; $(this.el).html(this.template.render({ - collections: this.collection.getList() + collections: this.collection.getList(this.db) })); this.shardsView.unrender(); return this; diff --git a/js/apps/system/aardvark/frontend/js/views/clusterDatabaseView.js b/js/apps/system/aardvark/frontend/js/views/clusterDatabaseView.js index 501b36de65..f8877e6e2d 100644 --- a/js/apps/system/aardvark/frontend/js/views/clusterDatabaseView.js +++ b/js/apps/system/aardvark/frontend/js/views/clusterDatabaseView.js @@ -22,7 +22,7 @@ loadDatabase: function(e) { var id = e.currentTarget.id; - this.colView.render(id); + this.colView.render(id, this.server); }, unrender: function() { @@ -30,7 +30,8 @@ this.colView.unrender(); }, - render: function(){ + render: function(server) { + this.server = server; $(this.el).html(this.template.render({ databases: this.collection.getList() })); diff --git a/js/apps/system/aardvark/frontend/js/views/clusterServerView.js b/js/apps/system/aardvark/frontend/js/views/clusterServerView.js index 7932063b2c..4772c75a03 100644 --- a/js/apps/system/aardvark/frontend/js/views/clusterServerView.js +++ b/js/apps/system/aardvark/frontend/js/views/clusterServerView.js @@ -22,9 +22,7 @@ loadServer: function(e) { var id = e.currentTarget.id; - this.dbView.render({ - name: id - }); + this.dbView.render(id); this.render(true); }, diff --git a/js/apps/system/aardvark/frontend/js/views/clusterShardsView.js b/js/apps/system/aardvark/frontend/js/views/clusterShardsView.js index 6717aa0c00..b48b7f53cc 100644 --- a/js/apps/system/aardvark/frontend/js/views/clusterShardsView.js +++ b/js/apps/system/aardvark/frontend/js/views/clusterShardsView.js @@ -14,9 +14,9 @@ $(this.el).html(""); }, - render: function() { + render: function(db, col, server) { $(this.el).html(this.template.render({ - shards: this.collection.getList() + shards: this.collection.getList(db, col, server) })); return this; } diff --git a/js/server/modules/org/arangodb/cluster.js b/js/server/modules/org/arangodb/cluster.js index c4d834826e..05496f4668 100644 --- a/js/server/modules/org/arangodb/cluster.js +++ b/js/server/modules/org/arangodb/cluster.js @@ -381,6 +381,7 @@ function createLocalCollections (plannedCollections) { payload.errorMessage = err2.errorMessage; } + payload.DBserver = ourselves; writeLocked({ part: "Current" }, createCollectionAgency, [ database, shard, payload ]); @@ -415,6 +416,7 @@ function createLocalCollections (plannedCollections) { payload.errorMessage = err3.errorMessage; } + payload.DBserver = ourselves; writeLocked({ part: "Current" }, createCollectionAgency, [ database, shard, payload ]); diff --git a/js/server/modules/org/arangodb/sharding/agency-communication.js b/js/server/modules/org/arangodb/sharding/agency-communication.js index c2fd236d41..5aa15fe81d 100644 --- a/js/server/modules/org/arangodb/sharding/agency-communication.js +++ b/js/server/modules/org/arangodb/sharding/agency-communication.js @@ -39,7 +39,10 @@ exports.Communication = function() { storeServersInCache, Target, mapCollectionIDsToNames, + updateCollectionRouteForName, + updateDatabaseRoutes, difference, + self = this, _ = require("underscore"); splitServerName = function(route) { @@ -135,17 +138,17 @@ exports.Communication = function() { var target = addLevel(this, "target", "Target"); addLevel(target, "dbServers", "DBServers", ["get", "set", "remove", "checkVersion"]); addLevel(target, "db", "Collections", ["list"]); - addLevelsForDBs(target.db, true); + //addLevelsForDBs(target.db, true); addLevel(target, "coordinators", "Coordinators", ["list", "set", "remove", "checkVersion"]); var plan = addLevel(this, "plan", "Plan"); addLevel(plan, "dbServers", "DBServers", ["get", "checkVersion"]); addLevel(plan, "db", "Collections", ["list"]); - addLevelsForDBs(plan.db); + //addLevelsForDBs(plan.db); addLevel(plan, "coordinators", "Coordinators", ["list", "checkVersion"]); var current = addLevel(this, "current", "Current"); addLevel(current, "dbServers", "DBServers", ["get", "checkVersion"]); addLevel(current, "db", "Collections", ["list"]); - addLevelsForDBs(current.db); + //addLevelsForDBs(current.db); addLevel(current, "coordinators", "Coordinators", ["list", "checkVersion"]); addLevel(current, "registered", "ServersRegistered", ["get", "checkVersion"]); @@ -153,6 +156,7 @@ exports.Communication = function() { addLevel(sync, "beat", "ServerStates", ["get"]); addLevel(sync, "interval", "HeartbeatIntervalMs", ["get"]); + this.addLevel = addLevel; }; agency = new AgencyWrapper(); @@ -162,6 +166,33 @@ exports.Communication = function() { // --SECTION-- Helper Functions // ----------------------------------------------------------------------------- + updateDatabaseRoutes = function(base, writeAccess) { + var list = self.plan.Databases().getList(); + _.each(_.keys(base), function(k) { + if (k !== "route" && k !== "list") { + delete base[k]; + } + }); + _.each(list, function(d) { + agency.addLevel(base, d, d, ["get", "checkVersion"]); + }); + }; + + updateCollectionRouteForName = function(route, db, name, writeAccess) { + var list = self.plan.Databases().select(db).getCollectionObjects(); + var cId = null; + _.each(list, function(v, k) { + if (v.name === name) { + cId = splitServerName(k); + } + }); + var acts = ["get"]; + if (writeAccess) { + acts.push("set"); + } + agency.addLevel(route, name, cId, acts); + }; + //////////////////////////////////////////////////////////////////////////////// /// @brief Stores database servers in cache /// @@ -318,18 +349,27 @@ exports.Communication = function() { /// It allos to get a list of collections and to select one of them for /// further information. //////////////////////////////////////////////////////////////////////////////// - var DBObject = function(route, writeAccess) { + var DBObject = function(route, db, writeAccess) { var cache; - var getList = function() { + var getRaw = function() { if (!cache || !route.checkVersion()) { - cache = _.keys(mapCollectionIDsToNames(route.get(true))).sort(); + cache = route.get(true); } return cache; }; + var getList = function() { + return _.keys(mapCollectionIDsToNames( + self.plan.Databases().select(db).getCollectionObjects() + )).sort(); + }; + this.getCollectionObjects = function() { + return getRaw(); + }; this.getCollections = function() { return getList(); }; this.collection = function(name) { + updateCollectionRouteForName(route, db, name, writeAccess); var colroute = route[name]; if (!colroute) { return false; @@ -350,11 +390,12 @@ exports.Communication = function() { return route.list(); }; this.select = function(name) { + updateDatabaseRoutes(route, writeAccess); var subroute = route[name]; if (!subroute) { return false; } - return new DBObject(subroute, writeAccess); + return new DBObject(subroute, name, writeAccess); }; }; diff --git a/js/server/tests/sharding-agency-communication.js b/js/server/tests/sharding-agency-communication.js index 96caf3a30c..da9d4da71b 100644 --- a/js/server/tests/sharding-agency-communication.js +++ b/js/server/tests/sharding-agency-communication.js @@ -101,6 +101,42 @@ "11235": {name: "s"}, "6512": {name: "a"}, "123": {name: "d"} + }, + current: { + _system: { + "98213": { + "sg1": {}, + "sg2": {}, + "sg3": {} + }, + "87123": { + "sv1": {}, + "sv2": {}, + "sv3": {} + }, + "89123": { + "se1": {}, + "se2": {}, + "se3": {} + } + }, + a_db: { + "11235": { + "s01": {}, + "s02": {}, + "s03": {} + }, + "6512": { + "s11": {}, + "s12": {}, + "s13": {} + }, + "123": { + "s21": {}, + "s22": {}, + "s23": {} + } + } } }; var ips = { @@ -162,8 +198,8 @@ dummy.current.coordinators = createResult([agencyRoutes.current, agencyRoutes.sub.coords], coordinators); dummy.current.registered = createResult([agencyRoutes.current, agencyRoutes.sub.registered], ips); dummy.current.databases = databases; - dummy.current.syscollections = createResult([agencyRoutes.current, agencyRoutes.sub.databases, agencyRoutes.sub.colls, "_system"], collections._system); - dummy.current.acollections = createResult([agencyRoutes.current, agencyRoutes.sub.databases, agencyRoutes.sub.colls, "a_db"], collections.a_db); + dummy.current.syscollections = createResult([agencyRoutes.current, agencyRoutes.sub.databases, agencyRoutes.sub.colls, "_system"], collections.current._system); + dummy.current.acollections = createResult([agencyRoutes.current, agencyRoutes.sub.databases, agencyRoutes.sub.colls, "a_db"], collections.current.a_db); dummy.current.vInfo = vInfo; dummy.sync = {}; @@ -240,9 +276,9 @@ } break; default: - fail(); + fail("Requested route: GET " + route); } - fail(); + fail("Requested route: GET " + route); }, list: function(route, recursive, flat) { var parts = route.split("/"); @@ -280,7 +316,7 @@ } break; default: - fail(); + fail("Requested route: LIST " + route); } } }; @@ -562,7 +598,7 @@ ].sort(); assertEqual(dbs.getList(), list); }, - + testGetCollectionListForDatabase: function() { var syslist = [ "_graphs", @@ -631,6 +667,7 @@ assertTrue(wasCalled, "Agency has not been informed to move shard.."); assertEqual(colV.getServerForShard(shard), target); } + }; }; @@ -777,6 +814,7 @@ assertEqual(colV.getServerForShard("v1"), "pavel"); assertEqual(colV.getServerForShard("v2"), "paul"); } + }; }; @@ -877,7 +915,7 @@ ].sort(); assertEqual(dbs.getList(), list); }, - + testGetCollectionListForDatabase: function() { var syslist = [ "_graphs", @@ -927,6 +965,7 @@ assertEqual(colV.getServerForShard("v1"), "pavel"); assertEqual(colV.getServerForShard("v2"), "paul"); } + }; };