mirror of https://gitee.com/bigwinds/arangodb
Fix various things in agency callback handling in coordinator.
This commit is contained in:
parent
e52768c497
commit
fcf9d32ba4
|
@ -124,18 +124,11 @@ bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
|
||||||
void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
|
void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
|
||||||
// One needs to acquire the mutex of the condition variable
|
// One needs to acquire the mutex of the condition variable
|
||||||
// before entering this function!
|
// before entering this function!
|
||||||
auto compareBuilder = std::make_shared<VPackBuilder>();
|
|
||||||
if (_lastData) {
|
|
||||||
compareBuilder = _lastData;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0))) {
|
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0))) {
|
||||||
if (!_lastData || !_lastData->slice().equals(compareBuilder->slice())) {
|
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
<< "Waiting done and nothing happended. Refetching to be sure";
|
||||||
<< "Waiting done and nothing happended. Refetching to be sure";
|
// mop: watches have not triggered during our sleep...recheck to be sure
|
||||||
// mop: watches have not triggered during our sleep...recheck to be sure
|
refetchAndUpdate(false);
|
||||||
refetchAndUpdate(false);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -839,7 +839,6 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
|
||||||
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
|
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
loadCurrent(); // update our cache
|
|
||||||
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -897,6 +896,7 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*dbServerResult >= 0) {
|
if (*dbServerResult >= 0) {
|
||||||
|
loadCurrent(); // update our cache
|
||||||
return *dbServerResult;
|
return *dbServerResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1062,7 +1062,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
loadCurrent();
|
|
||||||
if (tmpHaveError) {
|
if (tmpHaveError) {
|
||||||
*errMsg = "Error in creation of collection:" + tmpMsg;
|
*errMsg = "Error in creation of collection:" + tmpMsg;
|
||||||
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
||||||
|
@ -1120,8 +1119,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
errorMsg = *errMsg;
|
errorMsg = *errMsg;
|
||||||
|
|
||||||
if (*dbServerResult >= 0) {
|
if (*dbServerResult >= 0) {
|
||||||
|
loadCurrent();
|
||||||
return *dbServerResult;
|
return *dbServerResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1151,37 +1151,21 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
|
||||||
double const realTimeout = getTimeout(timeout);
|
double const realTimeout = getTimeout(timeout);
|
||||||
double const endTime = TRI_microtime() + realTimeout;
|
double const endTime = TRI_microtime() + realTimeout;
|
||||||
double const interval = getPollInterval();
|
double const interval = getPollInterval();
|
||||||
|
|
||||||
auto dbServerResult = std::make_shared<int>(-1);
|
auto dbServerResult = std::make_shared<int>(-1);
|
||||||
auto errMsg = std::make_shared<std::string>();
|
auto errMsg = std::make_shared<std::string>();
|
||||||
std::function<bool(VPackSlice const& result)> dbServerChanged =
|
std::function<bool(VPackSlice const& result)> dbServerChanged =
|
||||||
[=](VPackSlice const& result) {
|
[=](VPackSlice const& result) {
|
||||||
AgencyComm ac;
|
|
||||||
if (result.isObject() && result.length() == 0) {
|
if (result.isObject() && result.length() == 0) {
|
||||||
// ...remove the entire directory for the collection
|
|
||||||
AgencyCommResult res;
|
|
||||||
res = ac.removeValues(
|
|
||||||
"Current/Collections/" + databaseName + "/" + collectionID, true);
|
|
||||||
if (res.successful()) {
|
|
||||||
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
*dbServerResult = setErrormsg(
|
|
||||||
TRI_ERROR_CLUSTER_COULD_NOT_REMOVE_COLLECTION_IN_CURRENT,
|
|
||||||
*errMsg);
|
|
||||||
return true;
|
|
||||||
|
|
||||||
loadCurrent();
|
|
||||||
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
// monitor the entry for the collection
|
// monitor the entry for the collection
|
||||||
std::string const where =
|
std::string const where =
|
||||||
"Current/Collections/" + databaseName + "/" + collectionID;
|
"Current/Collections/" + databaseName + "/" + collectionID;
|
||||||
|
|
||||||
// ATTENTION: The following callback calls the above closure in a
|
// ATTENTION: The following callback calls the above closure in a
|
||||||
// different thread. Nevertheless, the closure accesses some of our
|
// different thread. Nevertheless, the closure accesses some of our
|
||||||
// local variables. Therefore we have to protect all accesses to them
|
// local variables. Therefore we have to protect all accesses to them
|
||||||
|
@ -1203,25 +1187,29 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
|
||||||
AgencyWriteTransaction trans(
|
AgencyWriteTransaction trans(
|
||||||
{delPlanCollection, incrementVersion},precondition);
|
{delPlanCollection, incrementVersion},precondition);
|
||||||
res = ac.sendTransactionWithFailover(trans);
|
res = ac.sendTransactionWithFailover(trans);
|
||||||
|
|
||||||
// Update our own cache:
|
// Update our own cache:
|
||||||
loadPlan();
|
loadPlan();
|
||||||
|
|
||||||
{
|
{
|
||||||
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
errorMsg = *errMsg;
|
errorMsg = *errMsg;
|
||||||
|
|
||||||
if (*dbServerResult >= 0) {
|
if (*dbServerResult >= 0) {
|
||||||
|
// ...remove the entire directory for the collection
|
||||||
|
ac.removeValues(
|
||||||
|
"Current/Collections/" + databaseName + "/" + collectionID, true);
|
||||||
|
loadCurrent()
|
||||||
return *dbServerResult;
|
return *dbServerResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TRI_microtime() > endTime) {
|
if (TRI_microtime() > endTime) {
|
||||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
agencyCallback->executeByCallbackOrTimeout(interval);
|
agencyCallback->executeByCallbackOrTimeout(interval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1618,15 +1606,13 @@ int ClusterInfo::ensureIndexCoordinator(
|
||||||
}
|
}
|
||||||
resBuilder->add("isNewlyCreated", VPackValue(true));
|
resBuilder->add("isNewlyCreated", VPackValue(true));
|
||||||
}
|
}
|
||||||
loadCurrent();
|
|
||||||
|
|
||||||
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
// ATTENTION: The following callback calls the above closure in a
|
// ATTENTION: The following callback calls the above closure in a
|
||||||
// different thread. Nevertheless, the closure accesses some of our
|
// different thread. Nevertheless, the closure accesses some of our
|
||||||
|
@ -1644,19 +1630,19 @@ int ClusterInfo::ensureIndexCoordinator(
|
||||||
"Plan/Version", AgencySimpleOperationType::INCREMENT_OP);
|
"Plan/Version", AgencySimpleOperationType::INCREMENT_OP);
|
||||||
AgencyPrecondition oldValue(key, AgencyPrecondition::VALUE, collection);
|
AgencyPrecondition oldValue(key, AgencyPrecondition::VALUE, collection);
|
||||||
AgencyWriteTransaction trx ({newValue, incrementVersion}, oldValue);
|
AgencyWriteTransaction trx ({newValue, incrementVersion}, oldValue);
|
||||||
|
|
||||||
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
|
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
|
||||||
|
|
||||||
if (!result.successful()) {
|
if (!result.successful()) {
|
||||||
resultBuilder = *resBuilder;
|
resultBuilder = *resBuilder;
|
||||||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
|
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
|
||||||
errorMsg);
|
errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
loadPlan();
|
loadPlan();
|
||||||
|
|
||||||
TRI_ASSERT(*numberOfShards > 0);
|
TRI_ASSERT(*numberOfShards > 0);
|
||||||
|
|
||||||
{
|
{
|
||||||
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
||||||
|
|
||||||
|
@ -1664,15 +1650,16 @@ int ClusterInfo::ensureIndexCoordinator(
|
||||||
|
|
||||||
errorMsg = *errMsg;
|
errorMsg = *errMsg;
|
||||||
resultBuilder = *resBuilder;
|
resultBuilder = *resBuilder;
|
||||||
|
|
||||||
if (*dbServerResult >= 0) {
|
if (*dbServerResult >= 0) {
|
||||||
|
loadCurrent();
|
||||||
return *dbServerResult;
|
return *dbServerResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TRI_microtime() > endTime) {
|
if (TRI_microtime() > endTime) {
|
||||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
agencyCallback->executeByCallbackOrTimeout(interval);
|
agencyCallback->executeByCallbackOrTimeout(interval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1741,7 +1728,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
||||||
}
|
}
|
||||||
|
|
||||||
VPackObjectIterator shards(current);
|
VPackObjectIterator shards(current);
|
||||||
|
|
||||||
if (shards.size() == (size_t)localNumberOfShards) {
|
if (shards.size() == (size_t)localNumberOfShards) {
|
||||||
bool found = false;
|
bool found = false;
|
||||||
for (auto const& shard : shards) {
|
for (auto const& shard : shards) {
|
||||||
|
@ -1767,13 +1754,12 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found) {
|
if (!found) {
|
||||||
loadCurrent();
|
|
||||||
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
// ATTENTION: The following callback calls the above closure in a
|
// ATTENTION: The following callback calls the above closure in a
|
||||||
// different thread. Nevertheless, the closure accesses some of our
|
// different thread. Nevertheless, the closure accesses some of our
|
||||||
// local variables. Therefore we have to protect all accesses to them
|
// local variables. Therefore we have to protect all accesses to them
|
||||||
|
@ -1789,30 +1775,30 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
||||||
// and the write lock we acquire below something has changed. Therefore
|
// and the write lock we acquire below something has changed. Therefore
|
||||||
// we first get the previous value and then do a compare and swap operation.
|
// we first get the previous value and then do a compare and swap operation.
|
||||||
|
|
||||||
|
|
||||||
VPackBuilder tmp;
|
VPackBuilder tmp;
|
||||||
VPackSlice indexes;
|
VPackSlice indexes;
|
||||||
{
|
{
|
||||||
std::shared_ptr<LogicalCollection> c =
|
std::shared_ptr<LogicalCollection> c =
|
||||||
getCollection(databaseName, collectionID);
|
getCollection(databaseName, collectionID);
|
||||||
|
|
||||||
READ_LOCKER(readLocker, _planProt.lock);
|
READ_LOCKER(readLocker, _planProt.lock);
|
||||||
|
|
||||||
if (c == nullptr) {
|
if (c == nullptr) {
|
||||||
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
|
return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg);
|
||||||
}
|
}
|
||||||
c->getIndexesVPack(tmp, false);
|
c->getIndexesVPack(tmp, false);
|
||||||
indexes = tmp.slice();
|
indexes = tmp.slice();
|
||||||
|
|
||||||
if (!indexes.isArray()) {
|
if (!indexes.isArray()) {
|
||||||
// no indexes present, so we can't delete our index
|
// no indexes present, so we can't delete our index
|
||||||
return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg);
|
return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
MUTEX_LOCKER(guard, *numberOfShardsMutex);
|
MUTEX_LOCKER(guard, *numberOfShardsMutex);
|
||||||
*numberOfShards = c->numberOfShards();
|
*numberOfShards = c->numberOfShards();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool found = false;
|
bool found = false;
|
||||||
VPackBuilder newIndexes;
|
VPackBuilder newIndexes;
|
||||||
{
|
{
|
||||||
|
@ -1822,14 +1808,14 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
||||||
for (auto const& indexSlice: VPackArrayIterator(indexes)) {
|
for (auto const& indexSlice: VPackArrayIterator(indexes)) {
|
||||||
VPackSlice id = indexSlice.get("id");
|
VPackSlice id = indexSlice.get("id");
|
||||||
VPackSlice type = indexSlice.get("type");
|
VPackSlice type = indexSlice.get("type");
|
||||||
|
|
||||||
if (!id.isString() || !type.isString()) {
|
if (!id.isString() || !type.isString()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (idString == id.copyString()) {
|
if (idString == id.copyString()) {
|
||||||
// found our index, ignore it when copying
|
// found our index, ignore it when copying
|
||||||
found = true;
|
found = true;
|
||||||
|
|
||||||
std::string const typeString = type.copyString();
|
std::string const typeString = type.copyString();
|
||||||
if (typeString == "primary" || typeString == "edge") {
|
if (typeString == "primary" || typeString == "edge") {
|
||||||
return setErrormsg(TRI_ERROR_FORBIDDEN, errorMsg);
|
return setErrormsg(TRI_ERROR_FORBIDDEN, errorMsg);
|
||||||
|
@ -1842,7 +1828,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
||||||
if (!found) {
|
if (!found) {
|
||||||
return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg);
|
return setErrormsg(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
VPackBuilder newCollectionBuilder;
|
VPackBuilder newCollectionBuilder;
|
||||||
{
|
{
|
||||||
VPackObjectBuilder newCollectionObjectBuilder(&newCollectionBuilder);
|
VPackObjectBuilder newCollectionObjectBuilder(&newCollectionBuilder);
|
||||||
|
@ -1862,15 +1848,15 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
||||||
AgencyPrecondition prec(key, AgencyPrecondition::VALUE, previous);
|
AgencyPrecondition prec(key, AgencyPrecondition::VALUE, previous);
|
||||||
AgencyWriteTransaction trx ({newVal, incrementVersion}, prec);
|
AgencyWriteTransaction trx ({newVal, incrementVersion}, prec);
|
||||||
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
|
AgencyCommResult result = ac.sendTransactionWithFailover(trx, 0.0);
|
||||||
|
|
||||||
if (!result.successful()) {
|
if (!result.successful()) {
|
||||||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
|
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
|
||||||
errorMsg);
|
errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// load our own cache:
|
// load our own cache:
|
||||||
loadPlan();
|
loadPlan();
|
||||||
|
|
||||||
{
|
{
|
||||||
MUTEX_LOCKER(guard, *numberOfShardsMutex);
|
MUTEX_LOCKER(guard, *numberOfShardsMutex);
|
||||||
TRI_ASSERT(*numberOfShards > 0);
|
TRI_ASSERT(*numberOfShards > 0);
|
||||||
|
@ -1882,8 +1868,9 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
errorMsg = *errMsg;
|
errorMsg = *errMsg;
|
||||||
|
|
||||||
if (*dbServerResult >= 0) {
|
if (*dbServerResult >= 0) {
|
||||||
|
loadCurrent();
|
||||||
return *dbServerResult;
|
return *dbServerResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1919,7 +1906,7 @@ void ClusterInfo::loadServers() {
|
||||||
AgencyCommResult result = _agency.getValues(prefixServers);
|
AgencyCommResult result = _agency.getValues(prefixServers);
|
||||||
|
|
||||||
if (result.successful()) {
|
if (result.successful()) {
|
||||||
|
|
||||||
velocypack::Slice serversRegistered =
|
velocypack::Slice serversRegistered =
|
||||||
result.slice()[0].get(std::vector<std::string>(
|
result.slice()[0].get(std::vector<std::string>(
|
||||||
{AgencyComm::prefix(), "Current", "ServersRegistered"}));
|
{AgencyComm::prefix(), "Current", "ServersRegistered"}));
|
||||||
|
|
Loading…
Reference in New Issue