1
0
Fork 0

code cleanup

This commit is contained in:
Jan Steemann 2014-01-17 10:21:05 +01:00
parent 9cbc6d55ca
commit 42b5686ee2
2 changed files with 123 additions and 63 deletions

View File

@ -39,6 +39,19 @@
using namespace triagens::arango;
using triagens::basics::JsonHelper;
// -----------------------------------------------------------------------------
// --SECTION-- private functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief a local helper to report errors and messages
////////////////////////////////////////////////////////////////////////////////
static inline int setErrormsg (int ourerrno, string& errorMsg) {
errorMsg = TRI_errno_string(ourerrno);
return ourerrno;
}
// -----------------------------------------------------------------------------
// --SECTION-- CollectionInfo class
// -----------------------------------------------------------------------------
@ -612,13 +625,6 @@ const std::vector<CollectionInfo> ClusterInfo::getCollections (DatabaseID const&
return result;
}
// A local helper to report errors and messages:
static inline int set_errormsg(int ourerrno, string& errorMsg) {
errorMsg = TRI_errno_string(ourerrno);
return ourerrno;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create database in coordinator, the return value is an ArangoDB
/// error code and the errorMsg is set accordingly. One possible error
@ -627,23 +633,28 @@ static inline int set_errormsg(int ourerrno, string& errorMsg) {
int ClusterInfo::createDatabaseCoordinator (string const& name,
TRI_json_t const* json,
string errorMsg, double timeout) {
string errorMsg,
double timeout) {
AgencyComm ac;
AgencyCommResult res;
const double realTimeout = getTimeout(timeout);
const double endTime = TRI_microtime() + realTimeout;
const double interval = getPollInterval();
{
AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
}
res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0);
res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, realTimeout);
if (!res.successful()) {
if (res._statusCode == 412) {
return set_errormsg(TRI_ERROR_CLUSTER_DATABASE_NAME_EXISTS, errorMsg);
if (res._statusCode == triagens::rest::HttpResponse::PRECONDITION_FAILED) {
return setErrormsg(TRI_ERROR_CLUSTER_DATABASE_NAME_EXISTS, errorMsg);
}
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE_IN_PLAN,
errorMsg);
}
}
@ -651,18 +662,16 @@ int ClusterInfo::createDatabaseCoordinator (string const& name,
// Now wait for it to appear and be complete:
res = ac.getValues("Current/Version", false);
if (!res.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
errorMsg);
}
uint64_t index = res._index;
double endtime = TRI_microtime();
endtime += timeout == 0.0 ? 1e50 : timeout;
vector<ServerID> DBServers = getCurrentDBServers();
int count = 0; // this counts, when we have to reload the DBServers
string where = "Current/Databases/" + name;
while (TRI_microtime() <= endtime) {
while (TRI_microtime() <= endTime) {
res = ac.getValues(where, true);
if (res.successful() && res.parse(where+"/", false)) {
if (res._values.size() == DBServers.size()) {
@ -694,12 +703,13 @@ int ClusterInfo::createDatabaseCoordinator (string const& name,
errorMsg = "Error in creation of database:" + tmpMsg;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
}
return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg);
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
res = ac.watchValue("Current/Version", index, 5.0, false);
res = ac.watchValue("Current/Version", index, getReloadServerListTimeout() / interval, false);
index = res._index;
if (++count >= 12) {
if (++count >= static_cast<int>(getReloadServerListTimeout() / interval)) {
// We update the list of DBServers every minute in case one of them
// was taken away since we last looked. This also helps (slightly)
// if a new DBServer was added. However, in this case we report
@ -709,7 +719,7 @@ int ClusterInfo::createDatabaseCoordinator (string const& name,
count = 0;
}
}
return set_errormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
////////////////////////////////////////////////////////////////////////////////
@ -723,23 +733,27 @@ int ClusterInfo::dropDatabaseCoordinator (string const& name, string& errorMsg,
AgencyComm ac;
AgencyCommResult res;
const double realTimeout = getTimeout(timeout);
const double endTime = TRI_microtime() + realTimeout;
const double interval = getPollInterval();
{
AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
}
if (! ac.exists("Plan/Databases/" + name)) {
return set_errormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
}
res = ac.removeValues("Plan/Databases/"+name, false);
if (!res.successful()) {
if (res._statusCode == rest::HttpResponse::NOT_FOUND) {
return set_errormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
}
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_PLAN,
errorMsg);
}
}
@ -747,15 +761,13 @@ int ClusterInfo::dropDatabaseCoordinator (string const& name, string& errorMsg,
// Now wait for it to appear and be complete:
res = ac.getValues("Current/Version", false);
if (!res.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
errorMsg);
}
uint64_t index = res._index;
double endtime = TRI_microtime();
endtime += timeout == 0.0 ? 1e50 : timeout;
string where = "Current/Databases/" + name;
while (TRI_microtime() <= endtime) {
while (TRI_microtime() <= endTime) {
res = ac.getValues(where, true);
if (res.successful() && res.parse(where+"/", false)) {
if (res._values.size() == 0) {
@ -763,18 +775,19 @@ int ClusterInfo::dropDatabaseCoordinator (string const& name, string& errorMsg,
if (locker.successful()) {
res = ac.removeValues(where, true);
if (res.successful()) {
return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg);
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
return set_errormsg(
return setErrormsg(
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_DATABASE_IN_CURRENT, errorMsg);
}
return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg);
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
res = ac.watchValue("Current/Version", index, 5.0, false);
res = ac.watchValue("Current/Version", index, interval, false);
index = res._index;
}
return set_errormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
////////////////////////////////////////////////////////////////////////////////
@ -790,26 +803,30 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
string errorMsg, double timeout) {
AgencyComm ac;
const double realTimeout = getTimeout(timeout);
const double endTime = TRI_microtime() + realTimeout;
const double interval = getPollInterval();
{
AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
}
if (! ac.exists("Plan/Databases/" + databaseName)) {
return set_errormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
}
if (ac.exists("Plan/Collections/" + databaseName + "/"+collectionID)) {
return set_errormsg(TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS, errorMsg);
}
AgencyCommResult result
= ac.setValue("Plan/Collections/" + databaseName + "/"+collectionID,
json, 0.0);
if (!result.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
errorMsg);
}
}
@ -817,15 +834,13 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
// Now wait for it to appear and be complete:
AgencyCommResult res = ac.getValues("Current/Version", false);
if (!res.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
errorMsg);
}
uint64_t index = res._index;
double endtime = TRI_microtime();
endtime += timeout == 0.0 ? 1e50 : timeout;
string where = "Current/Collections/" + databaseName + "/" + collectionID;
while (TRI_microtime() <= endtime) {
while (TRI_microtime() <= endTime) {
res = ac.getValues(where, true);
if (res.successful() && res.parse(where+"/", false)) {
cout << "Seeing " << res._values.size() << "shards." << endl;
@ -858,13 +873,14 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
errorMsg = "Error in creation of collection:" + tmpMsg;
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
}
return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg);
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
res = ac.watchValue("Current/Version", index, 5.0, false);
res = ac.watchValue("Current/Version", index, interval, false);
index = res._index;
}
return set_errormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
@ -881,24 +897,28 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
AgencyComm ac;
AgencyCommResult res;
const double realTimeout = getTimeout(timeout);
const double endTime = TRI_microtime() + realTimeout;
const double interval = getPollInterval();
{
AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, errorMsg);
}
if (! ac.exists("Plan/Databases/" + databaseName)) {
return set_errormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
}
res = ac.removeValues("Plan/Collections/"+databaseName+"/"+collectionID,
false);
if (!res.successful()) {
if (res._statusCode == rest::HttpResponse::NOT_FOUND) {
return set_errormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_PLAN,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_PLAN,
errorMsg);
}
}
@ -906,15 +926,13 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
// Now wait for it to appear and be complete:
res = ac.getValues("Current/Version", false);
if (!res.successful()) {
return set_errormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
errorMsg);
}
uint64_t index = res._index;
double endtime = TRI_microtime();
endtime += timeout == 0.0 ? 1e50 : timeout;
string where = "Current/Collections/" + databaseName + "/" + collectionID;
while (TRI_microtime() <= endtime) {
while (TRI_microtime() <= endTime) {
res = ac.getValues(where, true);
if (res.successful() && res.parse(where+"/", false)) {
if (res._values.size() == 0) {
@ -923,18 +941,19 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
res = ac.removeValues("Current/Collections/"+databaseName+"/"+
collectionID, true);
if (res.successful()) {
return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg);
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
return set_errormsg(
return setErrormsg(
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT, errorMsg);
}
return set_errormsg(TRI_ERROR_NO_ERROR, errorMsg);
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
}
}
res = ac.watchValue("Current/Version", index, 5.0, false);
res = ac.watchValue("Current/Version", index, interval, false);
index = res._index;
}
return set_errormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -448,6 +448,35 @@ namespace triagens {
ServerID getResponsibleServer (ShardID const&);
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief get an operation timeout
////////////////////////////////////////////////////////////////////////////////
double getTimeout (double timeout) const {
if (timeout == 0.0) {
return 24.0 * 3600.0;
}
return timeout;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the poll interval
////////////////////////////////////////////////////////////////////////////////
double getPollInterval () const {
return 5.0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the timeout for reloading the server list
////////////////////////////////////////////////////////////////////////////////
double getReloadServerListTimeout () const {
return 60.0;
}
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
@ -495,6 +524,18 @@ namespace triagens {
static const uint64_t MinIdsPerBatch = 100;
////////////////////////////////////////////////////////////////////////////////
/// @brief default wait timeout
////////////////////////////////////////////////////////////////////////////////
static const double operationTimeout;
////////////////////////////////////////////////////////////////////////////////
/// @brief reload timeout
////////////////////////////////////////////////////////////////////////////////
static const double reloadServerListTimeout;
};
} // end namespace arango