1
0
Fork 0

Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Michael Hackstein 2014-01-15 17:14:35 +01:00
commit 4a2d874678
7 changed files with 245 additions and 52 deletions

View File

@ -340,6 +340,8 @@ ClusterInfo* ClusterInfo::instance () {
ClusterInfo::ClusterInfo ()
: _agency(),
_uniqid(),
_plannedDatabases(),
_currentDatabases(),
_collectionsValid(false),
_serversValid(false),
_DBServersValid(false) {
@ -353,6 +355,8 @@ ClusterInfo::ClusterInfo ()
////////////////////////////////////////////////////////////////////////////////
ClusterInfo::~ClusterInfo () {
  // the database caches store raw JSON pointers, so their contents have to
  // be released explicitly; planned databases first, then current ones
  this->clearPlannedDatabases();
  this->clearCurrentDatabases();
}
// -----------------------------------------------------------------------------
@ -400,33 +404,50 @@ void ClusterInfo::flush () {
_collections.clear();
_servers.clear();
_shardIds.clear();
clearPlannedDatabases();
clearCurrentDatabases();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ask whether a cluster database exists
////////////////////////////////////////////////////////////////////////////////
bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID) {
bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID,
bool reload) {
int tries = 0;
if (! _collectionsValid) {
loadCurrentCollections();
if (reload) {
loadPlannedDatabases();
loadCurrentDatabases();
loadCurrentDBServers();
++tries;
}
while (++tries <= 2) {
{
READ_LOCKER(_lock);
// look up database by id
AllCollections::const_iterator it = _collections.find(databaseID);
const size_t expectedSize = _DBServers.size();
if (it != _collections.end()) {
return true;
// look up database by name
std::map<DatabaseID, TRI_json_t*>::const_iterator it = _plannedDatabases.find(databaseID);
if (it != _plannedDatabases.end()) {
// found the database in Plan
std::map<DatabaseID, std::map<ServerID, TRI_json_t*> >::const_iterator it2 = _currentDatabases.find(databaseID);
if (it2 != _currentDatabases.end()) {
// found the database in Current
return ((*it2).second.size() >= expectedSize);
}
}
}
// must load collections outside the lock
loadCurrentCollections();
loadPlannedDatabases();
loadCurrentDatabases();
loadCurrentDBServers();
}
return false;
@ -436,18 +457,77 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID) {
/// @brief get list of databases in the cluster
////////////////////////////////////////////////////////////////////////////////
vector<DatabaseID> ClusterInfo::listDatabases () {
vector<DatabaseID> res;
vector<DatabaseID> ClusterInfo::listDatabases (bool reload) {
vector<DatabaseID> result;
if (! _collectionsValid) {
loadCurrentCollections();
if (reload) {
loadPlannedDatabases();
loadCurrentDatabases();
loadCurrentDBServers();
}
AllCollections::const_iterator it;
for (it = _collections.begin(); it != _collections.end(); ++it) {
res.push_back(it->first);
READ_LOCKER(_lock);
const size_t expectedSize = _DBServers.size();
std::map<DatabaseID, TRI_json_t*>::const_iterator it = _plannedDatabases.begin();
while (it != _plannedDatabases.end()) {
std::map<DatabaseID, std::map<ServerID, TRI_json_t*> >::const_iterator it2 = _currentDatabases.find((*it).first);
if (it2 != _currentDatabases.end()) {
if ((*it2).second.size() >= expectedSize) {
result.push_back((*it).first);
}
}
++it;
}
return res;
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of planned databases
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::clearPlannedDatabases () {
std::map<DatabaseID, TRI_json_t*>::iterator it = _plannedDatabases.begin();
while (it != _plannedDatabases.end()) {
TRI_json_t* json = (*it).second;
if (json != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
++it;
}
_plannedDatabases.clear();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of current databases
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::clearCurrentDatabases () {
std::map<DatabaseID, std::map<ServerID, TRI_json_t*> >::iterator it = _currentDatabases.begin();
while (it != _currentDatabases.end()) {
std::map<ServerID, TRI_json_t*>::iterator it2 = (*it).second.begin();
while (it2 != (*it).second.end()) {
TRI_json_t* json = (*it2).second;
if (json != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
++it2;
}
++it;
}
_currentDatabases.clear();
}
////////////////////////////////////////////////////////////////////////////////
@ -472,7 +552,7 @@ void ClusterInfo::loadPlannedDatabases () {
result.parse(prefix + "/", false);
WRITE_LOCKER(_lock);
_plannedDatabases.clear();
clearPlannedDatabases();
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
@ -483,6 +563,71 @@ void ClusterInfo::loadPlannedDatabases () {
// steal the json
(*it).second._json = 0;
_plannedDatabases.insert(std::make_pair<DatabaseID, TRI_json_t*>(name, options));
++it;
}
return;
}
LOG_TRACE("Error while loading %s", prefix.c_str());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about current databases
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadCurrentDatabases () {
static const std::string prefix = "Current/Databases";
AgencyCommResult result;
{
AgencyCommLocker locker("Plan", "READ");
if (locker.successful()) {
result = _agency.getValues(prefix, true);
}
}
if (result.successful()) {
result.parse(prefix + "/", false);
WRITE_LOCKER(_lock);
clearCurrentDatabases();
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
while (it != result._values.end()) {
const std::string key = (*it).first;
// each entry consists of a database id and a collection id, separated by '/'
std::vector<std::string> parts = triagens::basics::StringUtils::split(key, '/');
if (parts.empty()) {
++it;
continue;
}
const std::string database = parts[0];
std::map<std::string, std::map<ServerID, TRI_json_t*> >::iterator it2 = _currentDatabases.find(database);
if (it2 == _currentDatabases.end()) {
// insert an empty list for this database
std::map<ServerID, TRI_json_t*> empty;
it2 = _currentDatabases.insert(std::make_pair<DatabaseID, std::map<ServerID, TRI_json_t*> >(database, empty)).first;
}
if (parts.size() == 2) {
// got a server name
TRI_json_t* json = (*it).second._json;
// steal the JSON
(*it).second._json = 0;
(*it2).second.insert(std::make_pair<ServerID, TRI_json_t*>(parts[1], json));
}
++it;
}
return;
@ -519,7 +664,7 @@ void ClusterInfo::loadCurrentCollections () {
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
for (; it != result._values.end(); ++it) {
const std::string& key = (*it).first;
const std::string key = (*it).first;
// each entry consists of a database id and a collection id, separated by '/'
std::vector<std::string> parts = triagens::basics::StringUtils::split(key, '/');
@ -530,8 +675,8 @@ void ClusterInfo::loadCurrentCollections () {
continue;
}
const std::string& database = parts[0];
const std::string& collection = parts[1];
const std::string database = parts[0];
const std::string collection = parts[1];
// check whether we have created an entry for the database already
AllCollections::iterator it2 = _collections.find(database);
@ -811,12 +956,17 @@ std::vector<ServerID> ClusterInfo::getCurrentDBServers () {
loadCurrentDBServers();
}
std::vector<ServerID> res;
std::map<ServerID, ServerID>::iterator i;
for (i = _DBServers.begin(); i != _DBServers.end(); ++i) {
res.push_back(i->first);
std::vector<ServerID> result;
READ_LOCKER(_lock);
std::map<ServerID, ServerID>::iterator it = _DBServers.begin();
while (it != _DBServers.end()) {
result.push_back((*it).first);
it++;
}
return res;
return result;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -287,13 +287,14 @@ namespace triagens {
/// @brief ask whether a cluster database exists
////////////////////////////////////////////////////////////////////////////////
bool doesDatabaseExist (DatabaseID const& databaseID);
bool doesDatabaseExist (DatabaseID const&,
bool = false);
////////////////////////////////////////////////////////////////////////////////
/// @brief get list of databases in the cluster
////////////////////////////////////////////////////////////////////////////////
vector<DatabaseID> listDatabases ();
vector<DatabaseID> listDatabases (bool = false);
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about collections from the agency
@ -302,6 +303,18 @@ namespace triagens {
void loadCurrentCollections ();
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of planned databases
////////////////////////////////////////////////////////////////////////////////
void clearPlannedDatabases ();
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of current databases
////////////////////////////////////////////////////////////////////////////////
void clearCurrentDatabases ();
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about planned databases
/// Usually one does not have to call this directly.
@ -309,6 +322,13 @@ namespace triagens {
void loadPlannedDatabases ();
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about current databases
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void loadCurrentDatabases ();
////////////////////////////////////////////////////////////////////////////////
/// @brief ask about a collection
/// If it is not found in the cache, the cache is reloaded once.
@ -401,6 +421,8 @@ namespace triagens {
// Cached data from the agency, we reload whenever necessary:
std::map<DatabaseID, struct TRI_json_s*> _plannedDatabases; // from Plan/Databases
std::map<DatabaseID, std::map<ServerID, struct TRI_json_s*> > _currentDatabases; // from Current/Databases
AllCollections _collections; // from Current/Collections/
bool _collectionsValid;
std::map<ServerID, std::string> _servers; // from Current/ServersRegistered

View File

@ -681,7 +681,7 @@ static v8::Handle<v8::Value> JS_DoesDatabaseExistClusterInfo (v8::Arguments cons
TRI_V8_EXCEPTION_USAGE(scope, "doesDatabaseExist(<database-id>)");
}
const bool result = ClusterInfo::instance()->doesDatabaseExist(TRI_ObjectToString(argv[0]));
const bool result = ClusterInfo::instance()->doesDatabaseExist(TRI_ObjectToString(argv[0]), true);
return scope.Close(v8::Boolean::New(result));
}
@ -694,10 +694,10 @@ static v8::Handle<v8::Value> JS_ListDatabases (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 0) {
TRI_V8_EXCEPTION_USAGE(scope, "doesDatabaseExist()");
TRI_V8_EXCEPTION_USAGE(scope, "listDatabases()");
}
vector<DatabaseID> res = ClusterInfo::instance()->listDatabases();
vector<DatabaseID> res = ClusterInfo::instance()->listDatabases(true);
v8::Handle<v8::Array> a = v8::Array::New(res.size());
vector<DatabaseID>::iterator it;
int count = 0;
@ -1634,8 +1634,7 @@ void TRI_InitV8Cluster (v8::Handle<v8::Context> context) {
TRI_AddMethodVocbase(rt, "drop", JS_Drop);
v8g->ClusterCommTempl = v8::Persistent<v8::ObjectTemplate>::New(isolate, rt);
TRI_AddGlobalFunctionVocbase(context, "ArangoClusterCommCtor",
ft->GetFunction());
TRI_AddGlobalFunctionVocbase(context, "ArangoClusterCommCtor", ft->GetFunction(), true);
// register the global object
ss = v8g->ClusterCommTempl->NewInstance();

View File

@ -7631,10 +7631,12 @@ static v8::Handle<v8::Value> JS_CompletionsVocbase (v8::Arguments const& argv) {
if (ServerState::instance()->isCoordinator()) {
char const* originalDatabase = GetCurrentDatabaseName();
if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) {
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
if (ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) {
names = GetCollectionNamesCluster(vocbase, originalDatabase);
}
else {
TRI_InitVectorString(&names, TRI_UNKNOWN_MEM_ZONE);
}
names = GetCollectionNamesCluster(vocbase, originalDatabase);
}
else {
names = TRI_CollectionNamesVocBase(vocbase);
@ -8216,7 +8218,7 @@ static v8::Handle<v8::Value> JS_ListDatabases_Coordinator
if (argv.Length() == 0) {
ci->loadCurrentCollections();
vector<DatabaseID> list = ci->listDatabases();
vector<DatabaseID> list = ci->listDatabases(true);
v8::Handle<v8::Array> result = v8::Array::New();
for (size_t i = 0; i < list.size(); ++i) {
result->Set((uint32_t) i, v8::String::New(list[i].c_str(),

12
arangom
View File

@ -29,34 +29,34 @@ function set() {
value=$2
if [ "x$value" == "x" ] ; then
echo "Creating directory $PREFIX$key"
$CURL -X PUT "$URL$PREFIX$key?dir=true" > /dev/null || exit 1
$CURL -X PUT -L "$URL$PREFIX$key?dir=true" > /dev/null || exit 1
else
echo "Setting key $PREFIX$key to value $value"
$CURL -X PUT "$URL$PREFIX$key" -d "value=$value" > /dev/null || exit 1
$CURL -X PUT -L "$URL$PREFIX$key" -d "value=$value" > /dev/null || exit 1
fi
}
if [ "$1" == "init" ] ; then
$CURL -X DELETE "$URL$PREFIX?recursive=true" > /dev/null
$CURL -X DELETE -L "$URL$PREFIX?recursive=true" > /dev/null
set Target/MapLocalToID
set Target/MapIDToEndpoint
set Target/Version 1
set Target/Version "\"1\""
set Target/Lock "\"UNLOCKED\""
set Target/DBServers
set Target/Coordinators
set Target/Databases/@Usystem "{}"
set Target/Collections/@Usystem
set Plan/Version 1
set Plan/Version "\"1\""
set Plan/Lock "\"UNLOCKED\""
set Plan/DBServers
set Plan/Coordinators
set Plan/Databases/@Usystem "{}"
set Plan/Collections/@Usystem
set Current/Version 1
set Current/Version "\"1\""
set Current/Lock "\"UNLOCKED\""
set Current/DBServers
set Current/Coordinators

View File

@ -17,6 +17,8 @@ curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Per
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=\"tcp://127.0.0.1:8529\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1
# Register the system database as present on the DB servers "Pavel" and "Perry".
# The value must be well-formed JSON (exactly one closing brace).
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Pavel" -d 'value={"name":"system"}' || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Databases/@Usystem/Perry" -d 'value={"name":"system"}' || exit 1
echo
echo start arangod with:

View File

@ -224,20 +224,38 @@ function ClusterEnabledSuite () {
////////////////////////////////////////////////////////////////////////////////
testDoesDatabaseExist : function () {
var collection = {
id: "123",
name: "mycollection",
type: 2,
status: 3, // LOADED
shardKeys: [ "_key" ],
shards: { "s1" : "myself", "s2" : "other" }
var database = {
name: "test"
};
assertTrue(agency.set("Current/Collections/test/" + collection.id, collection));
assertTrue(agency.set("Plan/Databases/" + database.name, database));
assertTrue(agency.set("Current/DBServers/Foo", "Bar"));
assertTrue(agency.set("Current/DBServers/Barz", "Bat"));
assertTrue(agency.set("Current/Databases/test/Foo", database));
assertTrue(agency.set("Current/Databases/test/Barz", database));
assertTrue(ci.doesDatabaseExist("test"));
assertFalse(ci.doesDatabaseExist("UnitTestsAgencyNonExisting"));
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test doesDatabaseExist
////////////////////////////////////////////////////////////////////////////////
testDoesDatabaseExistNotReady : function () {
var database = {
name: "test"
};
assertTrue(agency.set("Plan/Databases/" + database.name, database));
assertTrue(agency.set("Current/DBServers/Foo", "Bar"));
assertTrue(agency.set("Current/DBServers/Barz", "Bat"));
assertTrue(agency.set("Current/Databases/test/Foo", database));
assertFalse(ci.doesDatabaseExist("test"));
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test getCollectionInfo
////////////////////////////////////////////////////////////////////////////////