diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 3ca8d6d764..d1b578c696 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -166,6 +166,8 @@ void AgencyOperation::toVelocyPack(VPackBuilder& builder) const { if (_opType.value == AgencyValueOperationType::OBSERVE || _opType.value == AgencyValueOperationType::UNOBSERVE) { builder.add("url", _value); + } else if (_opType.value == AgencyValueOperationType::ERASE) { + builder.add("val", _value); } else { builder.add("new", _value); } diff --git a/arangod/Agency/AgencyComm.h b/arangod/Agency/AgencyComm.h index bc27d7e90f..ca756b9b8f 100644 --- a/arangod/Agency/AgencyComm.h +++ b/arangod/Agency/AgencyComm.h @@ -84,7 +84,7 @@ enum class AgencyReadOperationType { READ }; // --SECTION-- AgencyValueOperationType // ----------------------------------------------------------------------------- -enum class AgencyValueOperationType { SET, OBSERVE, UNOBSERVE, PUSH, PREPEND }; +enum class AgencyValueOperationType { ERASE, SET, OBSERVE, UNOBSERVE, PUSH, PREPEND }; // ----------------------------------------------------------------------------- // --SECTION-- AgencySimpleOperationType @@ -130,6 +130,8 @@ class AgencyOperationType { return "push"; case AgencyValueOperationType::PREPEND: return "prepend"; + case AgencyValueOperationType::ERASE: + return "erase"; default: return "unknown_operation_type"; } diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index f2165bf522..0e2d446749 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1584,28 +1584,98 @@ int ClusterInfo::ensureIndexCoordinator( VPackSlice const& slice, bool create, bool (*compare)(VPackSlice const&, VPackSlice const&), VPackBuilder& resultBuilder, std::string& errorMsg, double timeout) { - AgencyComm ac; - - double const realTimeout = getTimeout(timeout); - double const endTime = TRI_microtime() + realTimeout; - double const interval = getPollInterval(); - - std::string where = - "Current/Collections/" + databaseName + "/" + collectionID; - + // check index id uint64_t iid = 0; - VPackSlice const idxSlice = slice.get("id"); - if (idxSlice.isString()) { + VPackSlice const idSlice = slice.get("id"); + if (idSlice.isString()) { // use predefined index id - iid = arangodb::basics::StringUtils::uint64(idxSlice.copyString()); + iid = arangodb::basics::StringUtils::uint64(idSlice.copyString()); } if (iid == 0) { // no id set, create a new one! iid = uniqid(); } + std::string const idString = arangodb::basics::StringUtils::itoa(iid); + + int errorCode = ensureIndexCoordinatorWithoutRollback( + databaseName, collectionID, idString, slice, create, compare, resultBuilder, errorMsg, timeout); + + if (errorCode == TRI_ERROR_NO_ERROR) { + return errorCode; + } + + std::shared_ptr planValue; + std::shared_ptr oldPlanIndexes; + std::shared_ptr c; + + size_t tries = 0; + do { + loadPlan(); + // find index in plan + planValue = nullptr; + oldPlanIndexes.reset(new VPackBuilder()); + + c = getCollection(databaseName, collectionID); + c->getIndexesVPack(*(oldPlanIndexes.get()), false); + VPackSlice const planIndexes = oldPlanIndexes->slice(); + + if (planIndexes.isArray()) { + for (auto const& index : VPackArrayIterator(planIndexes)) { + auto idPlanSlice = index.get("id"); + if (idPlanSlice.isString() && idPlanSlice.copyString() == idString) { + LOG_TOPIC(ERR, Logger::CLUSTER) << "HAS PLAN " << index; + planValue.reset(new VPackBuilder()); + planValue->add(index); + break; + } + } + } + + if (!planValue) { + // hmm :S both empty :S did somebody else clean up? :S + // should not happen? + return errorCode; + } + std::string const planIndexesKey = "Plan/Collections/" + databaseName + "/" + collectionID +"/indexes"; + std::vector operations; + std::vector preconditions; + if (planValue) { + AgencyOperation planEraser(planIndexesKey, AgencyValueOperationType::ERASE, planValue->slice()); + TRI_ASSERT(oldPlanIndexes); + AgencyPrecondition planPrecondition(planIndexesKey, AgencyPrecondition::Type::VALUE, oldPlanIndexes->slice()); + operations.push_back(planEraser); + operations.push_back(AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP)); + preconditions.push_back(planPrecondition); + } + + AgencyWriteTransaction trx(operations, preconditions); + AgencyCommResult eraseResult = _agency.sendTransactionWithFailover(trx, 0.0); + + if (eraseResult.successful()) { + return errorCode; + } + std::chrono::duration waitTime(10); + std::this_thread::sleep_for(waitTime); + } while (++tries < 5); + + LOG_TOPIC(ERR, Logger::CLUSTER) << "Couldn't roll back index creation of " << idString << ". Database: " << databaseName << ", Collection " << collectionID; + return errorCode; +} + +int ClusterInfo::ensureIndexCoordinatorWithoutRollback( + std::string const& databaseName, std::string const& collectionID, + std::string const& idString, VPackSlice const& slice, bool create, + bool (*compare)(VPackSlice const&, VPackSlice const&), + VPackBuilder& resultBuilder, std::string& errorMsg, double timeout) { + AgencyComm ac; + + double const realTimeout = getTimeout(timeout); + double const endTime = TRI_microtime() + realTimeout; + double const interval = getPollInterval(); + TRI_ASSERT(resultBuilder.isEmpty()); std::string const key = @@ -1639,8 +1709,6 @@ int ClusterInfo::ensureIndexCoordinator( std::shared_ptr c = getCollection(databaseName, collectionID); - READ_LOCKER(readLocker, _planProt.lock); - if (c == nullptr) { return setErrormsg(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, errorMsg); } @@ -1705,8 +1773,6 @@ int ClusterInfo::ensureIndexCoordinator( return setErrormsg(TRI_ERROR_CLUSTER_AGENCY_STRUCTURE_INVALID, errorMsg); } - std::string const idString = arangodb::basics::StringUtils::itoa(iid); - try { VPackObjectBuilder b(newBuilder.get()); // Create a new collection VPack with the new Index @@ -1833,6 +1899,9 @@ int ClusterInfo::ensureIndexCoordinator( // local variables. Therefore we have to protect all accesses to them // by a mutex. We use the mutex of the condition variable in the // AgencyCallback for this. + std::string where = + "Current/Collections/" + databaseName + "/" + collectionID; + auto agencyCallback = std::make_shared(ac, where, dbServerChanged, true, false); _agencyCallbackRegistry->registerCallback(agencyCallback); diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 315cd16b37..e807066741 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -538,6 +538,17 @@ class ClusterInfo { double getReloadServerListTimeout() const { return 60.0; } + ////////////////////////////////////////////////////////////////////////////// + /// @brief ensure an index in coordinator. + ////////////////////////////////////////////////////////////////////////////// + + int ensureIndexCoordinatorWithoutRollback( + std::string const& databaseName, std::string const& collectionID, + std::string const& idSlice, arangodb::velocypack::Slice const& slice, bool create, + bool (*compare)(arangodb::velocypack::Slice const&, + arangodb::velocypack::Slice const&), + arangodb::velocypack::Builder& resultBuilder, std::string& errorMsg, double timeout); + ////////////////////////////////////////////////////////////////////////////// /// @brief object for agency communication //////////////////////////////////////////////////////////////////////////////