mirror of https://gitee.com/bigwinds/arangodb
Fix index creation in cluster. (#7440)
* Fix index creation in cluster. Simplify and correct the error handling logic in ensureIndexCoordinator.
* After index creation, wait until the index appears: we wait until the Supervision has removed the isBuilding flag and the coordinator has reloaded the Plan.
* More index handling fixes.
* Directly remove isBuilding in ensureIndexCoordinator (again).
* Fix catch tests by holding the mutex for a shorter time.
* Better mutex handling in ClusterInfo.
This commit is contained in:
parent
683d45bd68
commit
ae29e5d2ba
|
@ -1860,12 +1860,11 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
}
|
||||
|
||||
{
|
||||
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
||||
|
||||
while (true) {
|
||||
|
||||
int tmpRes = dbServerResult->load(std::memory_order_acquire);
|
||||
if (tmpRes >= 0) {
|
||||
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
||||
cbGuard.fire(); // unregister cb before accessing errMsg
|
||||
errorMsg = *errMsg;
|
||||
loadCurrent();
|
||||
|
@ -1909,7 +1908,10 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
return setErrormsg(TRI_ERROR_SHUTTING_DOWN, errorMsg);
|
||||
}
|
||||
|
||||
agencyCallback->executeByCallbackOrTimeout(interval);
|
||||
{
|
||||
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
||||
agencyCallback->executeByCallbackOrTimeout(interval);
|
||||
}
|
||||
if (!application_features::ApplicationServer::isRetryOK()) {
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||
}
|
||||
|
@ -2458,10 +2460,6 @@ int ClusterInfo::ensureIndexCoordinator(
|
|||
VPackSlice const& slice, bool create, VPackBuilder& resultBuilder,
|
||||
std::string& errorMsg, double timeout) {
|
||||
|
||||
// The timepoint at which we timeout
|
||||
using namespace std::chrono;
|
||||
auto const endTime = steady_clock::now() + duration<double>(timeout);
|
||||
|
||||
// check index id
|
||||
uint64_t iid = 0;
|
||||
|
||||
|
@ -2482,9 +2480,12 @@ int ClusterInfo::ensureIndexCoordinator(
|
|||
// Keep trying for 2 minutes, if it's preconditions, which are stopping us
|
||||
do {
|
||||
resultBuilder.clear();
|
||||
errorCode = ensureIndexCoordinatorWithoutRollback(
|
||||
errorCode = ensureIndexCoordinatorInner(
|
||||
databaseName, collectionID, idString, slice, create, resultBuilder,
|
||||
errorMsg, timeout);
|
||||
// Note that this function sets the errorMsg unless it is precondition
|
||||
// failed, in which case we retry, if this times out, we need to set
|
||||
// it ourselves, otherwise all is done!
|
||||
|
||||
if (errorCode == TRI_ERROR_HTTP_PRECONDITION_FAILED) {
|
||||
auto diff = std::chrono::steady_clock::now() - start;
|
||||
|
@ -2493,144 +2494,56 @@ int ClusterInfo::ensureIndexCoordinator(
|
|||
std::this_thread::sleep_for(std::chrono::steady_clock::duration(wt));
|
||||
continue;
|
||||
}
|
||||
errorCode
|
||||
= setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN,
|
||||
errorMsg);
|
||||
}
|
||||
break;
|
||||
} while(true);
|
||||
} catch (basics::Exception const& ex) {
|
||||
errorCode = ex.code();
|
||||
setErrormsg(errorCode, errorMsg);
|
||||
errorMsg += std::string(", exception: ") + ex.what();
|
||||
} catch (...) {
|
||||
errorCode = TRI_ERROR_INTERNAL;
|
||||
setErrormsg(errorCode, errorMsg);
|
||||
}
|
||||
|
||||
if (application_features::ApplicationServer::isStopping()) {
|
||||
return errorCode;
|
||||
// We get here in any case eventually, regardless of whether we have
|
||||
// - succeeded with lookup or index creation
|
||||
// - failed because of a timeout and rollback
|
||||
// - some other error
|
||||
// There is nothing more to do here.
|
||||
|
||||
if (!application_features::ApplicationServer::isStopping()) {
|
||||
loadPlan();
|
||||
}
|
||||
loadPlan();
|
||||
|
||||
std::string const indexPath =
|
||||
"Plan/Collections/" + databaseName + "/" + collectionID + "/indexes";
|
||||
|
||||
auto const resultSlice = resultBuilder.slice();
|
||||
VPackBuilder oldPlanIndex;
|
||||
{ VPackObjectBuilder b(&oldPlanIndex);
|
||||
for (auto const& entry : VPackObjectIterator(resultSlice)) {
|
||||
auto const key = entry.key.copyString();
|
||||
if (key != "isNewlyCreated") {
|
||||
oldPlanIndex.add(entry.key.copyString(), entry.value);
|
||||
}}}
|
||||
|
||||
std::uint64_t sleepFor = 50;
|
||||
if (errorCode == TRI_ERROR_NO_ERROR) { // Success so far
|
||||
|
||||
// Smart collection, we're done.
|
||||
auto const resultSlice = resultBuilder.slice();
|
||||
if (resultSlice.hasKey("isSmart") &&
|
||||
resultSlice.get("isSmart").isBoolean() &&
|
||||
resultSlice.get("isSmart").getBoolean()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
// Arangosearch we're done
|
||||
if (slice.get(
|
||||
arangodb::StaticStrings::IndexType).isEqualString("arangosearch")) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
std::string idStr;
|
||||
if (resultSlice.hasKey("id")) {
|
||||
idStr = resultSlice.get("id").copyString();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "Missing 'id' field in index creation result";
|
||||
return TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
// isBuilding key to be removed, when index appears in all shards in Current
|
||||
if (resultSlice.hasKey("isBuilding")) {
|
||||
|
||||
VPackBuilder newPlanIndex;
|
||||
{ VPackObjectBuilder o(&newPlanIndex);
|
||||
for (auto const& entry : VPackObjectIterator(resultSlice)) {
|
||||
auto const key = entry.key.copyString();
|
||||
if (key != "isBuilding" && key != "isNewlyCreated") {
|
||||
newPlanIndex.add(entry.key.copyString(), entry.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove isBuilding from Plan
|
||||
AgencyWriteTransaction trx({
|
||||
AgencyOperation(
|
||||
indexPath, AgencyValueOperationType::REPLACE,
|
||||
newPlanIndex.slice(), oldPlanIndex.slice()),
|
||||
AgencyOperation(
|
||||
"Plan/Version", AgencySimpleOperationType::INCREMENT_OP)});
|
||||
sleepFor = 50;
|
||||
while (true) {
|
||||
if (_agency.sendTransactionWithFailover(trx, 0.0).successful()) {
|
||||
loadPlan();
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
if (steady_clock::now() > endTime) {
|
||||
errorMsg = "Timed out while trying to update Plan record for the index.";
|
||||
// It's fine to leave here in hopes that
|
||||
// the supervision will handle this.
|
||||
return TRI_ERROR_CLUSTER_TIMEOUT;
|
||||
}
|
||||
if (sleepFor <= 2500) {
|
||||
sleepFor*=2;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds());
|
||||
}
|
||||
|
||||
} else {
|
||||
// That's odd isBuilding key has gone already. Supervision was quicker?
|
||||
loadPlan();
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// At this time the index creation has failed and we want to roll back
|
||||
// the plan entry
|
||||
AgencyWriteTransaction trx(
|
||||
std::vector<AgencyOperation>
|
||||
{ AgencyOperation(
|
||||
indexPath, AgencyValueOperationType::ERASE, oldPlanIndex.slice()),
|
||||
AgencyOperation(
|
||||
"Plan/Version", AgencySimpleOperationType::INCREMENT_OP)});
|
||||
|
||||
setErrormsg(errorCode, errorMsg);
|
||||
|
||||
sleepFor = 50;
|
||||
while (true) {
|
||||
AgencyCommResult update =
|
||||
_agency.sendTransactionWithFailover(trx, 0.0);
|
||||
if (update.successful()) {
|
||||
loadPlan();
|
||||
return errorCode;
|
||||
}
|
||||
if (steady_clock::now() > endTime) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "Couldn't roll back index creation of " << idString << ". Database: "
|
||||
<< databaseName << ", Collection " << collectionID;
|
||||
errorMsg = "Timed out while trying to report index creation failure";
|
||||
return TRI_ERROR_CLUSTER_TIMEOUT;
|
||||
}
|
||||
if (sleepFor <= 2500) {
|
||||
sleepFor*=2;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(sleepFor));
|
||||
}
|
||||
|
||||
return errorCode;
|
||||
}
|
||||
|
||||
int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
|
||||
// The following function does the actual work of index creation: Create
|
||||
// in Plan, watch Current until all dbservers for all shards have done their
|
||||
// bit. If this goes wrong with a timeout, the creation operation is rolled
|
||||
// back. If the `create` flag is false, this is actually a lookup operation.
|
||||
// In any case, no rollback has to happen in the calling function
|
||||
// ClusterInfo::ensureIndexCoordinator. Note that this method here
|
||||
// sets the `isBuilding` attribute to `true`, which means that the
|
||||
// index is not yet used by queries. There is code in the
|
||||
// Agency Supervision which deletes this flag once everything has been
|
||||
// built successfully. This is a more robust and self-repairing solution
|
||||
// than removing the `isBuilding` flag here, since it survives a
|
||||
// coordinator crash and failover operations.
|
||||
// Finally note that the retry loop for the case of a failed precondition
|
||||
// is outside this function here in `ensureIndexCoordinator`.
|
||||
|
||||
int ClusterInfo::ensureIndexCoordinatorInner(
|
||||
std::string const& databaseName, std::string const& collectionID,
|
||||
std::string const& idString, VPackSlice const& slice, bool create,
|
||||
VPackBuilder& resultBuilder, std::string& errorMsg, double timeout) {
|
||||
AgencyComm ac;
|
||||
|
||||
using namespace std::chrono;
|
||||
|
||||
double const realTimeout = getTimeout(timeout);
|
||||
double const endTime = TRI_microtime() + realTimeout;
|
||||
double const interval = getPollInterval();
|
||||
|
@ -2643,7 +2556,7 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
|
|||
|
||||
AgencyCommResult previous = ac.getValues(planCollKey);
|
||||
if (!previous.successful()) {
|
||||
return TRI_ERROR_CLUSTER_READING_PLAN_AGENCY;
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_READING_PLAN_AGENCY, errorMsg);
|
||||
}
|
||||
|
||||
velocypack::Slice collection = previous.slice()[0].get(
|
||||
|
@ -2712,9 +2625,11 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
|
|||
|
||||
// check for errors
|
||||
if (hasError(v)) {
|
||||
std::string errorMsg =
|
||||
extractErrorMessage(shard.key.copyString(), v);
|
||||
errorMsg = "Error during index creation: " + errorMsg;
|
||||
// Note that this closure runs with the mutex in the condition
|
||||
// variable of the agency callback, which protects writing
|
||||
// to *errMsg:
|
||||
*errMsg = extractErrorMessage(shard.key.copyString(), v);
|
||||
*errMsg = "Error during index creation: " + *errMsg;
|
||||
// Returns the specific error number if set, or the general
|
||||
// error otherwise
|
||||
int errNum = arangodb::basics::VelocyPackHelper::readNumericValue<int>(
|
||||
|
@ -2731,7 +2646,6 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
|
|||
|
||||
if (found == (size_t)numberOfShards) {
|
||||
dbServerResult->store(setErrormsg(TRI_ERROR_NO_ERROR, *errMsg), std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -2783,6 +2697,7 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
|
|||
if (!result.successful()) {
|
||||
if (result.httpCode() ==
|
||||
(int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
|
||||
// Retry loop is outside!
|
||||
return TRI_ERROR_HTTP_PRECONDITION_FAILED;
|
||||
} else {
|
||||
errorMsg += " Failed to execute ";
|
||||
|
@ -2794,6 +2709,10 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
|
|||
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_INDEX_IN_PLAN;
|
||||
}
|
||||
|
||||
// From here on we want to roll back the index creation if we run into
|
||||
// the timeout. If this coordinator crashes, the worst that can happen
|
||||
// is that the index stays in some state. In most cases, it will converge
|
||||
// towards the planned state.
|
||||
loadPlan();
|
||||
|
||||
if (numberOfShards == 0) { // smart "dummy" collection has no shards
|
||||
|
@ -2804,29 +2723,119 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
|
|||
resultBuilder.add("isSmart", VPackValue(true));
|
||||
}
|
||||
loadCurrent();
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
|
||||
}
|
||||
|
||||
{
|
||||
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
||||
|
||||
while (!application_features::ApplicationServer::isStopping()) {
|
||||
int tmpRes = dbServerResult->load(std::memory_order_acquire);
|
||||
if (tmpRes >= 0) {
|
||||
cbGuard.fire(); // unregister cb before accessing errMsg
|
||||
loadCurrent();
|
||||
{
|
||||
// Copy over all elements in slice.
|
||||
VPackObjectBuilder b(&resultBuilder);
|
||||
resultBuilder.add(VPackObjectIterator(newIndexBuilder.slice()));
|
||||
resultBuilder.add("isNewlyCreated", VPackValue(true));
|
||||
if (tmpRes == 0) {
|
||||
// Finally, in case all is good, remove the `isBuilding` flag
|
||||
// check that the index has appeared. Note that we have to have
|
||||
// a precondition since the collection could have been deleted
|
||||
// in the meantime:
|
||||
VPackBuilder finishedPlanIndex;
|
||||
{ VPackObjectBuilder o(&finishedPlanIndex);
|
||||
for (auto const& entry : VPackObjectIterator(newIndexBuilder.slice())) {
|
||||
auto const key = entry.key.copyString();
|
||||
if (key != "isBuilding" && key != "isNewlyCreated") {
|
||||
finishedPlanIndex.add(entry.key.copyString(), entry.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
errorMsg = *errMsg;
|
||||
return tmpRes;
|
||||
AgencyWriteTransaction trx(
|
||||
{AgencyOperation(
|
||||
planIndexesKey, AgencyValueOperationType::REPLACE,
|
||||
finishedPlanIndex.slice(), newIndexBuilder.slice()),
|
||||
AgencyOperation(
|
||||
"Plan/Version", AgencySimpleOperationType::INCREMENT_OP)},
|
||||
AgencyPrecondition(planIndexesKey, AgencyPrecondition::Type::EMPTY,
|
||||
false));
|
||||
TRI_idx_iid_t indexId = arangodb::basics::StringUtils::uint64(
|
||||
newIndexBuilder.slice().get("id").copyString());
|
||||
if (!_agency.sendTransactionWithFailover(trx, 0.0).successful()) {
|
||||
// We just log the problem and move on, the Supervision will repair
|
||||
// things in due course:
|
||||
LOG_TOPIC(INFO, Logger::CLUSTER)
|
||||
<< "Could not remove isBuilding flag in new index " << indexId
|
||||
<< ", this will be repaired automatically.";
|
||||
}
|
||||
loadPlan();
|
||||
// Finally check if it has appeared, if not, we take another turn,
|
||||
// which does not do any harm:
|
||||
auto coll = getCollection(databaseName, collectionID);
|
||||
auto indexes = coll->getIndexes();
|
||||
if (std::any_of(indexes.begin(), indexes.end(),
|
||||
[indexId](std::shared_ptr<arangodb::Index>& index) -> bool {
|
||||
return indexId == index->id();
|
||||
})) {
|
||||
cbGuard.fire(); // unregister cb before accessing errMsg
|
||||
loadCurrent();
|
||||
{
|
||||
// Copy over all elements in slice.
|
||||
VPackObjectBuilder b(&resultBuilder);
|
||||
resultBuilder.add(VPackObjectIterator(finishedPlanIndex.slice()));
|
||||
resultBuilder.add("isNewlyCreated", VPackValue(true));
|
||||
}
|
||||
// The mutex in the condition variable protects the access to
|
||||
// *errMsg:
|
||||
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
||||
errorMsg = *errMsg;
|
||||
return tmpRes;
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::CLUSTER)
|
||||
<< "Index " << indexId
|
||||
<< " is complete, waiting for Supervision to remove isBuilding flag.";
|
||||
}
|
||||
|
||||
if (TRI_microtime() > endTime) {
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
|
||||
if (tmpRes > 0 || TRI_microtime() > endTime) {
|
||||
// At this time the index creation has failed and we want to
|
||||
// roll back the plan entry, provided the collection still exists:
|
||||
AgencyWriteTransaction trx(
|
||||
std::vector<AgencyOperation>(
|
||||
{ AgencyOperation(
|
||||
planIndexesKey, AgencyValueOperationType::ERASE,
|
||||
newIndexBuilder.slice()),
|
||||
AgencyOperation(
|
||||
"Plan/Version", AgencySimpleOperationType::INCREMENT_OP)}),
|
||||
AgencyPrecondition(planCollKey,
|
||||
AgencyPrecondition::Type::EMPTY, false) );
|
||||
|
||||
int sleepFor = 50;
|
||||
auto rollbackEndTime = steady_clock::now() + std::chrono::seconds(10);
|
||||
while (true) {
|
||||
AgencyCommResult update =
|
||||
_agency.sendTransactionWithFailover(trx, 0.0);
|
||||
if (update.successful()) {
|
||||
loadPlan();
|
||||
if (tmpRes < 0) { // timeout
|
||||
errorMsg = "Index could not be created within timeout, giving up and rolling back index creation.";
|
||||
return TRI_ERROR_CLUSTER_TIMEOUT;
|
||||
}
|
||||
return tmpRes;
|
||||
}
|
||||
if (update._statusCode == TRI_ERROR_HTTP_PRECONDITION_FAILED) {
|
||||
// Collection was removed, let's break here and report outside
|
||||
break;
|
||||
}
|
||||
if (steady_clock::now() > rollbackEndTime) {
|
||||
LOG_TOPIC(ERR, Logger::CLUSTER)
|
||||
<< "Couldn't roll back index creation of " << idString
|
||||
<< ". Database: " << databaseName << ", Collection "
|
||||
<< collectionID;
|
||||
if (tmpRes < 0) { // timeout
|
||||
errorMsg = "Timed out while trying to roll back index creation failure";
|
||||
return TRI_ERROR_CLUSTER_TIMEOUT;
|
||||
}
|
||||
return tmpRes;
|
||||
}
|
||||
if (sleepFor <= 2500) {
|
||||
sleepFor*=2;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(sleepFor));
|
||||
}
|
||||
// We only get here if the collection was dropped just in the moment
|
||||
// when we wanted to roll back the index creation.
|
||||
}
|
||||
|
||||
auto c = getCollection(databaseName, collectionID);
|
||||
|
@ -2835,7 +2844,10 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
|
|||
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
|
||||
}
|
||||
|
||||
agencyCallback->executeByCallbackOrTimeout(interval);
|
||||
{
|
||||
CONDITION_LOCKER(locker, agencyCallback->_cv);
|
||||
agencyCallback->executeByCallbackOrTimeout(interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3728,4 +3740,4 @@ arangodb::Result ClusterInfo::getShardServers(
|
|||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- END-OF-FILE
|
||||
// -----------------------------------------------------------------------------
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
|
@ -659,7 +659,7 @@ class ClusterInfo {
|
|||
/// @brief ensure an index in coordinator.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int ensureIndexCoordinatorWithoutRollback(
|
||||
int ensureIndexCoordinatorInner(
|
||||
std::string const& databaseName, std::string const& collectionID,
|
||||
std::string const& idSlice, arangodb::velocypack::Slice const& slice,
|
||||
bool create, arangodb::velocypack::Builder& resultBuilder,
|
||||
|
|
Loading…
Reference in New Issue