1
0
Fork 0

Merge branch 'sharding' of ssh://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Max Neunhoeffer 2014-01-15 11:09:07 +01:00
commit 03d7ff6571
5 changed files with 104 additions and 55 deletions

View File

@ -329,10 +329,7 @@ ClusterInfo::ClusterInfo ()
_DBServersValid(false) {
_uniqid._currentValue = _uniqid._upperValue = 0ULL;
// Actual loading is postponed until necessary:
// loadServers();
// loadDBServers();
// loadCollections();
// Actual loading into caches is postponed until necessary
}
////////////////////////////////////////////////////////////////////////////////
@ -397,7 +394,7 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID) {
int tries = 0;
if (! _collectionsValid) {
loadCollections();
loadCurrentCollections();
++tries;
}
@ -412,8 +409,8 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID) {
}
}
// must call loadCollections outside the lock
loadCollections();
// must load collections outside the lock
loadCurrentCollections();
}
return false;
@ -427,7 +424,7 @@ vector<DatabaseID> ClusterInfo::listDatabases () {
vector<DatabaseID> res;
if (! _collectionsValid) {
loadCollections();
loadCurrentCollections();
}
AllCollections::const_iterator it;
@ -437,27 +434,71 @@ vector<DatabaseID> ClusterInfo::listDatabases () {
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about planned databases
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadPlannedDatabases () {
static const std::string prefix = "Plan/Databases";
AgencyCommResult result;
{
AgencyCommLocker locker("Plan", "READ");
if (locker.successful()) {
result = _agency.getValues(prefix, true);
}
}
if (result.successful()) {
std::map<std::string, std::string> databases;
if (result.flattenJson(databases, prefix + "/", false)) {
LOG_TRACE("%s loaded successfully", prefix.c_str());
WRITE_LOCKER(_lock);
_plannedDatabases.clear();
std::map<std::string, std::string>::const_iterator it;
for (it = databases.begin(); it != databases.end(); ++it) {
const std::string& name = (*it).first;
TRI_json_t* options = JsonHelper::fromString((*it).second);
_plannedDatabases.insert(std::make_pair<DatabaseID, TRI_json_t*>(name, options));
}
return;
}
}
LOG_TRACE("Error while loading %s", prefix.c_str());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about collections from the agency
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadCollections () {
void ClusterInfo::loadCurrentCollections () {
static const std::string prefix = "Current/Collections";
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
if (locker.successful()) {
result = _agency.getValues("Current/Collections", true);
result = _agency.getValues(prefix, true);
}
}
if (result.successful()) {
std::map<std::string, std::string> collections;
if (result.flattenJson(collections, "Current/Collections/", false)) {
LOG_TRACE("Current/Collections loaded successfully");
if (result.flattenJson(collections, prefix + "/", false)) {
LOG_TRACE("%s loaded successfully", prefix.c_str());
WRITE_LOCKER(_lock);
_collections.clear();
@ -516,7 +557,7 @@ void ClusterInfo::loadCollections () {
}
}
LOG_TRACE("Error while loading Current/Collections");
LOG_TRACE("Error while loading %s", prefix.c_str());
_collectionsValid = false;
}
@ -527,12 +568,13 @@ void ClusterInfo::loadCollections () {
CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID,
CollectionID const& collectionID) {
if (! _collectionsValid) {
loadCollections();
}
int tries = 0;
if (! _collectionsValid) {
loadCurrentCollections();
++tries;
}
while (++tries <= 2) {
{
READ_LOCKER(_lock);
@ -549,8 +591,8 @@ CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID,
}
}
// must call loadCollections outside the lock
loadCollections();
// must load collections outside the lock
loadCurrentCollections();
}
return CollectionInfo();
@ -603,7 +645,7 @@ const std::vector<CollectionInfo> ClusterInfo::getCollections (DatabaseID const&
std::vector<CollectionInfo> result;
// always reload
loadCollections();
loadCurrentCollections();
READ_LOCKER(_lock);
// look up database by id
@ -635,21 +677,23 @@ const std::vector<CollectionInfo> ClusterInfo::getCollections (DatabaseID const&
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadServers () {
static const std::string prefix = "Current/ServersRegistered";
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
if (locker.successful()) {
result = _agency.getValues("Current/ServersRegistered", true);
result = _agency.getValues(prefix, true);
}
}
if (result.successful()) {
std::map<std::string, std::string> servers;
if (result.flattenJson(servers, "Current/ServersRegistered/", false)) {
LOG_TRACE("Current/ServersRegistered loaded successfully");
if (result.flattenJson(servers, prefix + "/", false)) {
LOG_TRACE("%s loaded successfully", prefix.c_str());
WRITE_LOCKER(_lock);
_servers.clear();
@ -665,7 +709,7 @@ void ClusterInfo::loadServers () {
}
}
LOG_TRACE("Error while loading Current/ServersRegistered");
LOG_TRACE("Error while loading %s", prefix.c_str());
_serversValid = false;
@ -679,13 +723,13 @@ void ClusterInfo::loadServers () {
////////////////////////////////////////////////////////////////////////////////
std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) {
int tries = 0;
if (! _serversValid) {
loadServers();
tries++;
}
int tries = 0;
while (++tries <= 2) {
{
READ_LOCKER(_lock);
@ -708,22 +752,24 @@ std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) {
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadDBServers () {
void ClusterInfo::loadCurrentDBServers () {
static const std::string prefix = "Current/DBServers";
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
if (locker.successful()) {
result = _agency.getValues("Current/DBServers", true);
result = _agency.getValues(prefix, true);
}
}
if (result.successful()) {
std::map<std::string, std::string> servers;
if (result.flattenJson(servers, "Current/DBServers/", false)) {
LOG_TRACE("Current/DBServers loaded successfully");
if (result.flattenJson(servers, prefix + "/", false)) {
LOG_TRACE("%s loaded successfully", prefix.c_str());
WRITE_LOCKER(_lock);
_DBServers.clear();
@ -739,7 +785,7 @@ void ClusterInfo::loadDBServers () {
}
}
LOG_TRACE("Error while loading Current/DBServers");
LOG_TRACE("Error while loading %s", prefix.c_str());
_DBServersValid = false;
@ -751,9 +797,9 @@ void ClusterInfo::loadDBServers () {
/// currently registered
////////////////////////////////////////////////////////////////////////////////
std::vector<ServerID> ClusterInfo::getDBServers () {
std::vector<ServerID> ClusterInfo::getCurrentDBServers () {
if (! _DBServersValid) {
loadDBServers();
loadCurrentDBServers();
}
std::vector<ServerID> res;
@ -764,13 +810,14 @@ std::vector<ServerID> ClusterInfo::getDBServers () {
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief lookup the server's endpoint by scanning Target/MapIDToEnpdoint for
/// our id
////////////////////////////////////////////////////////////////////////////////
std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) {
static const std::string prefix = "Target/MapIDToEndpoint/";
AgencyCommResult result;
// fetch value at Target/MapIDToEndpoint
@ -778,15 +825,15 @@ std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) {
AgencyCommLocker locker("Target", "READ");
if (locker.successful()) {
result = _agency.getValues("Target/MapIDToEndpoint/" + serverID, false);
result = _agency.getValues(prefix + serverID, false);
}
}
if (result.successful()) {
std::map<std::string, std::string> out;
if (! result.flattenJson(out, "Target/MapIDToEndpoint/", false)) {
LOG_FATAL_AND_EXIT("Got an invalid JSON response for Target/MapIDToEndpoint");
if (! result.flattenJson(out, prefix, false)) {
LOG_FATAL_AND_EXIT("Got an invalid JSON response for %s", prefix.c_str());
}
// check if we can find ourselves in the list returned by the agency
@ -811,7 +858,7 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
int tries = 0;
if (! _collectionsValid) {
loadCollections();
loadCurrentCollections();
tries++;
}
@ -825,8 +872,8 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
}
}
// must call loadCollections outside the lock
loadCollections();
// must load collections outside the lock
loadCurrentCollections();
}
return ServerID("");

View File

@ -298,7 +298,14 @@ namespace triagens {
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void loadCollections ();
void loadCurrentCollections ();
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about planned databases
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void loadPlannedDatabases ();
////////////////////////////////////////////////////////////////////////////////
/// @brief ask about a collection
@ -332,14 +339,14 @@ namespace triagens {
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void loadDBServers ();
void loadCurrentDBServers ();
////////////////////////////////////////////////////////////////////////////////
/// @brief return a list of all DBServers in the cluster that have
/// currently registered
////////////////////////////////////////////////////////////////////////////////
std::vector<ServerID> getDBServers ();
std::vector<ServerID> getCurrentDBServers ();
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about servers from the agency
@ -391,6 +398,7 @@ namespace triagens {
_uniqid;
// Cached data from the agency, we reload whenever necessary:
std::map<DatabaseID, struct TRI_json_s*> _plannedDatabases; // from Plan/Databases
AllCollections _collections; // from Current/Collections/
bool _collectionsValid;
std::map<ServerID, std::string> _servers; // from Current/ServersRegistered

View File

@ -795,7 +795,7 @@ static v8::Handle<v8::Value> JS_GetDBServers (v8::Arguments const& argv) {
TRI_V8_EXCEPTION_USAGE(scope, "DBServers()");
}
std::vector<std::string> DBServers = ClusterInfo::instance()->getDBServers();
std::vector<std::string> DBServers = ClusterInfo::instance()->getCurrentDBServers();
v8::Handle<v8::Array> l = v8::Array::New();
@ -819,7 +819,7 @@ static v8::Handle<v8::Value> JS_ReloadDBServers (v8::Arguments const& argv) {
TRI_V8_EXCEPTION_USAGE(scope, "reloadDBServers()");
}
ClusterInfo::instance()->loadDBServers();
ClusterInfo::instance()->loadCurrentDBServers();
return scope.Close(v8::Undefined());
}

View File

@ -1850,7 +1850,7 @@ static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& a
const std::string cid = StringUtils::itoa(id);
// fetch list of available servers in cluster, and shuffle them randomly
std::vector<std::string> dbServers = ClusterInfo::instance()->getDBServers();
std::vector<std::string> dbServers = ClusterInfo::instance()->getCurrentDBServers();
if (dbServers.empty()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no database servers found in cluster");
@ -8179,7 +8179,7 @@ static v8::Handle<v8::Value> JS_ListDatabases_Coordinator
ClusterInfo* ci = ClusterInfo::instance();
if (argv.Length() == 0) {
ci->loadCollections();
ci->loadCurrentCollections();
vector<DatabaseID> list = ci->listDatabases();
v8::Handle<v8::Array> result = v8::Array::New();
for (size_t i = 0; i < list.size(); ++i) {
@ -8193,7 +8193,7 @@ static v8::Handle<v8::Value> JS_ListDatabases_Coordinator
int tries = 0;
vector<ServerID> DBServers;
while (++tries <= 2) {
DBServers = ci->getDBServers();
DBServers = ci->getCurrentDBServers();
if (DBServers.size() != 0) {
ServerID sid = DBServers[0];
ClusterComm* cc = ClusterComm::instance();
@ -8227,7 +8227,7 @@ static v8::Handle<v8::Value> JS_ListDatabases_Coordinator
delete res;
}
}
ci->loadDBServers(); // just in case some new have arrived
ci->loadCurrentDBServers(); // just in case some new have arrived
}
// Give up:
return scope.Close(v8::Undefined());
@ -8344,10 +8344,6 @@ static v8::Handle<v8::Value> JS_CreateDatabase_Coordinator (v8::Arguments const&
const string name = TRI_ObjectToString(argv[0]);
//ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
AgencyComm ac;
int ourerrno = TRI_ERROR_NO_ERROR;
ourerrno = CreateDatabaseInAgency("Plan",name);

View File

@ -92,8 +92,6 @@ function ClusterEnabledSuite () {
catch (err) {
}
});
agency.set("Sync/LatestID", "0");
};
return {