1
0
Fork 0

Merge branch 'json_agency_comm' of ssh://github.com/ArangoDB/ArangoDB into json_agency_comm

This commit is contained in:
Max Neunhoeffer 2016-05-03 12:55:23 +02:00
commit 3e2bd943ed
3 changed files with 70 additions and 71 deletions

View File

@ -35,8 +35,6 @@
using namespace arangodb;
#include <iostream>
AgencyCallback::AgencyCallback(AgencyComm& agency,
std::string const& key,
std::function<bool(VPackSlice const&)> const& cb,
@ -133,7 +131,7 @@ void AgencyCallback::waitWithFailover(double timeout) {
}
}
void AgencyCallback::waitForExecution(double maxTimeout) {
void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
auto compareBuilder = std::make_shared<VPackBuilder>();
if (_lastData) {
compareBuilder = _lastData;

View File

@ -53,7 +53,7 @@ public:
std::string const key;
void refetchAndUpdate();
void waitForExecution(double);
void executeByCallbackOrTimeout(double);
private:

View File

@ -1003,59 +1003,25 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
AgencyComm ac;
AgencyCommResult res;
Mutex dbServersMutex;
double const realTimeout = getTimeout(timeout);
double const endTime = TRI_microtime() + realTimeout;
double const interval = getPollInterval();
{
AgencyCommLocker locker("Plan", "WRITE");
if (!locker.successful()) {
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
}
res = ac.casValue("Plan/Databases/" + name, slice, false, 0.0, realTimeout);
if (!res.successful()) {
if (res._statusCode ==
(int)arangodb::GeneralResponse::ResponseCode::PRECONDITION_FAILED) {
return setErrormsg(TRI_ERROR_ARANGO_DUPLICATE_NAME, errorMsg);
}
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN,
errorMsg);
}
}
// Now update our own cache of planned databases:
loadPlannedDatabases();
// Now wait for it to appear and be complete:
res.clear();
res = ac.getValues2("Current/Version", false);
if (!res.successful()) {
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
errorMsg);
}
// HERE
std::vector<ServerID> DBServers = getCurrentDBServers();
int count = 0; // this counts, when we have to reload the DBServers
std::string where = "Current/Databases/" + name;
while (TRI_microtime() <= endTime) {
int dbServerResult = -1;
std::function<bool(VPackSlice const& result)> dbServerChanged =
[&](VPackSlice const& result) {
size_t numDbServers;
{
MUTEX_LOCKER(guard, dbServersMutex);
numDbServers = DBServers.size();
}
if (result.isObject() && result.length() == numDbServers) {
VPackObjectIterator dbs(result);
res.clear();
res = ac.getValues2(where, true);
VPackSlice adbservers =
res._vpack->slice()[0]
.get(AgencyComm::prefixStripped()).get("Current")
.get("Databases").get(name);
if (res.successful() && !adbservers.isNone()) {
VPackObjectIterator dbs(adbservers);
if (dbs.size() == DBServers.size()) {
std::map<std::string, AgencyCommResultEntry>::iterator it;
std::string tmpMsg = "";
bool tmpHaveError = false;
@ -1082,16 +1048,47 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
if (tmpHaveError) {
errorMsg = "Error in creation of database:" + tmpMsg;
std::cout << errorMsg << std::endl;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
return true;
}
loadCurrentDatabases(); // update our cache
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
return true;
};
auto agencyCallback = std::make_shared<AgencyCallback>(
ac, "Current/Databases/" + name, dbServerChanged, true, false);
_agencyCallbackRegistry->registerCallback(agencyCallback);
{
AgencyCommLocker locker("Plan", "WRITE");
if (!locker.successful()) {
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
}
res = ac.casValue("Plan/Databases/" + name, slice, false, 0.0, realTimeout);
if (!res.successful()) {
if (res._statusCode ==
(int)arangodb::GeneralResponse::ResponseCode::PRECONDITION_FAILED) {
return setErrormsg(TRI_ERROR_ARANGO_DUPLICATE_NAME, errorMsg);
}
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN,
errorMsg);
}
}
res.clear();
_agencyCallbackRegistry->awaitNextChange(
"Current/Version", getReloadServerListTimeout() / interval);
// Now update our own cache of planned databases:
loadPlannedDatabases();
int count = 0; // this counts, when we have to reload the DBServers
while (TRI_microtime() <= endTime) {
agencyCallback->executeByCallbackOrTimeout(getReloadServerListTimeout() / interval);
if (dbServerResult >= 0) {
break;
}
if (++count >= static_cast<int>(getReloadServerListTimeout() / interval)) {
// We update the list of DBServers every minute in case one of them
@ -1099,10 +1096,16 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
// if a new DBServer was added. However, in this case we report
// success a bit too early, which is not too bad.
loadCurrentDBServers();
{
MUTEX_LOCKER(guard, dbServersMutex);
DBServers = getCurrentDBServers();
}
count = 0;
}
}
_agencyCallbackRegistry->unregisterCallback(agencyCallback);
if (dbServerResult >= 0) {
return dbServerResult;
}
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
@ -1275,9 +1278,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
return true;
}
dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
return true;
}
return true;
};
@ -1318,7 +1319,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
loadPlannedCollections();
while (TRI_microtime() <= endTime) {
agencyCallback->waitForExecution(interval);
agencyCallback->executeByCallbackOrTimeout(interval);
if (dbServerResult >= 0) {
break;
@ -1410,7 +1411,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
loadPlannedCollections();
while (TRI_microtime() <= endTime) {
agencyCallback->waitForExecution(interval);
agencyCallback->executeByCallbackOrTimeout(interval);
if (dbServerResult >= 0) {
break;
}