1
0
Fork 0

Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Jan Steemann 2014-01-21 15:14:13 +01:00
commit 7e732174ce
14 changed files with 954 additions and 120 deletions

View File

@ -113,6 +113,86 @@ CollectionInfo::~CollectionInfo () {
}
}
// -----------------------------------------------------------------------------
// --SECTION-- CollectionInfoCurrent class
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- constructors / destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief creates an empty collection info object
////////////////////////////////////////////////////////////////////////////////
CollectionInfoCurrent::CollectionInfoCurrent () {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection info object from json
////////////////////////////////////////////////////////////////////////////////
CollectionInfoCurrent::CollectionInfoCurrent (ShardID const& shardID, TRI_json_t* json) {
_jsons.insert(make_pair<ShardID, TRI_json_t*>(shardID, json));
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection info object from another
////////////////////////////////////////////////////////////////////////////////
CollectionInfoCurrent::CollectionInfoCurrent (CollectionInfoCurrent const& other) :
_jsons(other._jsons) {
copyAllJsons();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection info object from json
////////////////////////////////////////////////////////////////////////////////
CollectionInfoCurrent& CollectionInfoCurrent::operator= (CollectionInfoCurrent const& other) {
if (this == &other) {
return *this;
}
freeAllJsons();
_jsons = other._jsons;
copyAllJsons();
return *this;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroys a collection info object
////////////////////////////////////////////////////////////////////////////////
CollectionInfoCurrent::~CollectionInfoCurrent () {
freeAllJsons();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief free all pointers to TRI_json_t in the map _jsons
////////////////////////////////////////////////////////////////////////////////
void CollectionInfoCurrent::freeAllJsons () {
map<ShardID, TRI_json_t*>::iterator it;
for (it = _jsons.begin(); it != _jsons.end(); ++it) {
if (it->second != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, it->second);
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief copy TRI_json_t behind the pointers in the map _jsons
////////////////////////////////////////////////////////////////////////////////
void CollectionInfoCurrent::copyAllJsons () {
map<ShardID, TRI_json_t*>::iterator it;
for (it = _jsons.begin(); it != _jsons.end(); ++it) {
if (0 != it->second) {
it->second = TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, it->second);
}
}
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
@ -202,10 +282,12 @@ void ClusterInfo::flush () {
WRITE_LOCKER(_lock);
_collectionsValid = false;
_collectionsCurrentValid = false;
_serversValid = false;
_DBServersValid = false;
_collections.clear();
_collectionsCurrent.clear();
_servers.clear();
_shardIds.clear();
@ -468,7 +550,6 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
WRITE_LOCKER(_lock);
_collections.clear();
_shardIds.clear();
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
@ -508,17 +589,6 @@ void ClusterInfo::loadPlannedCollections (bool acquireLock) {
(*it2).second.insert(std::make_pair<CollectionID, CollectionInfo>(collection, collectionData));
(*it2).second.insert(std::make_pair<CollectionID, CollectionInfo>(collectionData.name(), collectionData));
std::map<std::string, std::string> shards = collectionData.shardIds();
std::map<std::string, std::string>::const_iterator it3 = shards.begin();
while (it3 != shards.end()) {
const std::string shardId = (*it3).first;
const std::string serverId = (*it3).second;
_shardIds.insert(std::make_pair<ShardID, ServerID>(shardId, serverId));
++it3;
}
}
_collectionsValid = true;
return;
@ -575,7 +645,7 @@ TRI_col_info_t ClusterInfo::getCollectionProperties (CollectionInfo const& colle
info._type = collection.type();
info._cid = collection.id();
info._revision = 0; // TODO
info._maximalSize = collection.maximalSize();
info._maximalSize = collection.journalSize();
const std::string name = collection.name();
memcpy(info._name, name.c_str(), name.size());
@ -634,6 +704,147 @@ const std::vector<CollectionInfo> ClusterInfo::getCollections (DatabaseID const&
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about current collections from the agency
/// Usually one does not have to call this directly. Note that this is
/// necessarily complicated, since here we have to consider information
/// about all shards of a collection.
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadCurrentCollections (bool acquireLock) {
static const std::string prefix = "Current/Collections";
AgencyCommResult result;
{
if (acquireLock) {
AgencyCommLocker locker("Current", "READ");
if (locker.successful()) {
result = _agency.getValues(prefix, true);
}
}
else {
result = _agency.getValues(prefix, true);
}
}
if (result.successful()) {
result.parse(prefix + "/", false);
WRITE_LOCKER(_lock);
_collectionsCurrent.clear();
_shardIds.clear();
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
for (; it != result._values.end(); ++it) {
const std::string key = (*it).first;
// each entry consists of a database id, a collection id, and a shardID,
// separated by '/'
std::vector<std::string> parts = triagens::basics::StringUtils::split(key, '/');
if (parts.size() != 3) {
// invalid entry
LOG_WARNING("found invalid collection key in current in agency: '%s'", key.c_str());
continue;
}
const std::string database = parts[0];
const std::string collection = parts[1];
const std::string shardID = parts[2];
// check whether we have created an entry for the database already
AllCollectionsCurrent::iterator it2 = _collectionsCurrent.find(database);
if (it2 == _collectionsCurrent.end()) {
// not yet, so create an entry for the database
DatabaseCollectionsCurrent empty;
_collectionsCurrent.insert(std::make_pair<DatabaseID, DatabaseCollectionsCurrent>(database, empty));
it2 = _collectionsCurrent.find(database);
}
TRI_json_t* json = (*it).second._json;
// steal the json
(*it).second._json = 0;
// check whether we already have a CollectionInfoCurrent:
DatabaseCollectionsCurrent::iterator it3;
it3 = it2->second.find(collection);
if (it3 == it2->second.end()) {
const CollectionInfoCurrent collectionDataCurrent(shardID, json);
it2->second.insert(make_pair<CollectionID, CollectionInfoCurrent>
(collection, collectionDataCurrent));
it3 = it2->second.find(collection);
}
else {
it3->second.add(shardID, json);
}
// Note that we have only inserted the CollectionInfoCurrent under
// the collection ID and not under the name! It is not possible
// to query the current collection info by name. This is because
// the correct place to hold the current name is in the plan.
// Thus: Look there and get the collection ID from there. Then
// ask about the current collection info.
// Now take note of this shard and its responsible server:
std::string DBserver = triagens::basics::JsonHelper::getStringValue
(json, "DBserver", "");
if (DBserver != "") {
_shardIds.insert(make_pair<ShardID, ServerID>(shardID, DBserver));
}
}
_collectionsCurrentValid = true;
return;
}
LOG_TRACE("Error while loading %s", prefix.c_str());
_collectionsCurrentValid = false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ask about a collection in current. This returns information about
/// all shards in the collection.
/// If it is not found in the cache, the cache is reloaded once.
////////////////////////////////////////////////////////////////////////////////
CollectionInfoCurrent ClusterInfo::getCollectionCurrent
(DatabaseID const& databaseID,
CollectionID const& collectionID) {
int tries = 0;
if (! _collectionsCurrentValid) {
loadCurrentCollections(true);
++tries;
}
while (++tries <= 2) {
{
READ_LOCKER(_lock);
// look up database by id
AllCollectionsCurrent::const_iterator it = _collectionsCurrent.find(databaseID);
if (it != _collectionsCurrent.end()) {
// look up collection by id
DatabaseCollectionsCurrent::const_iterator it2 = (*it).second.find(collectionID);
if (it2 != (*it).second.end()) {
return (*it2).second;
}
}
}
// must load collections outside the lock
loadCurrentCollections(true);
}
return CollectionInfoCurrent();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create database in coordinator, the return value is an ArangoDB
/// error code and the errorMsg is set accordingly. One possible error
@ -1191,7 +1402,7 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
}
// must load collections outside the lock
loadPlannedCollections(true);
loadCurrentCollections(true);
}
return ServerID("");

View File

@ -183,8 +183,8 @@ namespace triagens {
/// @brief returns the maximal journal size
////////////////////////////////////////////////////////////////////////////////
TRI_voc_size_t maximalSize () const {
return triagens::basics::JsonHelper::getNumericValue<TRI_voc_size_t>(_json, "maximalSize", 0);
TRI_voc_size_t journalSize () const {
return triagens::basics::JsonHelper::getNumericValue<TRI_voc_size_t>(_json, "journalSize", 0);
}
////////////////////////////////////////////////////////////////////////////////
@ -219,6 +219,415 @@ namespace triagens {
};
// -----------------------------------------------------------------------------
// --SECTION-- class CollectionInfoCurrent
// -----------------------------------------------------------------------------
class CollectionInfoCurrent {
friend class ClusterInfo;
// -----------------------------------------------------------------------------
// --SECTION-- constructors / destructors
// -----------------------------------------------------------------------------
public:
CollectionInfoCurrent ();
CollectionInfoCurrent (ShardID const&, struct TRI_json_s*);
CollectionInfoCurrent (CollectionInfoCurrent const&);
CollectionInfoCurrent& operator= (CollectionInfoCurrent const&);
~CollectionInfoCurrent ();
private:
void freeAllJsons ();
void copyAllJsons ();
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief add a new shardID and JSON pair, returns true if OK and false
/// if the shardID already exists. In the latter case nothing happens.
/// The CollectionInfoCurrent object takes ownership of the TRI_json_t*.
////////////////////////////////////////////////////////////////////////////////
bool add (ShardID const& shardID, TRI_json_t* json) {
map<ShardID, TRI_json_t*>::iterator it = _jsons.find(shardID);
if (it == _jsons.end()) {
_jsons.insert(make_pair<ShardID, TRI_json_t*>(shardID, json));
return true;
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the collection id
////////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t id () const {
// The id will always be the same in every shard
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.begin();
if (it != _jsons.end()) {
TRI_json_t* _json = it->second;
return triagens::basics::JsonHelper::stringUInt64(_json, "id");
}
else {
return 0;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the collection type
////////////////////////////////////////////////////////////////////////////////
TRI_col_type_e type () const {
// The type will always be the same in every shard
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.begin();
if (it != _jsons.end()) {
TRI_json_t* _json = it->second;
return triagens::basics::JsonHelper::getNumericValue<TRI_col_type_e>
(_json, "type", TRI_COL_TYPE_UNKNOWN);
}
else {
return TRI_COL_TYPE_UNKNOWN;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the collection status for one shardID
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_col_status_e status (ShardID const& shardID) const {
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.find(shardID);
if (it != _jsons.end()) {
TRI_json_t* _json = _jsons.begin()->second;
return triagens::basics::JsonHelper::getNumericValue
<TRI_vocbase_col_status_e>
(_json, "status", TRI_VOC_COL_STATUS_CORRUPTED);
}
return TRI_VOC_COL_STATUS_CORRUPTED;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the collection status for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, TRI_vocbase_col_status_e> status () const {
map<ShardID, TRI_vocbase_col_status_e> m;
map<ShardID, TRI_json_t*>::const_iterator it;
TRI_vocbase_col_status_e s;
for (it = _jsons.begin(); it != _jsons.end(); ++it) {
TRI_json_t* _json = it->second;
s = triagens::basics::JsonHelper::getNumericValue
<TRI_vocbase_col_status_e>
(_json, "status", TRI_VOC_COL_STATUS_CORRUPTED);
m.insert(make_pair<ShardID, TRI_vocbase_col_status_e>(it->first,s));
}
return m;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief local helper to return boolean flags
////////////////////////////////////////////////////////////////////////////////
private:
bool getFlag (char const* name, ShardID const& shardID) const {
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.find(shardID);
if (it != _jsons.end()) {
TRI_json_t* _json = _jsons.begin()->second;
return triagens::basics::JsonHelper::getBooleanValue(_json,
name, false);
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief local helper to return a map to boolean
////////////////////////////////////////////////////////////////////////////////
map<ShardID, bool> getFlag (char const* name ) const {
map<ShardID, bool> m;
map<ShardID, TRI_json_t*>::const_iterator it;
bool b;
for (it = _jsons.begin(); it != _jsons.end(); ++it) {
TRI_json_t* _json = it->second;
b = triagens::basics::JsonHelper::getBooleanValue(_json,
name, false);
m.insert(make_pair<ShardID, bool>(it->first,b));
}
return m;
}
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the deleted flag for a shardID
////////////////////////////////////////////////////////////////////////////////
bool deleted (ShardID const& shardID) const {
return getFlag("deleted", shardID);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the deleted flag for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, bool> deleted () const {
return getFlag("deleted");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the doCompact flag for a shardID
////////////////////////////////////////////////////////////////////////////////
bool doCompact (ShardID const& shardID) const {
return getFlag("doCompact", shardID);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the doCompact flag for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, bool> doCompact () const {
return getFlag("doCompact");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the isSystem flag for a shardID
////////////////////////////////////////////////////////////////////////////////
bool isSystem (ShardID const& shardID) const {
return getFlag("isSystem", shardID);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the isSystem flag for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, bool> isSystem () const {
return getFlag("isSystem");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the isVolatile flag for a shardID
////////////////////////////////////////////////////////////////////////////////
bool isVolatile (ShardID const& shardID) const {
return getFlag("isVolatile", shardID);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the isVolatile flag for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, bool> isVolatile () const {
return getFlag("isVolatile");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the error flag for a shardID
////////////////////////////////////////////////////////////////////////////////
bool error (ShardID const& shardID) const {
return getFlag("error", shardID);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the error flag for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, bool> error () const {
return getFlag("error");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the waitForSync flag for a shardID
////////////////////////////////////////////////////////////////////////////////
bool waitForSync (ShardID const& shardID) const {
return getFlag("waitForSync", shardID);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the waitForSync flag for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, bool> waitForSync () const {
return getFlag("waitForSync");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a copy of the key options
/// the caller is responsible for freeing it
////////////////////////////////////////////////////////////////////////////////
TRI_json_t* keyOptions () const {
// The id will always be the same in every shard
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.begin();
if (it != _jsons.end()) {
TRI_json_t* _json = it->second;
TRI_json_t const* keyOptions
= triagens::basics::JsonHelper::getArrayElement
(_json, "keyOptions");
if (keyOptions != 0) {
return TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, keyOptions);
}
return 0;
}
else {
return 0;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the maximal journal size for one shardID
////////////////////////////////////////////////////////////////////////////////
TRI_voc_size_t journalSize (ShardID const& shardID) const {
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.find(shardID);
if (it != _jsons.end()) {
TRI_json_t* _json = _jsons.begin()->second;
return triagens::basics::JsonHelper::getNumericValue
<TRI_voc_size_t> (_json, "journalSize", 0);
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the maximal journal size for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, TRI_voc_size_t> journalSize () const {
map<ShardID, TRI_voc_size_t> m;
map<ShardID, TRI_json_t*>::const_iterator it;
TRI_voc_size_t s;
for (it = _jsons.begin(); it != _jsons.end(); ++it) {
TRI_json_t* _json = it->second;
s = triagens::basics::JsonHelper::getNumericValue
<TRI_voc_size_t> (_json, "journalSize", 0);
m.insert(make_pair<ShardID, TRI_voc_size_t>(it->first,s));
}
return m;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the errorNum for one shardID
////////////////////////////////////////////////////////////////////////////////
int errorNum (ShardID const& shardID) const {
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.find(shardID);
if (it != _jsons.end()) {
TRI_json_t* _json = _jsons.begin()->second;
return triagens::basics::JsonHelper::getNumericValue
<int> (_json, "errorNum", 0);
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the errorNum for all shardIDs
////////////////////////////////////////////////////////////////////////////////
map<ShardID, int> errorNum () const {
map<ShardID, int> m;
map<ShardID, TRI_json_t*>::const_iterator it;
TRI_voc_size_t s;
for (it = _jsons.begin(); it != _jsons.end(); ++it) {
TRI_json_t* _json = it->second;
s = triagens::basics::JsonHelper::getNumericValue
<int> (_json, "errorNum", 0);
m.insert(make_pair<ShardID, int>(it->first,s));
}
return m;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the shard keys
////////////////////////////////////////////////////////////////////////////////
vector<string> shardKeys () const {
// The shardKeys will always be the same in every shard
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.begin();
if (it != _jsons.end()) {
TRI_json_t* _json = it->second;
TRI_json_t* const node
= triagens::basics::JsonHelper::getArrayElement
(_json, "shardKeys");
return triagens::basics::JsonHelper::stringList(node);
}
else {
vector<string> result;
return result;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the shard ids that are currently in the collection
////////////////////////////////////////////////////////////////////////////////
vector<ShardID> shardIDs () const {
vector<ShardID> v;
map<ShardID, TRI_json_t*>::const_iterator it;
for (it = _jsons.begin(); it != _jsons.end(); ++it) {
v.push_back(it->first);
}
return v;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the responsible server for one shardID
////////////////////////////////////////////////////////////////////////////////
string responsibleServer (ShardID const& shardID) const {
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.find(shardID);
if (it != _jsons.end()) {
TRI_json_t* _json = _jsons.begin()->second;
return triagens::basics::JsonHelper::getStringValue
(_json, "DBserver", "");
}
return string("");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the errorMessage entry for one shardID
////////////////////////////////////////////////////////////////////////////////
string errorMessage (ShardID const& shardID) const {
map<ShardID, TRI_json_t*>::const_iterator it = _jsons.find(shardID);
if (it != _jsons.end()) {
TRI_json_t* _json = _jsons.begin()->second;
return triagens::basics::JsonHelper::getStringValue
(_json, "errorMessage", "");
}
return string("");
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
private:
map<ShardID, TRI_json_t*> _jsons;
};
// -----------------------------------------------------------------------------
// --SECTION-- class ClusterInfo
// -----------------------------------------------------------------------------
@ -230,8 +639,14 @@ namespace triagens {
class ClusterInfo {
private:
typedef std::map<CollectionID, CollectionInfo> DatabaseCollections;
typedef std::map<DatabaseID, DatabaseCollections> AllCollections;
typedef std::map<CollectionID, CollectionInfo>
DatabaseCollections;
typedef std::map<DatabaseID, DatabaseCollections>
AllCollections;
typedef std::map<CollectionID, CollectionInfoCurrent>
DatabaseCollectionsCurrent;
typedef std::map<DatabaseID, DatabaseCollectionsCurrent>
AllCollectionsCurrent;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
@ -370,6 +785,24 @@ namespace triagens {
const std::vector<CollectionInfo> getCollections (DatabaseID const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about current collections from the agency
/// Usually one does not have to call this directly. Note that this is
/// necessarily complicated, since here we have to consider information
/// about all shards of a collection.
////////////////////////////////////////////////////////////////////////////////
void loadCurrentCollections (bool = true);
////////////////////////////////////////////////////////////////////////////////
/// @brief ask about a collection in current. This returns information about
/// all shards in the collection.
/// If it is not found in the cache, the cache is reloaded once.
////////////////////////////////////////////////////////////////////////////////
CollectionInfoCurrent getCollectionCurrent (DatabaseID const&,
CollectionID const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief create database in coordinator
////////////////////////////////////////////////////////////////////////////////
@ -497,16 +930,25 @@ namespace triagens {
_uniqid;
// Cached data from the agency, we reload whenever necessary:
std::map<DatabaseID, struct TRI_json_s*> _plannedDatabases; // from Plan/Databases
std::map<DatabaseID, std::map<ServerID, struct TRI_json_s*> > _currentDatabases; // from Current/Databases
std::map<DatabaseID, struct TRI_json_s*> _plannedDatabases;
// from Plan/Databases
std::map<DatabaseID, std::map<ServerID, struct TRI_json_s*> >
_currentDatabases; // from Current/Databases
AllCollections _collections; // from Current/Collections/
AllCollections _collections;
// from Plan/Collections/
bool _collectionsValid;
std::map<ServerID, std::string> _servers; // from Current/ServersRegistered
AllCollectionsCurrent _collectionsCurrent;
// from Current/Collections/
bool _collectionsCurrentValid;
std::map<ServerID, std::string> _servers;
// from Current/ServersRegistered
bool _serversValid;
std::map<ServerID, ServerID> _DBServers; // from Current/DBServers
std::map<ServerID, ServerID> _DBServers;
// from Current/DBServers
bool _DBServersValid;
std::map<ShardID, ServerID> _shardIds; // from Current/ShardLocation
std::map<ShardID, ServerID> _shardIds;
// from Plan/Collections/ ???
// -----------------------------------------------------------------------------
// --SECTION-- private static variables

View File

@ -745,7 +745,7 @@ static v8::Handle<v8::Value> JS_FlushClusterInfo (v8::Arguments const& argv) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the responsible server
/// @brief get the info about a collection in Plan
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_GetCollectionInfoClusterInfo (v8::Arguments const& argv) {
@ -766,6 +766,17 @@ static v8::Handle<v8::Value> JS_GetCollectionInfoClusterInfo (v8::Arguments cons
result->Set(v8::String::New("type"), v8::Number::New((int) ci.type()));
result->Set(v8::String::New("status"), v8::Number::New((int) ci.status()));
const string statusString = ci.statusString();
result->Set(v8::String::New("statusString"),
v8::String::New(statusString.c_str(), statusString.size()));
result->Set(v8::String::New("deleted"), v8::Boolean::New(ci.deleted()));
result->Set(v8::String::New("doCompact"), v8::Boolean::New(ci.doCompact()));
result->Set(v8::String::New("isSystem"), v8::Boolean::New(ci.isSystem()));
result->Set(v8::String::New("isVolatile"), v8::Boolean::New(ci.isVolatile()));
result->Set(v8::String::New("waitForSync"), v8::Boolean::New(ci.waitForSync()));
result->Set(v8::String::New("journalSize"), v8::Number::New(ci.journalSize()));
const std::vector<std::string>& sks = ci.shardKeys();
v8::Handle<v8::Array> shardKeys = v8::Array::New(sks.size());
for (uint32_t i = 0, n = sks.size(); i < n; ++i) {
@ -789,6 +800,71 @@ static v8::Handle<v8::Value> JS_GetCollectionInfoClusterInfo (v8::Arguments cons
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the info about a collection in Current
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_GetCollectionInfoCurrentClusterInfo (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 3) {
TRI_V8_EXCEPTION_USAGE(scope, "getCollectionInfoCurrent(<database-id>, <collection-id>, <shardID>)");
}
ShardID shardID = TRI_ObjectToString(argv[2]);
CollectionInfo ci = ClusterInfo::instance()->getCollection(
TRI_ObjectToString(argv[0]),
TRI_ObjectToString(argv[1]));
v8::Handle<v8::Object> result = v8::Object::New();
// First some stuff from Plan for which Current does not make sense:
const std::string cid = triagens::basics::StringUtils::itoa(ci.id());
const std::string& name = ci.name();
result->Set(v8::String::New("id"), v8::String::New(cid.c_str(), cid.size()));
result->Set(v8::String::New("name"), v8::String::New(name.c_str(), name.size()));
CollectionInfoCurrent cic = ClusterInfo::instance()->getCollectionCurrent(
TRI_ObjectToString(argv[0]), cid);
result->Set(v8::String::New("type"), v8::Number::New((int) ci.type()));
// Now the Current information, if we actually got it:
TRI_vocbase_col_status_e s = cic.status(shardID);
result->Set(v8::String::New("status"), v8::Number::New((int) cic.status(shardID)));
if (s == TRI_VOC_COL_STATUS_CORRUPTED) {
return scope.Close(result);
}
const string statusString = TRI_GetStatusStringCollectionVocBase(s);
result->Set(v8::String::New("statusString"),
v8::String::New(statusString.c_str(), statusString.size()));
result->Set(v8::String::New("deleted"), v8::Boolean::New(cic.deleted(shardID)));
result->Set(v8::String::New("doCompact"), v8::Boolean::New(cic.doCompact(shardID)));
result->Set(v8::String::New("isSystem"), v8::Boolean::New(cic.isSystem(shardID)));
result->Set(v8::String::New("isVolatile"), v8::Boolean::New(cic.isVolatile(shardID)));
result->Set(v8::String::New("waitForSync"), v8::Boolean::New(cic.waitForSync(shardID)));
result->Set(v8::String::New("journalSize"), v8::Number::New(cic.journalSize(shardID)));
const std::string serverID = cic.responsibleServer(shardID);
result->Set(v8::String::New("responsibleServer"),
v8::String::New(serverID.c_str(), serverID.size()));
// TODO: fill "indexes"
v8::Handle<v8::Array> indexes = v8::Array::New();
result->Set(v8::String::New("indexes"), indexes);
// Finally, report any possible error:
bool error = cic.error(shardID);
result->Set(v8::String::New("error"), v8::Boolean::New(error));
if (error) {
result->Set(v8::String::New("errorNum"), v8::Number::New(cic.errorNum(shardID)));
const string errorMessage = cic.errorMessage(shardID);
result->Set(v8::String::New("errorMessage"),
v8::String::New(errorMessage.c_str(), errorMessage.size()));
}
return scope.Close(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the responsible server
////////////////////////////////////////////////////////////////////////////////
@ -1595,6 +1671,7 @@ void TRI_InitV8Cluster (v8::Handle<v8::Context> context) {
TRI_AddMethodVocbase(rt, "listDatabases", JS_ListDatabases);
TRI_AddMethodVocbase(rt, "flush", JS_FlushClusterInfo, true);
TRI_AddMethodVocbase(rt, "getCollectionInfo", JS_GetCollectionInfoClusterInfo);
TRI_AddMethodVocbase(rt, "getCollectionInfoCurrent", JS_GetCollectionInfoCurrentClusterInfo);
TRI_AddMethodVocbase(rt, "getResponsibleServer", JS_GetResponsibleServerClusterInfo);
TRI_AddMethodVocbase(rt, "getServerEndpoint", JS_GetServerEndpointClusterInfo);
TRI_AddMethodVocbase(rt, "getDBServers", JS_GetDBServers);

View File

@ -28,6 +28,7 @@
/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
(function() {
"use strict";
@ -36,7 +37,11 @@ var FoxxController = require("org/arangodb/foxx").Controller,
controller = new FoxxController(applicationContext),
_ = require("underscore"),
Communication = require("org/arangodb/sharding/agency-communication"),
comm = new Communication.Communication();
comm = new Communication.Communication(),
beats = comm.sync.Heartbeats(),
servers = comm.current.DBServers(),
dbs = comm.current.Databases(),
coords = comm.current.Coordinators();
/** Get all DBServers
*
@ -44,42 +49,11 @@ var FoxxController = require("org/arangodb/foxx").Controller,
* within the cluster
*/
controller.get("/DBServers", function(req, res) {
var list = {
Pavel: {
role: "primary",
secondary: "Sally",
address: "tcp://192.168.0.1:1337"
},
Pancho: {
role: "primary",
secondary: "none",
address: "tcp://192.168.0.2:1337"
},
Pablo: {
role: "primary",
secondary: "Sandy",
address: "tcp://192.168.0.5:1337"
},
Sally: {
role: "secondary",
address: "tcp://192.168.1.1:1337"
},
Sandy: {
role: "secondary",
address: "tcp://192.168.1.5:1337"
}
},
noBeat = ["Sandy"],
serving = ["Pancho", "Pavel"],
beats = comm.sync.Heartbeats(),
resList = [];
list = comm.current.DBServers().getList();
noBeat = beats.noBeat();
var resList = [],
list = servers.getList(),
noBeat = beats.noBeat(),
serving = beats.getServing();
_.each(list, function(v, k) {
v.name = k;
resList.push(v);
@ -95,3 +69,34 @@ controller.get("/DBServers", function(req, res) {
});
res.json(resList);
});
controller.get("/Databases", function(req, res) {
var list = dbs.getList();
res.json(_.map(list, function(d) {
return {name: d};
}));
});
controller.get("/:dbname/Collections", function(req, res) {
var dbname = req.params("dbname"),
selected = dbs.select(dbname);
res.json(_.map(selected.getCollections(),
function(c) {
return {name: c};
})
);
});
controller.get("/:dbname/:colname/Shards/:servername", function(req, res) {
var dbname = req.params("dbname"),
colname = req.params("colname"),
servername = req.params("servername"),
selected = dbs.select(dbname).collection(colname);
res.json(_.map(selected.getShardsForServer(servername),
function(c) {
return {id: c};
})
);
});
}());

View File

@ -5,9 +5,14 @@
window.ClusterCollections = Backbone.Collection.extend({
model: window.ClusterCollection,
url: "/_admin/aardvark/cluster/Collections",
url: function() {
return "/_admin/aardvark/cluster/"
+ this.dbname + "/"
+ "Collections";
},
getList: function() {
getList: function(db) {
this.dbname = db;
this.fetch({
async: false
});

View File

@ -8,9 +8,18 @@
model: window.ClusterShard,
url: "/_admin/aardvark/cluster/Shards",
url: function() {
return "/_admin/aardvark/cluster/"
+ this.dbname + "/"
+ this.colname + "/"
+ "Shards/"
+ this.server;
},
getList: function() {
getList: function(dbname, colname, server) {
this.dbname = dbname;
this.colname = colname;
this.server = server;
this.fetch({
async: false
});

View File

@ -13,7 +13,7 @@
<ul>
<% _.each(shards, function(v) { %>
<li>
<button id="<%=v.name%>" class="btn btn-server btn-<%=statusClass(v.status)%> shard"><%=v.name%></button>
<button id="<%=v.id%>" class="btn btn-server btn-<%=statusClass(v.status)%> shard"><%=v.id%></button>
</li>
<% }); %>
</ul>

View File

@ -22,9 +22,11 @@
loadCollection: function(e) {
var id = e.currentTarget.id;
this.shardsView.render({
name: id
});
this.shardsView.render(
this.db,
id,
this.server
);
},
unrender: function() {
@ -32,9 +34,11 @@
this.shardsView.unrender();
},
render: function() {
render: function(db, server) {
this.db = db;
this.server = server;
$(this.el).html(this.template.render({
collections: this.collection.getList()
collections: this.collection.getList(this.db)
}));
this.shardsView.unrender();
return this;

View File

@ -22,7 +22,7 @@
loadDatabase: function(e) {
var id = e.currentTarget.id;
this.colView.render(id);
this.colView.render(id, this.server);
},
unrender: function() {
@ -30,7 +30,8 @@
this.colView.unrender();
},
render: function(){
render: function(server) {
this.server = server;
$(this.el).html(this.template.render({
databases: this.collection.getList()
}));

View File

@ -22,9 +22,7 @@
loadServer: function(e) {
var id = e.currentTarget.id;
this.dbView.render({
name: id
});
this.dbView.render(id);
this.render(true);
},

View File

@ -14,9 +14,9 @@
$(this.el).html("");
},
render: function() {
render: function(db, col, server) {
$(this.el).html(this.template.render({
shards: this.collection.getList()
shards: this.collection.getList(db, col, server)
}));
return this;
}

View File

@ -381,6 +381,7 @@ function createLocalCollections (plannedCollections) {
payload.errorMessage = err2.errorMessage;
}
payload.DBserver = ourselves;
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, payload ]);
@ -415,6 +416,7 @@ function createLocalCollections (plannedCollections) {
payload.errorMessage = err3.errorMessage;
}
payload.DBserver = ourselves;
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, payload ]);

View File

@ -39,7 +39,10 @@ exports.Communication = function() {
storeServersInCache,
Target,
mapCollectionIDsToNames,
updateCollectionRouteForName,
updateDatabaseRoutes,
difference,
self = this,
_ = require("underscore");
splitServerName = function(route) {
@ -135,17 +138,17 @@ exports.Communication = function() {
var target = addLevel(this, "target", "Target");
addLevel(target, "dbServers", "DBServers", ["get", "set", "remove", "checkVersion"]);
addLevel(target, "db", "Collections", ["list"]);
addLevelsForDBs(target.db, true);
//addLevelsForDBs(target.db, true);
addLevel(target, "coordinators", "Coordinators", ["list", "set", "remove", "checkVersion"]);
var plan = addLevel(this, "plan", "Plan");
addLevel(plan, "dbServers", "DBServers", ["get", "checkVersion"]);
addLevel(plan, "db", "Collections", ["list"]);
addLevelsForDBs(plan.db);
//addLevelsForDBs(plan.db);
addLevel(plan, "coordinators", "Coordinators", ["list", "checkVersion"]);
var current = addLevel(this, "current", "Current");
addLevel(current, "dbServers", "DBServers", ["get", "checkVersion"]);
addLevel(current, "db", "Collections", ["list"]);
addLevelsForDBs(current.db);
//addLevelsForDBs(current.db);
addLevel(current, "coordinators", "Coordinators", ["list", "checkVersion"]);
addLevel(current, "registered", "ServersRegistered", ["get", "checkVersion"]);
@ -153,6 +156,7 @@ exports.Communication = function() {
addLevel(sync, "beat", "ServerStates", ["get"]);
addLevel(sync, "interval", "HeartbeatIntervalMs", ["get"]);
this.addLevel = addLevel;
};
agency = new AgencyWrapper();
@ -162,6 +166,33 @@ exports.Communication = function() {
// --SECTION-- Helper Functions
// -----------------------------------------------------------------------------
updateDatabaseRoutes = function(base, writeAccess) {
var list = self.plan.Databases().getList();
_.each(_.keys(base), function(k) {
if (k !== "route" && k !== "list") {
delete base[k];
}
});
_.each(list, function(d) {
agency.addLevel(base, d, d, ["get", "checkVersion"]);
});
};
updateCollectionRouteForName = function(route, db, name, writeAccess) {
var list = self.plan.Databases().select(db).getCollectionObjects();
var cId = null;
_.each(list, function(v, k) {
if (v.name === name) {
cId = splitServerName(k);
}
});
var acts = ["get"];
if (writeAccess) {
acts.push("set");
}
agency.addLevel(route, name, cId, acts);
};
////////////////////////////////////////////////////////////////////////////////
/// @brief Stores database servers in cache
///
@ -318,18 +349,27 @@ exports.Communication = function() {
/// It allos to get a list of collections and to select one of them for
/// further information.
////////////////////////////////////////////////////////////////////////////////
var DBObject = function(route, writeAccess) {
var DBObject = function(route, db, writeAccess) {
var cache;
var getList = function() {
var getRaw = function() {
if (!cache || !route.checkVersion()) {
cache = _.keys(mapCollectionIDsToNames(route.get(true))).sort();
cache = route.get(true);
}
return cache;
};
var getList = function() {
return _.keys(mapCollectionIDsToNames(
self.plan.Databases().select(db).getCollectionObjects()
)).sort();
};
this.getCollectionObjects = function() {
return getRaw();
};
this.getCollections = function() {
return getList();
};
this.collection = function(name) {
updateCollectionRouteForName(route, db, name, writeAccess);
var colroute = route[name];
if (!colroute) {
return false;
@ -350,11 +390,12 @@ exports.Communication = function() {
return route.list();
};
this.select = function(name) {
updateDatabaseRoutes(route, writeAccess);
var subroute = route[name];
if (!subroute) {
return false;
}
return new DBObject(subroute, writeAccess);
return new DBObject(subroute, name, writeAccess);
};
};

View File

@ -101,6 +101,42 @@
"11235": {name: "s"},
"6512": {name: "a"},
"123": {name: "d"}
},
current: {
_system: {
"98213": {
"sg1": {},
"sg2": {},
"sg3": {}
},
"87123": {
"sv1": {},
"sv2": {},
"sv3": {}
},
"89123": {
"se1": {},
"se2": {},
"se3": {}
}
},
a_db: {
"11235": {
"s01": {},
"s02": {},
"s03": {}
},
"6512": {
"s11": {},
"s12": {},
"s13": {}
},
"123": {
"s21": {},
"s22": {},
"s23": {}
}
}
}
};
var ips = {
@ -162,8 +198,8 @@
dummy.current.coordinators = createResult([agencyRoutes.current, agencyRoutes.sub.coords], coordinators);
dummy.current.registered = createResult([agencyRoutes.current, agencyRoutes.sub.registered], ips);
dummy.current.databases = databases;
dummy.current.syscollections = createResult([agencyRoutes.current, agencyRoutes.sub.databases, agencyRoutes.sub.colls, "_system"], collections._system);
dummy.current.acollections = createResult([agencyRoutes.current, agencyRoutes.sub.databases, agencyRoutes.sub.colls, "a_db"], collections.a_db);
dummy.current.syscollections = createResult([agencyRoutes.current, agencyRoutes.sub.databases, agencyRoutes.sub.colls, "_system"], collections.current._system);
dummy.current.acollections = createResult([agencyRoutes.current, agencyRoutes.sub.databases, agencyRoutes.sub.colls, "a_db"], collections.current.a_db);
dummy.current.vInfo = vInfo;
dummy.sync = {};
@ -240,9 +276,9 @@
}
break;
default:
fail();
fail("Requested route: GET " + route);
}
fail();
fail("Requested route: GET " + route);
},
list: function(route, recursive, flat) {
var parts = route.split("/");
@ -280,7 +316,7 @@
}
break;
default:
fail();
fail("Requested route: LIST " + route);
}
}
};
@ -631,6 +667,7 @@
assertTrue(wasCalled, "Agency has not been informed to move shard..");
assertEqual(colV.getServerForShard(shard), target);
}
};
};
@ -777,6 +814,7 @@
assertEqual(colV.getServerForShard("v1"), "pavel");
assertEqual(colV.getServerForShard("v2"), "paul");
}
};
};
@ -927,6 +965,7 @@
assertEqual(colV.getServerForShard("v1"), "pavel");
assertEqual(colV.getServerForShard("v2"), "paul");
}
};
};