1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into devel

This commit is contained in:
Michael Hackstein 2016-09-20 16:18:49 +02:00
commit 049471d91f
3 changed files with 48 additions and 69 deletions

View File

@ -65,6 +65,8 @@ void AgencyCallback::refetchAndUpdate(bool needToAcquireMutex) {
AgencyCommResult result = _agency.getValues(key);
if (!result.successful()) {
LOG(ERR) << "Callback getValues to agency was not successful: "
<< result.errorCode() << " " << result.errorMessage();
return;
}
@ -124,18 +126,11 @@ bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
// One needs to acquire the mutex of the condition variable
// before entering this function!
auto compareBuilder = std::make_shared<VPackBuilder>();
if (_lastData) {
compareBuilder = _lastData;
}
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0))) {
if (!_lastData || !_lastData->slice().equals(compareBuilder->slice())) {
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Waiting done and nothing happended. Refetching to be sure";
// mop: watches have not triggered during our sleep...recheck to be sure
refetchAndUpdate(false);
}
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Waiting done and nothing happended. Refetching to be sure";
// mop: watches have not triggered during our sleep...recheck to be sure
refetchAndUpdate(false);
}
}

View File

@ -839,7 +839,6 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
return true;
}
loadCurrent(); // update our cache
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
}
return true;
@ -897,6 +896,7 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
}
if (*dbServerResult >= 0) {
loadCurrent(); // update our cache
return *dbServerResult;
}
@ -1062,7 +1062,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
}
}
}
loadCurrent();
if (tmpHaveError) {
*errMsg = "Error in creation of collection:" + tmpMsg;
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
@ -1120,8 +1119,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
while (true) {
errorMsg = *errMsg;
if (*dbServerResult >= 0) {
loadCurrent();
return *dbServerResult;
}
@ -1151,37 +1151,21 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
double const realTimeout = getTimeout(timeout);
double const endTime = TRI_microtime() + realTimeout;
double const interval = getPollInterval();
auto dbServerResult = std::make_shared<int>(-1);
auto errMsg = std::make_shared<std::string>();
std::function<bool(VPackSlice const& result)> dbServerChanged =
[=](VPackSlice const& result) {
AgencyComm ac;
if (result.isObject() && result.length() == 0) {
// ...remove the entire directory for the collection
AgencyCommResult res;
res = ac.removeValues(
"Current/Collections/" + databaseName + "/" + collectionID, true);
if (res.successful()) {
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
return true;
}
*dbServerResult = setErrormsg(
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT,
*errMsg);
return true;
loadCurrent();
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
return true;
}
return true;
};
// monitor the entry for the collection
std::string const where =
"Current/Collections/" + databaseName + "/" + collectionID;
// ATTENTION: The following callback calls the above closure in a
// different thread. Nevertheless, the closure accesses some of our
// local variables. Therefore we have to protect all accesses to them
@ -1203,25 +1187,29 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
AgencyWriteTransaction trans(
{delPlanCollection, incrementVersion},precondition);
res = ac.sendTransactionWithFailover(trans);
// Update our own cache:
loadPlan();
{
CONDITION_LOCKER(locker, agencyCallback->_cv);
while (true) {
errorMsg = *errMsg;
if (*dbServerResult >= 0) {
// ...remove the entire directory for the collection
ac.removeValues(
"Current/Collections/" + databaseName + "/" + collectionID, true);
loadCurrent();
return *dbServerResult;
}
if (TRI_microtime() > endTime) {
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
agencyCallback->executeByCallbackOrTimeout(interval);
}
}
@ -1618,15 +1606,13 @@ int ClusterInfo::ensureIndexCoordinator(
}
resBuilder->add("isNewlyCreated", VPackValue(true));
}
loadCurrent();
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
return true;
}
}
return true;
};
// ATTENTION: The following callback calls the above closure in a
// different thread. Nevertheless, the closure accesses some of our
@ -1644,19 +1630,19 @@ int ClusterInfo::ensureIndexCoordinator(
"Plan/Version", AgencySimpleOperationType::INCREMENT_OP);
AgencyPrecondition oldValue(key, AgencyPrecondition::VALUE, collection);
AgencyWriteTransaction trx ({newValue, incrementVersion}, oldValue);
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
if (!result.successful()) {
resultBuilder = *resBuilder;
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
errorMsg);
}
loadPlan();
TRI_ASSERT(*numberOfShards > 0);
{
CONDITION_LOCKER(locker, agencyCallback->_cv);
@ -1664,15 +1650,16 @@ int ClusterInfo::ensureIndexCoordinator(
errorMsg = *errMsg;
resultBuilder = *resBuilder;
if (*dbServerResult >= 0) {
loadCurrent();
return *dbServerResult;
}
if (TRI_microtime() > endTime) {
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
agencyCallback->executeByCallbackOrTimeout(interval);
}
}
@ -1741,7 +1728,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
}
VPackObjectIterator shards(current);
if (shards.size() == (size_t)localNumberOfShards) {
bool found = false;
for (auto const& shard : shards) {
@ -1767,13 +1754,12 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
}
if (!found) {
loadCurrent();
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
}
}
return true;
};
// ATTENTION: The following callback calls the above closure in a
// different thread. Nevertheless, the closure accesses some of our
// local variables. Therefore we have to protect all accesses to them
@ -1789,30 +1775,30 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
// and the write lock we acquire below something has changed. Therefore
// we first get the previous value and then do a compare and swap operation.
VPackBuilder tmp;
VPackSlice indexes;
{
std::shared_ptr<LogicalCollection> c =
getCollection(databaseName, collectionID);
READ_LOCKER(readLocker, _planProt.lock);
if (c == nullptr) {
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
}
c->getIndexesVPack(tmp, false);
indexes = tmp.slice();
if (!indexes.isArray()) {
// no indexes present, so we can't delete our index
return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg);
}
MUTEX_LOCKER(guard, *numberOfShardsMutex);
*numberOfShards = c->numberOfShards();
}
bool found = false;
VPackBuilder newIndexes;
{
@ -1822,14 +1808,14 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
for (auto const& indexSlice: VPackArrayIterator(indexes)) {
VPackSlice id = indexSlice.get("id");
VPackSlice type = indexSlice.get("type");
if (!id.isString() || !type.isString()) {
continue;
}
if (idString == id.copyString()) {
// found our index, ignore it when copying
found = true;
std::string const typeString = type.copyString();
if (typeString == "primary" || typeString == "edge") {
return setErrormsg(TRI_ERROR_FORBIDDEN, errorMsg);
@ -1842,7 +1828,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
if (!found) {
return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg);
}
VPackBuilder newCollectionBuilder;
{
VPackObjectBuilder newCollectionObjectBuilder(&newCollectionBuilder);
@ -1862,15 +1848,15 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
AgencyPrecondition prec(key, AgencyPrecondition::VALUE, previous);
AgencyWriteTransaction trx ({newVal, incrementVersion}, prec);
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
if (!result.successful()) {
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
errorMsg);
}
// load our own cache:
loadPlan();
{
MUTEX_LOCKER(guard, *numberOfShardsMutex);
TRI_ASSERT(*numberOfShards > 0);
@ -1882,8 +1868,9 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
while (true) {
errorMsg = *errMsg;
if (*dbServerResult >= 0) {
loadCurrent();
return *dbServerResult;
}
@ -1919,7 +1906,7 @@ void ClusterInfo::loadServers() {
AgencyCommResult result = _agency.getValues(prefixServers);
if (result.successful()) {
velocypack::Slice serversRegistered =
result.slice()[0].get(std::vector<std::string>(
{AgencyComm::prefix(), "Current", "ServersRegistered"}));

View File

@ -1,9 +1,6 @@
#!/bin/bash
if [ -z "$XTERM" ] ; then
XTERM=x-terminal-emulator
fi
if [ -z "$XTERMOPTIONS" ] ; then
XTERMOPTIONS="--geometry=80x43"
XTERM=xterm
fi
if [ ! -d arangod ] || [ ! -d arangosh ] || [ ! -d UnitTests ] ; then