diff --git a/Documentation/Books/Manual/Architecture/Replication/README.md b/Documentation/Books/Manual/Architecture/Replication/README.md index fdc1f5e4e5..c20300f0d4 100644 --- a/Documentation/Books/Manual/Architecture/Replication/README.md +++ b/Documentation/Books/Manual/Architecture/Replication/README.md @@ -38,6 +38,10 @@ factor. The number of _followers_ can be controlled using the `replicationFactor` parameter is the total number of copies being kept, that is, it is one plus the number of _followers_. +In addition to the `replicationFactor` we have a `minReplicationFactor` +the locks down a collection as soon as we have lost too many followers. + + Asynchronous replication ------------------------ diff --git a/Documentation/Books/Manual/DataModeling/Collections/DatabaseMethods.md b/Documentation/Books/Manual/DataModeling/Collections/DatabaseMethods.md index 4ad54622d5..022d8675d5 100644 --- a/Documentation/Books/Manual/DataModeling/Collections/DatabaseMethods.md +++ b/Documentation/Books/Manual/DataModeling/Collections/DatabaseMethods.md @@ -164,6 +164,15 @@ to the [naming conventions](../NamingConventions/README.md). dramatically when using joins in AQL at the costs of reduced write performance on these collections. +- *minReplicationFactor* (optional, default is 1): in a cluster, this + attribute determines how many copies of each shard are required + to be in sync on the different DBServers. If we have less then these + many copies in the cluster a shard will refuse to write. The + minReplicationFactor can not be larger than replicationFactor. + Please note: during server failures this might lead to writes + not beeing possible until the failover is sorted out and might cause + write slow downs in trade of data durability. + - *distributeShardsLike*: distribute the shards of this collection cloning the shard distribution of another. If this value is set, it will copy the attributes *replicationFactor*, *numberOfShards* and diff --git a/Documentation/DocuBlocks/Rest/Collections/1_structs.md b/Documentation/DocuBlocks/Rest/Collections/1_structs.md index 58aecc3912..e533e39128 100644 --- a/Documentation/DocuBlocks/Rest/Collections/1_structs.md +++ b/Documentation/DocuBlocks/Rest/Collections/1_structs.md @@ -43,6 +43,10 @@ determine the target shard for documents; *Cluster specific attribute.* @RESTSTRUCT{replicationFactor,collection_info,integer,optional,} contains how many copies of each shard are kept on different DBServers.; *Cluster specific attribute.* +@RESTSTRUCT{minReplicationFactor,collection_info,integer,optional,} +contains how many minimal copies of each shard are kept on different DBServers. +The shards will refuse to write, if we have less then these many copies in sync.; *Cluster specific attribute.* + @RESTSTRUCT{shardingStrategy,collection_info,string,optional,} the sharding strategy selected for the collection; *Cluster specific attribute.* One of 'hash' or 'enterprise-hash-smart-edge' diff --git a/Documentation/DocuBlocks/Rest/Graph/1_structs.md b/Documentation/DocuBlocks/Rest/Graph/1_structs.md index c4626cbba8..e981535142 100644 --- a/Documentation/DocuBlocks/Rest/Graph/1_structs.md +++ b/Documentation/DocuBlocks/Rest/Graph/1_structs.md @@ -25,6 +25,11 @@ concurrent modifications to this graph. @RESTSTRUCT{replicationFactor,graph_representation,integer,required,} The replication factor used for every new collection in the graph. +@RESTSTRUCT{minReplicationFactor,graph_representation,integer,optional,} +The minimal replication factor used for every new collection in the graph. +If one shard has less then minimal replication factor copies, we cannot write +to this shard, but to all others. + @RESTSTRUCT{isSmart,graph_representation,boolean,required,} Flag if the graph is a SmartGraph (Enterprise Edition only) or not. diff --git a/Documentation/DocuBlocks/Rest/Graph/general_graph_create_http_examples.md b/Documentation/DocuBlocks/Rest/Graph/general_graph_create_http_examples.md index a153ee7625..2917be25b4 100644 --- a/Documentation/DocuBlocks/Rest/Graph/general_graph_create_http_examples.md +++ b/Documentation/DocuBlocks/Rest/Graph/general_graph_create_http_examples.md @@ -42,6 +42,11 @@ Cannot be modified later. @RESTSTRUCT{replicationFactor,post_api_gharial_create_opts,integer,required,} The replication factor used when initially creating collections for this graph. +@RESTSTRUCT{minReplicationFactor,post_api_gharial_create_opts,integer,optional,} +The minimal replication factor used for every new collection in the graph. +If one shard has less then minimal replication factor copies, we cannot write +to this shard, but to all others. + @RESTRETURNCODES @RESTRETURNCODE{201} diff --git a/Documentation/DocuBlocks/collectionProperties.md b/Documentation/DocuBlocks/collectionProperties.md index cc2a9fb312..78ed0e54e1 100644 --- a/Documentation/DocuBlocks/collectionProperties.md +++ b/Documentation/DocuBlocks/collectionProperties.md @@ -52,7 +52,11 @@ In a cluster setup, the result will also contain the following attributes: determine the target shard for documents. * *replicationFactor*: determines how many copies of each shard are kept - on different DBServers. + on different DBServers. Has to be in the range of 1-10 *(Cluster only)* + + * *minReplicationFactor* : determines the number of minimal shard copies kept on + different DBServers, a shard will refuse to write, if less then this amount + of copies are in sync. Has to be in the range of 1-replicationFactor *(Cluster only)* * *shardingStrategy*: the sharding strategy selected for the collection. This attribute will only be populated in cluster mode and is not populated @@ -77,6 +81,10 @@ one or more of the following attribute(s): different DBServers, valid values are integer numbers in the range of 1-10 *(Cluster only)* +* *minReplicationFactor* : Change the number of minimal shard copies kept on + different DBServers, a shard will refuse to write, if less then this amount + of copies are in sync. Has to be in the range of 1-replicationFactor *(Cluster only)* + **Note**: some other collection properties, such as *type*, *isVolatile*, *keyOptions*, *numberOfShards* or *shardingStrategy* cannot be changed once the collection is created. diff --git a/arangod/Agency/Job.cpp b/arangod/Agency/Job.cpp index 72c5ebae0f..3868b63d8b 100644 --- a/arangod/Agency/Job.cpp +++ b/arangod/Agency/Job.cpp @@ -95,21 +95,22 @@ bool Job::finish(std::string const& server, std::string const& shard, try { jobType = pending.slice()[0].get("type").copyString(); } catch (std::exception const&) { - LOG_TOPIC("76352", WARN, Logger::AGENCY) << "Failed to obtain type of job " << _jobId; + LOG_TOPIC("76352", WARN, Logger::AGENCY) + << "Failed to obtain type of job " << _jobId; } // Additional payload, which is to be executed in the finish transaction Slice operations = Slice::emptyObjectSlice(); - Slice preconditions = Slice::emptyObjectSlice(); + Slice preconditions = Slice::emptyObjectSlice(); if (payload != nullptr) { Slice slice = payload->slice(); TRI_ASSERT(slice.isObject() || slice.isArray()); - if (slice.isObject()) { // opers only + if (slice.isObject()) { // opers only operations = slice; TRI_ASSERT(operations.isObject()); } else { - TRI_ASSERT(slice.length() < 3); // opers + precs only + TRI_ASSERT(slice.length() < 3); // opers + precs only if (slice.length() > 0) { operations = slice[0]; TRI_ASSERT(operations.isObject()); @@ -125,7 +126,7 @@ bool Job::finish(std::string const& server, std::string const& shard, { VPackArrayBuilder guard(&finished); - { // operations -- + { // operations -- VPackObjectBuilder operguard(&finished); addPutJobIntoSomewhere(finished, success ? "Finished" : "Failed", @@ -148,15 +149,14 @@ bool Job::finish(std::string const& server, std::string const& shard, addReleaseShard(finished, shard); } - } // -- operations + } // -- operations - if (preconditions.isObject() && preconditions.length() > 0) { // preconditions -- + if (preconditions.isObject() && preconditions.length() > 0) { // preconditions -- VPackObjectBuilder precguard(&finished); for (auto const& prec : VPackObjectIterator(preconditions)) { finished.add(prec.key.copyString(), prec.value); } - } // -- preconditions - + } // -- preconditions } write_ret_t res = singleWriteTransaction(_agent, finished, false); @@ -168,16 +168,16 @@ bool Job::finish(std::string const& server, std::string const& shard, } } catch (std::exception const& e) { LOG_TOPIC("1fead", WARN, Logger::AGENCY) - << "Caught exception in finish, message: " << e.what(); + << "Caught exception in finish, message: " << e.what(); } catch (...) { LOG_TOPIC("7762f", WARN, Logger::AGENCY) - << "Caught unspecified exception in finish."; + << "Caught unspecified exception in finish."; } return false; } std::string Job::randomIdleAvailableServer(Node const& snap, - std::vector const& exclude) { + std::vector const& exclude) { std::vector as = availableServers(snap); std::string ret; @@ -189,11 +189,11 @@ std::string Job::randomIdleAvailableServer(Node const& snap, for (auto const& srv : snap.hasAsChildren(healthPrefix).first) { // ignore excluded servers if (std::find(std::begin(exclude), std::end(exclude), srv.first) != std::end(exclude)) { - continue ; + continue; } // ignore servers not in availableServers above: if (std::find(std::begin(as), std::end(as), srv.first) == std::end(as)) { - continue ; + continue; } std::string const& status = (*srv.second).hasAsString("Status").first; @@ -242,7 +242,7 @@ size_t Job::countGoodOrBadServersInList(Node const& snap, VPackSlice const& serv auto const& health = snap.hasAsChildren(healthPrefix); // Do we have a Health substructure? if (health.second) { - Node::Children const& healthData = health.first; // List of servers in Health + Node::Children const& healthData = health.first; // List of servers in Health for (VPackSlice const serverName : VPackArrayIterator(serverList)) { if (serverName.isString()) { // serverName not a string? Then don't count @@ -269,13 +269,14 @@ size_t Job::countGoodOrBadServersInList(Node const& snap, VPackSlice const& serv } // The following counts in a given server list how many of the servers are - // in Status "GOOD" or "BAD". -size_t Job::countGoodOrBadServersInList(Node const& snap, std::vector const& serverList) { +// in Status "GOOD" or "BAD". +size_t Job::countGoodOrBadServersInList(Node const& snap, + std::vector const& serverList) { size_t count = 0; auto const& health = snap.hasAsChildren(healthPrefix); // Do we have a Health substructure? if (health.second) { - Node::Children const& healthData = health.first; // List of servers in Health + Node::Children const& healthData = health.first; // List of servers in Health for (auto& serverStr : serverList) { // Now look up this server: auto it = healthData.find(serverStr); @@ -294,7 +295,8 @@ size_t Job::countGoodOrBadServersInList(Node const& snap, std::vector Job::clones(Node const& snapshot, std::string const& d for (const auto& colptr : snapshot.hasAsChildren(databasePath).first) { // collections - auto const &col = *colptr.second; - auto const &otherCollection = colptr.first; + auto const& col = *colptr.second; + auto const& otherCollection = colptr.first; if (otherCollection != collection && col.has("distributeShardsLike") && // use .has() form to prevent logging of missing col.hasAsSlice("distributeShardsLike").first.copyString() == collection) { auto const& theirshards = sortedShardList(col.hasAsNode("shards").first); if (theirshards.size() > 0) { // do not care about virtual collections if (theirshards.size() == myshards.size()) { - ret.emplace_back(otherCollection, - theirshards[steps]); + ret.emplace_back(otherCollection, theirshards[steps]); } else { LOG_TOPIC("3092e", ERR, Logger::SUPERVISION) << "Shard distribution of clone(" << otherCollection @@ -452,10 +453,11 @@ std::string Job::findNonblockedCommonHealthyInSyncFollower( // Which is in "GOO std::unordered_map currentServers; for (const auto& clone : cs) { - auto currentShardPath = curColPrefix + db + "/" + clone.collection + "/" + - clone.shard + "/servers"; - auto plannedShardPath = - planColPrefix + db + "/" + clone.collection + "/shards/" + clone.shard; + auto sharedPath = db + "/" + clone.collection + "/"; + auto currentShardPath = curColPrefix + sharedPath + clone.shard + "/servers"; + auto currentFailoverCandidatesPath = + curColPrefix + sharedPath + clone.shard + "/servers"; + auto plannedShardPath = planColPrefix + sharedPath + "shards/" + clone.shard; size_t i = 0; // start up race condition ... current might not have everything in plan @@ -464,13 +466,30 @@ std::string Job::findNonblockedCommonHealthyInSyncFollower( // Which is in "GOO continue; } // if - for (const auto& server : - VPackArrayIterator(snap.hasAsArray(currentShardPath).first)) { - auto id = server.copyString(); + bool isArray = false; + VPackSlice serverList; + // If we do have failover candidates, we should use them + std::tie(serverList, isArray) = snap.hasAsArray(currentFailoverCandidatesPath); + if (!isArray) { + // We have old DBServers that do not report failover candidates, + // Need to rely on current + std::tie(serverList, isArray) = snap.hasAsArray(currentShardPath); + TRI_ASSERT(isArray); + if (!isArray) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_SUPERVISION_GENERAL_FAILURE, + "Could not find common insync server for: " + currentShardPath + + ", value is not an array."); + } + } + // Guarantieed by if above + TRI_ASSERT(serverList.isArray()); + for (const auto& server : VPackArrayIterator(serverList)) { if (i++ == 0) { // Skip leader continue; } + auto id = server.copyString(); if (!good[id]) { // Skip unhealthy servers @@ -550,9 +569,9 @@ bool Job::abortable(Node const& snapshot, std::string const& jobId) { return false; } -void Job::doForAllShards(Node const& snapshot, std::string& database, - std::vector& shards, - std::function worker) { +void Job::doForAllShards( + Node const& snapshot, std::string& database, std::vector& shards, + std::function worker) { for (auto const& collShard : shards) { std::string shard = collShard.shard; std::string collection = collShard.collection; diff --git a/arangod/Aql/RestAqlHandler.h b/arangod/Aql/RestAqlHandler.h index be99906e17..8ce92abac7 100644 --- a/arangod/Aql/RestAqlHandler.h +++ b/arangod/Aql/RestAqlHandler.h @@ -49,9 +49,7 @@ class RestAqlHandler : public RestVocbaseBaseHandler { public: char const* name() const override final { return "RestAqlHandler"; } - RequestLane lane() const override final { - return RequestLane::CLUSTER_INTERNAL; - } + RequestLane lane() const override final { return RequestLane::CLUSTER_AQL; } RestStatus execute() override; RestStatus continueExecute() override; diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index dc3d14c7d0..29a56c4258 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -165,6 +165,30 @@ class CollectionInfoCurrent { return v; } + ////////////////////////////////////////////////////////////////////////////// + /// @brief returns the current failover candidates for the given shard + ////////////////////////////////////////////////////////////////////////////// + + TEST_VIRTUAL std::vector failoverCandidates(ShardID const& shardID) const { + std::vector v; + + auto it = _vpacks.find(shardID); + if (it != _vpacks.end()) { + VPackSlice slice = it->second->slice(); + + VPackSlice servers = slice.get(StaticStrings::FailoverCandidates); + if (servers.isArray()) { + for (auto const& server : VPackArrayIterator(servers)) { + TRI_ASSERT(server.isString()); + if (server.isString()) { + v.push_back(server.copyString()); + } + } + } + } + return v; + } + ////////////////////////////////////////////////////////////////////////////// /// @brief returns the errorMessage entry for one shardID ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/CreateCollection.cpp b/arangod/Cluster/CreateCollection.cpp index dfc971ae67..640d4ea1ef 100644 --- a/arangod/Cluster/CreateCollection.cpp +++ b/arangod/Cluster/CreateCollection.cpp @@ -93,7 +93,8 @@ CreateCollection::CreateCollection(MaintenanceFeature& feature, ActionDescriptio TRI_ASSERT(type == TRI_COL_TYPE_DOCUMENT || type == TRI_COL_TYPE_EDGE); if (!error.str().empty()) { - LOG_TOPIC("7c60f", ERR, Logger::MAINTENANCE) << "CreateCollection: " << error.str(); + LOG_TOPIC("7c60f", ERR, Logger::MAINTENANCE) + << "CreateCollection: " << error.str(); _result.reset(TRI_ERROR_INTERNAL, error.str()); setState(FAILED); } @@ -156,10 +157,12 @@ bool CreateCollection::first() { LOG_TOPIC("9db9a", DEBUG, Logger::MAINTENANCE) << "local collection " << database << "/" << shard << " successfully created"; - col->followers()->setTheLeader(leader); if (leader.empty()) { - col->followers()->clear(); + std::vector noFollowers; + col->followers()->takeOverLeadership(noFollowers); + } else { + col->followers()->setTheLeader(leader); } }); @@ -186,7 +189,7 @@ bool CreateCollection::first() { } LOG_TOPIC("4562c", DEBUG, Logger::MAINTENANCE) - << "Create collection done, notifying Maintenance"; + << "Create collection done, notifying Maintenance"; notify(); diff --git a/arangod/Cluster/DBServerAgencySync.cpp b/arangod/Cluster/DBServerAgencySync.cpp index fcd7d836b1..cf0d01aa62 100644 --- a/arangod/Cluster/DBServerAgencySync.cpp +++ b/arangod/Cluster/DBServerAgencySync.cpp @@ -70,20 +70,19 @@ Result DBServerAgencySync::getLocalCollections(VPackBuilder& collections) { } if (dbfeature == nullptr) { - LOG_TOPIC("d0ef2", ERR, Logger::HEARTBEAT) << "Failed to get feature database"; + LOG_TOPIC("d0ef2", ERR, Logger::HEARTBEAT) + << "Failed to get feature database"; return Result(TRI_ERROR_INTERNAL, "Failed to get feature database"); } VPackObjectBuilder c(&collections); - + dbfeature->enumerateDatabases([&](TRI_vocbase_t& vocbase) { if (!vocbase.use()) { return; } - auto unuse = scopeGuard([&vocbase] { - vocbase.release(); - }); - + auto unuse = scopeGuard([&vocbase] { vocbase.release(); }); + collections.add(VPackValue(vocbase.name())); VPackObjectBuilder db(&collections); @@ -100,9 +99,8 @@ Result DBServerAgencySync::getLocalCollections(VPackBuilder& collections) { // generate a collection definition identical to that which would be // persisted in the case of SingleServer collection->properties(collections, - LogicalDataSource::makeFlags( - LogicalDataSource::Serialize::Detailed, - LogicalDataSource::Serialize::ForPersistence)); + LogicalDataSource::makeFlags(LogicalDataSource::Serialize::Detailed, + LogicalDataSource::Serialize::ForPersistence)); auto const& folls = collection->followers(); std::string const theLeader = folls->getLeader(); @@ -119,19 +117,7 @@ Result DBServerAgencySync::getLocalCollections(VPackBuilder& collections) { // we are the leader ourselves // In this case we report our in-sync followers here in the format // of the agency: [ leader, follower1, follower2, ... ] - collections.add(VPackValue("servers")); - - { - VPackArrayBuilder guard(&collections); - - collections.add(VPackValue(arangodb::ServerState::instance()->getId())); - - std::shared_ptr const> srvs = folls->get(); - - for (auto const& s : *srvs) { - collections.add(VPackValue(s)); - } - } + folls->injectFollowerInfo(collections); } } } @@ -151,14 +137,20 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { LOG_TOPIC("62fd8", DEBUG, Logger::MAINTENANCE) << "DBServerAgencySync::execute starting"; - + DBServerAgencySyncResult result; auto* sysDbFeature = application_features::ApplicationServer::lookupFeature(); MaintenanceFeature* mfeature = ApplicationServer::getFeature("Maintenance"); + if (mfeature == nullptr) { + LOG_TOPIC("3a1f7", ERR, Logger::MAINTENANCE) + << "Could not load maintenance feature, can happen during shutdown."; + result.success = false; + result.errorMessage = "Could not load maintenance feature"; + return result; + } arangodb::SystemDatabaseFeature::ptr vocbase = sysDbFeature ? sysDbFeature->use() : nullptr; - DBServerAgencySyncResult result; if (vocbase == nullptr) { LOG_TOPIC("18d67", DEBUG, Logger::MAINTENANCE) @@ -196,20 +188,21 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { VPackObjectBuilder o(&rb); auto startTimePhaseOne = std::chrono::steady_clock::now(); - LOG_TOPIC("19aaf", DEBUG, Logger::MAINTENANCE) << "DBServerAgencySync::phaseOne"; + LOG_TOPIC("19aaf", DEBUG, Logger::MAINTENANCE) + << "DBServerAgencySync::phaseOne"; tmp = arangodb::maintenance::phaseOne(plan->slice(), local.slice(), serverId, *mfeature, rb); auto endTimePhaseOne = std::chrono::steady_clock::now(); LOG_TOPIC("93f83", DEBUG, Logger::MAINTENANCE) << "DBServerAgencySync::phaseOne done"; - if (endTimePhaseOne - startTimePhaseOne > - std::chrono::milliseconds(200)) { + if (endTimePhaseOne - startTimePhaseOne > std::chrono::milliseconds(200)) { // We take this as indication that many shards are in the system, // in this case: give some asynchronous jobs created in phaseOne a // chance to complete before we collect data for phaseTwo: LOG_TOPIC("ef730", DEBUG, Logger::MAINTENANCE) - << "DBServerAgencySync::hesitating between phases 1 and 2 for 0.1s..."; + << "DBServerAgencySync::hesitating between phases 1 and 2 for " + "0.1s..."; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -224,6 +217,8 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { LOG_TOPIC("675fd", TRACE, Logger::MAINTENANCE) << "DBServerAgencySync::phaseTwo - current state: " << current->toJson(); + mfeature->increaseCurrentCounter(); + local.clear(); glc = getLocalCollections(local); // We intentionally refetch local collections here, such that phase 2 @@ -237,7 +232,8 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { return result; } - LOG_TOPIC("652ff", DEBUG, Logger::MAINTENANCE) << "DBServerAgencySync::phaseTwo"; + LOG_TOPIC("652ff", DEBUG, Logger::MAINTENANCE) + << "DBServerAgencySync::phaseTwo"; tmp = arangodb::maintenance::phaseTwo(plan->slice(), current->slice(), local.slice(), serverId, *mfeature, rb); @@ -246,7 +242,8 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { << "DBServerAgencySync::phaseTwo done"; } catch (std::exception const& e) { - LOG_TOPIC("cd308", ERR, Logger::MAINTENANCE) << "Failed to handle plan change: " << e.what(); + LOG_TOPIC("cd308", ERR, Logger::MAINTENANCE) + << "Failed to handle plan change: " << e.what(); } if (rb.isClosed()) { @@ -268,18 +265,17 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { if (ao.value.hasKey("precondition")) { auto const precondition = ao.value.get("precondition"); - preconditions.push_back( - AgencyPrecondition( - precondition.keyAt(0).copyString(), AgencyPrecondition::Type::VALUE, precondition.valueAt(0))); + preconditions.push_back(AgencyPrecondition(precondition.keyAt(0).copyString(), + AgencyPrecondition::Type::VALUE, + precondition.valueAt(0))); } - + if (op == "set") { auto const value = ao.value.get("payload"); operations.push_back(AgencyOperation(key, AgencyValueOperationType::SET, value)); } else if (op == "delete") { operations.push_back(AgencyOperation(key, AgencySimpleOperationType::DELETE_OP)); } - } operations.push_back(AgencyOperation("Current/Version", AgencySimpleOperationType::INCREMENT_OP)); @@ -288,9 +284,8 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { AgencyCommResult r = comm.sendTransactionWithFailover(currentTransaction); if (!r.successful()) { LOG_TOPIC("d73b8", INFO, Logger::MAINTENANCE) - << "Error reporting to agency: _statusCode: " << r.errorCode() - << " message: " << r.errorMessage() - << ". This can be ignored, since it will be retried automatically."; + << "Error reporting to agency: _statusCode: " << r.errorCode() + << " message: " << r.errorMessage() << ". This can be ignored, since it will be retried automatically."; } else { LOG_TOPIC("9b0b3", DEBUG, Logger::MAINTENANCE) << "Invalidating current in ClusterInfo"; @@ -317,22 +312,23 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { result.errorMessage = "Report from phase 1 and 2 was no object."; try { std::string json = report.toJson(); - LOG_TOPIC("65fde", WARN, Logger::MAINTENANCE) << "Report from phase 1 and 2 was: " << json; - } catch(std::exception const& exc) { + LOG_TOPIC("65fde", WARN, Logger::MAINTENANCE) + << "Report from phase 1 and 2 was: " << json; + } catch (std::exception const& exc) { LOG_TOPIC("54de2", WARN, Logger::MAINTENANCE) - << "Report from phase 1 and 2 could not be dumped to JSON, error: " - << exc.what() << ", head byte:" << report.head(); + << "Report from phase 1 and 2 could not be dumped to JSON, error: " + << exc.what() << ", head byte:" << report.head(); uint64_t l = 0; try { l = report.byteSize(); LOG_TOPIC("54dda", WARN, Logger::MAINTENANCE) - << "Report from phase 1 and 2, byte size: " << l; + << "Report from phase 1 and 2, byte size: " << l; LOG_TOPIC("67421", WARN, Logger::MAINTENANCE) - << "Bytes: " - << arangodb::basics::StringUtils::encodeHex((char const*) report.start(), l); - } catch(...) { + << "Bytes: " + << arangodb::basics::StringUtils::encodeHex((char const*)report.start(), l); + } catch (...) { LOG_TOPIC("76124", WARN, Logger::MAINTENANCE) - << "Report from phase 1 and 2, byte size throws."; + << "Report from phase 1 and 2, byte size throws."; } } } @@ -342,9 +338,10 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { auto took = duration(clock::now() - start).count(); if (took > 30.0) { - LOG_TOPIC("83cb8", WARN, Logger::MAINTENANCE) << "DBServerAgencySync::execute " - "took " - << took << " s to execute handlePlanChange"; + LOG_TOPIC("83cb8", WARN, Logger::MAINTENANCE) + << "DBServerAgencySync::execute " + "took " + << took << " s to execute handlePlanChange"; } return result; diff --git a/arangod/Cluster/FollowerInfo.cpp b/arangod/Cluster/FollowerInfo.cpp index fe85ddd41d..b9c416ae93 100644 --- a/arangod/Cluster/FollowerInfo.cpp +++ b/arangod/Cluster/FollowerInfo.cpp @@ -25,6 +25,7 @@ #include "FollowerInfo.h" #include "ApplicationFeatures/ApplicationServer.h" +#include "Cluster/MaintenanceStrings.h" #include "Cluster/ServerState.h" #include "VocBase/LogicalCollection.h" @@ -32,56 +33,12 @@ using namespace arangodb; -//////////////////////////////////////////////////////////////////////////////// -/// @brief change JSON under -/// Current/Collection/// -/// to add or remove a serverID, if add flag is true, the entry is added -/// (if it is not yet there), otherwise the entry is removed (if it was -/// there). -//////////////////////////////////////////////////////////////////////////////// - -static VPackBuilder newShardEntry(VPackSlice oldValue, ServerID const& sid, bool add) { - VPackBuilder newValue; - VPackSlice servers; - { - VPackObjectBuilder b(&newValue); - // Now need to find the `servers` attribute, which is a list: - for (auto const& it : VPackObjectIterator(oldValue)) { - if (it.key.isEqualString("servers")) { - servers = it.value; - } else { - newValue.add(it.key); - newValue.add(it.value); - } - } - newValue.add(VPackValue("servers")); - if (servers.isArray() && servers.length() > 0) { - VPackArrayBuilder bb(&newValue); - newValue.add(servers[0]); - VPackArrayIterator it(servers); - bool done = false; - for (++it; it.valid(); ++it) { - if ((*it).isEqualString(sid)) { - if (add) { - newValue.add(*it); - done = true; - } - } else { - newValue.add(*it); - } - } - if (add && !done) { - newValue.add(VPackValue(sid)); - } - } else { - VPackArrayBuilder bb(&newValue); - newValue.add(VPackValue(ServerState::instance()->getId())); - if (add) { - newValue.add(VPackValue(sid)); - } - } +static std::string const inline reportName(bool isRemove) { + if (isRemove) { + return "FollowerInfo::remove"; + } else { + return "FollowerInfo::add"; } - return newValue; } static std::string CurrentShardPath(arangodb::LogicalCollection& col) { @@ -136,6 +93,15 @@ Result FollowerInfo::add(ServerID const& sid) { v = std::make_shared>(*_followers); v->push_back(sid); // add a single entry _followers = v; // will cast to std::vector const + { + // insertIntoCandidates + if (std::find(_failoverCandidates->begin(), _failoverCandidates->end(), sid) == + _failoverCandidates->end()) { + auto nextCandidates = std::make_shared>(*_failoverCandidates); + nextCandidates->push_back(sid); // add a single entry + _failoverCandidates = nextCandidates; // will cast to std::vector const + } + } #ifdef DEBUG_SYNC_REPLICATION if (!AgencyCommManager::MANAGER) { return {TRI_ERROR_NO_ERROR}; @@ -144,23 +110,15 @@ Result FollowerInfo::add(ServerID const& sid) { } // Now tell the agency - TRI_ASSERT(_docColl != nullptr); - std::string curPath = CurrentShardPath(*_docColl); - std::string planPath = PlanShardPath(*_docColl); - AgencyComm ac; - double startTime = TRI_microtime(); - do { - AgencyReadTransaction trx(std::vector( - {AgencyCommManager::path(planPath), AgencyCommManager::path(curPath)})); - AgencyCommResult res = ac.sendTransactionWithFailover(trx); - - if (res.successful()) { - TRI_ASSERT(res.slice().isArray() && res.slice().length() == 1); - VPackSlice resSlice = res.slice()[0]; - // Let's look at the results, note that both can be None! - velocypack::Slice planEntry = PlanShardEntry(*_docColl, resSlice); - velocypack::Slice currentEntry = CurrentShardEntry(*_docColl, resSlice); + auto agencyRes = persistInAgency(false); + if (agencyRes.ok() || agencyRes.is(TRI_ERROR_CLUSTER_NOT_LEADER)) { + // Not a leader is expected + return agencyRes; + } + // Real error, report +<<<<<<< HEAD +======= if (!currentEntry.isObject()) { LOG_TOPIC("b753d", ERR, Logger::CLUSTER) << "FollowerInfo::add, did not find object in " << curPath; @@ -210,14 +168,15 @@ Result FollowerInfo::add(ServerID const& sid) { int errorCode = (application_features::ApplicationServer::isStopping()) ? TRI_ERROR_SHUTTING_DOWN : TRI_ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED; +>>>>>>> c922c5f1332482ef29dff794d8af394d31c1b737 std::string errorMessage = "unable to add follower in agency, timeout in agency CAS operation for " "key " + _docColl->vocbase().name() + "/" + std::to_string(_docColl->planId()) + - ": " + TRI_errno_string(errorCode); + ": " + TRI_errno_string(agencyRes.errorNumber()); LOG_TOPIC("6295b", ERR, Logger::CLUSTER) << errorMessage; - - return {errorCode, std::move(errorMessage)}; + agencyRes.reset(agencyRes.errorNumber(), std::move(errorMessage)); + return agencyRes; } //////////////////////////////////////////////////////////////////////////////// @@ -246,46 +205,192 @@ Result FollowerInfo::remove(ServerID const& sid) { << "Removing follower " << sid << " from " << _docColl->name(); MUTEX_LOCKER(locker, _agencyMutex); + WRITE_LOCKER(canWriteLocker, _canWriteLock); WRITE_LOCKER(writeLocker, _dataLock); // the data lock has to be locked until this function completes // because if the agency communication does not work // local data is modified again. // First check if there is anything to do: - bool found = false; - for (auto const& s : *_followers) { - if (s == sid) { - found = true; - break; - } - } - if (!found) { + if (std::find(_followers->begin(), _followers->end(), sid) == _followers->end()) { + TRI_ASSERT(std::find(_failoverCandidates->begin(), _failoverCandidates->end(), + sid) == _failoverCandidates->end()); return {TRI_ERROR_NO_ERROR}; // nothing to do } - - auto v = std::make_shared>(); - if (_followers->size() > 0) { + // Both lists have to be in sync at any time! + TRI_ASSERT(std::find(_failoverCandidates->begin(), _failoverCandidates->end(), + sid) != _failoverCandidates->end()); + auto oldFollowers = _followers; + auto oldFailovers = _failoverCandidates; + { + auto v = std::make_shared>(); + TRI_ASSERT(!_followers->empty()); // well we found the element above \o/ v->reserve(_followers->size() - 1); - for (auto const& i : *_followers) { - if (i != sid) { - v->push_back(i); - } - } + std::remove_copy(_followers->begin(), _followers->end(), + std::back_inserter(*v.get()), sid); + _followers = v; // will cast to std::vector const + } + { + auto v = std::make_shared>(); + TRI_ASSERT(!_failoverCandidates->empty()); // well we found the element above \o/ + v->reserve(_failoverCandidates->size() - 1); + std::remove_copy(_failoverCandidates->begin(), _failoverCandidates->end(), + std::back_inserter(*v.get()), sid); + _failoverCandidates = v; // will cast to std::vector const } - auto _oldFollowers = _followers; - _followers = v; // will cast to std::vector const #ifdef DEBUG_SYNC_REPLICATION if (!AgencyCommManager::MANAGER) { return {TRI_ERROR_NO_ERROR}; } #endif + Result agencyRes = persistInAgency(true); + if (agencyRes.ok()) { + // +1 for the leader (me) + if (_followers->size() + 1 < _docColl->minReplicationFactor()) { + _canWrite = false; + } + // we are finished + LOG_TOPIC("be0cb", DEBUG, Logger::CLUSTER) + << "Removing follower " << sid << " from " << _docColl->name() << "succeeded"; + return agencyRes; + } + if (agencyRes.is(TRI_ERROR_CLUSTER_NOT_LEADER)) { + // Next run in Maintenance will fix this. + return agencyRes; + } + // rollback: + _followers = oldFollowers; + _failoverCandidates = oldFailovers; + std::string errorMessage = + "unable to remove follower from agency, timeout in agency CAS operation " + "for key " + + _docColl->vocbase().name() + "/" + std::to_string(_docColl->planId()) + + ": " + TRI_errno_string(agencyRes.errorNumber()); + LOG_TOPIC("a0dcc", ERR, Logger::CLUSTER) << errorMessage; + agencyRes.resetErrorMessage(std::move(errorMessage)); + return agencyRes; +} + +////////////////////////////////////////////////////////////////////////////// +/// @brief clear follower list, no changes in agency necessary +////////////////////////////////////////////////////////////////////////////// + +void FollowerInfo::clear() { + WRITE_LOCKER(canWriteLocker, _canWriteLock); + WRITE_LOCKER(writeLocker, _dataLock); + _followers = std::make_shared>(); + _failoverCandidates = std::make_shared>(); + _canWrite = false; +} + +////////////////////////////////////////////////////////////////////////////// +/// @brief check whether the given server is a follower +////////////////////////////////////////////////////////////////////////////// + +bool FollowerInfo::contains(ServerID const& sid) const { + READ_LOCKER(readLocker, _dataLock); + auto const& f = *_followers; + return std::find(f.begin(), f.end(), sid) != f.end(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief Take over leadership for this shard. +/// Also inject information of a insync followers that we knew about +/// before a failover to this server has happened +//////////////////////////////////////////////////////////////////////////////// + +void FollowerInfo::takeOverLeadership(std::vector const& previousInsyncFollowers) { + // This function copies over the information taken from the last CURRENT into a local vector. + // Where we remove the old leader and ourself from the list of followers + WRITE_LOCKER(canWriteLocker, _canWriteLock); + WRITE_LOCKER(writeLocker, _dataLock); + // Reset local structures, if we take over leadership we do not know anything! + _followers = std::make_shared>(); + _failoverCandidates = std::make_shared>(); + // We disallow writes until the first write. + _canWrite = false; + // Take over leadership + _theLeader = ""; + _theLeaderTouched = true; + TRI_ASSERT(_failoverCandidates != nullptr && _failoverCandidates->empty()); + if (previousInsyncFollowers.size() > 1) { + auto ourselves = arangodb::ServerState::instance()->getId(); + auto failoverCandidates = + std::make_shared>(previousInsyncFollowers); + auto myEntry = + std::find(failoverCandidates->begin(), failoverCandidates->end(), ourselves); + // We are a valid failover follower + TRI_ASSERT(myEntry != failoverCandidates->end()); + // The first server is a different leader! (For some reason the job can be + // triggered twice) TRI_ASSERT(myEntry != failoverCandidates->begin()); + failoverCandidates->erase(myEntry); + // Put us in front, put old leader somewhere, we do not really care + _failoverCandidates = failoverCandidates; + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief Update the current information in the Agency. We update the failover- +/// list with the newest values, after this the guarantee is that +/// _followers == _failoverCandidates +//////////////////////////////////////////////////////////////////////////////// +bool FollowerInfo::updateFailoverCandidates() { + MUTEX_LOCKER(agencyLocker, _agencyMutex); + // Acquire _canWriteLock first + WRITE_LOCKER(canWriteLocker, _canWriteLock); + // Next acquire _dataLock + WRITE_LOCKER(dataLocker, _dataLock); + if (_canWrite) { +// Short circuit, we have multiple writes in the above write lock +// The first needs to do things and flips _canWrite +// All followers can return as soon as the lock is released +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + TRI_ASSERT(_failoverCandidates->size() == _followers->size()); + std::vector diff; + std::set_symmetric_difference(_failoverCandidates->begin(), + _failoverCandidates->end(), _followers->begin(), + _followers->end(), std::back_inserter(diff)); + TRI_ASSERT(diff.empty()); +#endif + return _canWrite; + } + TRI_ASSERT(_followers->size() + 1 >= _docColl->minReplicationFactor()); + // Update both lists (we use a copy here, as we are modifying them in other places individually!) + _failoverCandidates = std::make_shared const>(*_followers); + // Just be sure + TRI_ASSERT(_failoverCandidates.get() != _followers.get()); + TRI_ASSERT(_failoverCandidates->size() == _followers->size()); +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + std::vector diff; + std::set_symmetric_difference(_failoverCandidates->begin(), + _failoverCandidates->end(), _followers->begin(), + _followers->end(), std::back_inserter(diff)); + TRI_ASSERT(diff.empty()); +#endif + Result res = persistInAgency(true); + if (!res.ok()) { + // We could not persist the update in the agency. + // Collection left in RO mode. + LOG_TOPIC("7af00", INFO, Logger::CLUSTER) + << "Could not persist insync follower for " << _docColl->vocbase().name() + << "/" << std::to_string(_docColl->planId()) + << " keep RO-mode for now, next write will retry."; + TRI_ASSERT(!_canWrite); + } else { + _canWrite = true; + } + return _canWrite; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief Persist information in Current +//////////////////////////////////////////////////////////////////////////////// +Result FollowerInfo::persistInAgency(bool isRemove) const { // Now tell the agency TRI_ASSERT(_docColl != nullptr); std::string curPath = CurrentShardPath(*_docColl); std::string planPath = PlanShardPath(*_docColl); - AgencyComm ac; - double startTime = TRI_microtime(); do { AgencyReadTransaction trx(std::vector( {AgencyCommManager::path(planPath), AgencyCommManager::path(curPath)})); @@ -299,7 +404,7 @@ Result FollowerInfo::remove(ServerID const& sid) { if (!currentEntry.isObject()) { LOG_TOPIC("01896", ERR, Logger::CLUSTER) - << "FollowerInfo::remove, did not find object in " << curPath; + << reportName(isRemove) << ", did not find object in " << curPath; if (!currentEntry.isNone()) { LOG_TOPIC("57c84", ERR, Logger::CLUSTER) << "Found: " << currentEntry.toJson(); } @@ -307,16 +412,16 @@ Result FollowerInfo::remove(ServerID const& sid) { if (!planEntry.isArray() || planEntry.length() == 0 || !planEntry[0].isString() || !planEntry[0].isEqualString(ServerState::instance()->getId())) { LOG_TOPIC("42231", INFO, Logger::CLUSTER) - << "FollowerInfo::remove, did not find myself in Plan: " - << _docColl->vocbase().name() << "/" - << std::to_string(_docColl->planId()) + << reportName(isRemove) + << ", did not find myself in Plan: " << _docColl->vocbase().name() + << "/" << std::to_string(_docColl->planId()) << " (can happen when the leader changed recently)."; if (!planEntry.isNone()) { LOG_TOPIC("ffede", INFO, Logger::CLUSTER) << "Found: " << planEntry.toJson(); } return {TRI_ERROR_CLUSTER_NOT_LEADER}; } else { - auto newValue = newShardEntry(currentEntry, sid, false); + auto newValue = newShardEntry(currentEntry); AgencyWriteTransaction trx; trx.preconditions.push_back( AgencyPrecondition(curPath, AgencyPrecondition::Type::VALUE, currentEntry)); @@ -328,19 +433,21 @@ Result FollowerInfo::remove(ServerID const& sid) { AgencyOperation("Current/Version", AgencySimpleOperationType::INCREMENT_OP)); AgencyCommResult res2 = ac.sendTransactionWithFailover(trx); if (res2.successful()) { - // we are finished - LOG_TOPIC("be0cb", DEBUG, Logger::CLUSTER) - << "Removing follower " << sid << " from " << _docColl->name() - << "succeeded"; return {TRI_ERROR_NO_ERROR}; } } } } else { LOG_TOPIC("b7333", WARN, Logger::CLUSTER) - << "FollowerInfo::remove, could not read " << planPath << " and " + << reportName(isRemove) << ", could not read " << planPath << " and " << curPath << " in agency."; } +<<<<<<< HEAD + using namespace std::chrono_literals; + std::this_thread::sleep_for(500ms); + } while (!application_features::ApplicationServer::isStopping()); + return TRI_ERROR_SHUTTING_DOWN; +======= std::this_thread::sleep_for(std::chrono::milliseconds(500)); } while (TRI_microtime() < startTime + 7200 && !application_features::ApplicationServer::isStopping()); @@ -367,23 +474,58 @@ Result FollowerInfo::remove(ServerID const& sid) { LOG_TOPIC("a0dcc", ERR, Logger::CLUSTER) << errorMessage; return {errorCode, std::move(errorMessage)}; +>>>>>>> c922c5f1332482ef29dff794d8af394d31c1b737 } -////////////////////////////////////////////////////////////////////////////// -/// @brief clear follower list, no changes in agency necessary -////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +/// @brief inject the information about "servers" and "failoverCandidates" +//////////////////////////////////////////////////////////////////////////////// -void FollowerInfo::clear() { - WRITE_LOCKER(writeLocker, _dataLock); - _followers = std::make_shared>(); +void FollowerInfo::injectFollowerInfoInternal(VPackBuilder& builder) const { + auto ourselves = arangodb::ServerState::instance()->getId(); + TRI_ASSERT(builder.isOpenObject()); + builder.add(VPackValue(maintenance::SERVERS)); + { + VPackArrayBuilder bb(&builder); + builder.add(VPackValue(ourselves)); + for (auto const& f : *_followers) { + builder.add(VPackValue(f)); + } + } + builder.add(VPackValue(StaticStrings::FailoverCandidates)); + { + VPackArrayBuilder bb(&builder); + builder.add(VPackValue(ourselves)); + for (auto const& f : *_failoverCandidates) { + builder.add(VPackValue(f)); + } + } + TRI_ASSERT(builder.isOpenObject()); } -////////////////////////////////////////////////////////////////////////////// -/// @brief check whether the given server is a follower -////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +/// @brief change JSON under +/// Current/Collection/// +/// to add or remove a serverID, if add flag is true, the entry is added +/// (if it is not yet there), otherwise the entry is removed (if it was +/// there). +//////////////////////////////////////////////////////////////////////////////// -bool FollowerInfo::contains(ServerID const& sid) const { - READ_LOCKER(readLocker, _dataLock); - auto const& f = *_followers; - return std::find(f.begin(), f.end(), sid) != f.end(); +VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const { + VPackBuilder newValue; + TRI_ASSERT(oldValue.isObject()); + { + VPackObjectBuilder b(&newValue); + // Copy all but SERVERS and FailoverCandidates. + // They will be injected later. + for (auto const& it : VPackObjectIterator(oldValue)) { + if (!it.key.isEqualString(maintenance::SERVERS) && + !it.key.isEqualString(StaticStrings::FailoverCandidates)) { + newValue.add(it.key); + newValue.add(it.value); + } + } + injectFollowerInfoInternal(newValue); + } + return newValue; } \ No newline at end of file diff --git a/arangod/Cluster/FollowerInfo.h b/arangod/Cluster/FollowerInfo.h index 917e9f2ae9..1f0280cbb0 100644 --- a/arangod/Cluster/FollowerInfo.h +++ b/arangod/Cluster/FollowerInfo.h @@ -25,11 +25,15 @@ #ifndef ARANGOD_CLUSTER_FOLLOWER_INFO_H #define ARANGOD_CLUSTER_FOLLOWER_INFO_H 1 +#include "ClusterInfo.h" + #include "Basics/Mutex.h" #include "Basics/ReadWriteLock.h" #include "Basics/Result.h" #include "Basics/WriteLocker.h" -#include "ClusterInfo.h" +#include "StorageEngine/EngineSelectorFeature.h" +#include "StorageEngine/StorageEngine.h" +#include "VocBase/LogicalCollection.h" namespace arangodb { @@ -44,22 +48,41 @@ class Slice; class FollowerInfo { // This is the list of real local followers std::shared_ptr const> _followers; + // This is the list of followers that have been insync BEFORE we + // triggered a failover to this server. + // The list is filled only temporarily, and will be deleted as + // soon as we can guarantee at least so many followers locally. + std::shared_ptr const> _failoverCandidates; // The agencyMutex is used to synchronise access to the agency. // the _dataLock is used to sync the access to local data. - // The agencyMutex is always locked before the _dataLock is locked. + // The _canWriteLock is used to protect flag if we do have enough followers + // The locking ordering to avoid dead locks has to be as follows: + // 1.) _agencyMutex + // 2.) _canWriteLock + // 3.) _dataLock mutable Mutex _agencyMutex; + mutable arangodb::basics::ReadWriteLock _canWriteLock; mutable arangodb::basics::ReadWriteLock _dataLock; + arangodb::LogicalCollection* _docColl; - std::string _theLeader; // if the latter is empty, then we are leading + std::string _theLeader; bool _theLeaderTouched; + // flag if we have enough insnc followers and can pass through writes + bool _canWrite; public: explicit FollowerInfo(arangodb::LogicalCollection* d) : _followers(std::make_shared>()), + _failoverCandidates(std::make_shared>()), _docColl(d), - _theLeaderTouched(false) {} + _theLeader(""), + _theLeaderTouched(false), + _canWrite(_docColl->replicationFactor() <= 1) { + // On replicationfactor 1 we do not have any failover servers to maintain. + // This should also disable satellite tracking. + } //////////////////////////////////////////////////////////////////////////////// /// @brief get information about current followers of a shard. @@ -70,6 +93,23 @@ class FollowerInfo { return _followers; } + //////////////////////////////////////////////////////////////////////////////// + /// @brief get information about current followers of a shard. + //////////////////////////////////////////////////////////////////////////////// + + std::shared_ptr const> getFailoverCandidates() const { + READ_LOCKER(readLocker, _dataLock); + return _failoverCandidates; + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief Take over leadership for this shard. + /// Also inject information of a insync followers that we knew about + /// before a failover to this server has happened + //////////////////////////////////////////////////////////////////////////////// + + void takeOverLeadership(std::vector const& previousInsyncFollowers); + ////////////////////////////////////////////////////////////////////////////// /// @brief add a follower to a shard, this is only done by the server side /// of the "get-in-sync" capabilities. This reports to the agency under @@ -106,6 +146,9 @@ class FollowerInfo { ////////////////////////////////////////////////////////////////////////////// void setTheLeader(std::string const& who) { + // Empty leader => we are now new leader. + // This needs to be handled with takeOverLeadership + TRI_ASSERT(!who.empty()); WRITE_LOCKER(writeLocker, _dataLock); _theLeader = who; _theLeaderTouched = true; @@ -128,6 +171,54 @@ class FollowerInfo { READ_LOCKER(readLocker, _dataLock); return _theLeaderTouched; } + + bool allowedToWrite() { + { + auto engine = arangodb::EngineSelectorFeature::ENGINE; + TRI_ASSERT(engine != nullptr); + if (engine->inRecovery()) { + return true; + } + READ_LOCKER(readLocker, _canWriteLock); + if (_canWrite) { + // Someone has decided we can write, fastPath! + +#ifdef ARANGODB_USE_MAINTAINER_MODE + // Invariant, we can only WRITE if we do not have other failover candidates + READ_LOCKER(readLockerData, _dataLock); + TRI_ASSERT(_followers->size() == _failoverCandidates->size()); + TRI_ASSERT(_followers->size() > _docColl->minReplicationFactor()); +#endif + return _canWrite; + } + READ_LOCKER(readLockerData, _dataLock); + TRI_ASSERT(_docColl != nullptr); + if (_followers->size() + 1 < _docColl->minReplicationFactor()) { + // We know that we still do not have enough followers + return false; + } + } + return updateFailoverCandidates(); + } + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Inject the information about followers into the builder. + /// Builder needs to be an open object and is not allowed to contain + /// the keys "servers" and "failoverCandidates". + ////////////////////////////////////////////////////////////////////////////// + void injectFollowerInfo(arangodb::velocypack::Builder& builder) const { + READ_LOCKER(readLockerData, _dataLock); + injectFollowerInfoInternal(builder); + } + + private: + void injectFollowerInfoInternal(arangodb::velocypack::Builder& builder) const; + + bool updateFailoverCandidates(); + + Result persistInAgency(bool isRemove) const; + + arangodb::velocypack::Builder newShardEntry(arangodb::velocypack::Slice oldValue) const; }; } // end namespace arangodb diff --git a/arangod/Cluster/Maintenance.cpp b/arangod/Cluster/Maintenance.cpp index 75e68d22ce..25cd7d8007 100644 --- a/arangod/Cluster/Maintenance.cpp +++ b/arangod/Cluster/Maintenance.cpp @@ -245,7 +245,8 @@ void handlePlanShard(VPackSlice const& cprops, VPackSlice const& ldb, {THE_LEADER, shouldBeLeading ? std::string() : leaderId}, {SERVER_ID, serverId}, {LOCAL_LEADER, lcol.get(THE_LEADER).copyString()}, - {FOLLOWERS_TO_DROP, followersToDropString}}, + {FOLLOWERS_TO_DROP, followersToDropString}, + {OLD_CURRENT_COUNTER, std::to_string(feature.getCurrentCounter())}}, HIGHER_PRIORITY, properties)); } else { LOG_TOPIC("0285b", DEBUG, Logger::MAINTENANCE) @@ -726,26 +727,14 @@ static VPackBuilder assembleLocalCollectionInfo( } } } - ret.add(VPackValue(SERVERS)); - { - VPackArrayBuilder a(&ret); - ret.add(VPackValue(ourselves)); - // planServers may be `none` in the case that the shard is not - // contained in Plan, but in local. - if (planServers.isArray()) { - std::shared_ptr const> current = - collection->followers()->get(); - for (auto const& server : *current) { - ret.add(VPackValue(server)); - } - } - } + collection->followers()->injectFollowerInfo(ret); } return ret; } catch (std::exception const& e) { ret.clear(); std::string errorMsg( - "Maintenance::assembleLocalCollectionInfo: Failed to lookup database "); + "Maintenance::assembleLocalCollectionInfo: Failed to lookup " + "database "); errorMsg += database; errorMsg += ", exception: "; errorMsg += e.what(); @@ -852,8 +841,10 @@ arangodb::Result arangodb::maintenance::reportInCurrent( auto const planPath = std::vector{dbName, colName, "shards", shName}; if (!pdbs.hasKey(planPath)) { LOG_TOPIC("43242", DEBUG, Logger::MAINTENANCE) - << "Ooops, we have a shard for which we believe to be the leader," - " but the Plan does not have it any more, we do not report in " + << "Ooops, we have a shard for which we believe to be the " + "leader," + " but the Plan does not have it any more, we do not report " + "in " "Current about this, database: " << dbName << ", shard: " << shName; continue; @@ -863,7 +854,8 @@ arangodb::Result arangodb::maintenance::reportInCurrent( if (!thePlanList.isArray() || thePlanList.length() == 0 || !thePlanList[0].isString() || !thePlanList[0].isEqualStringUnchecked(serverId)) { LOG_TOPIC("87776", DEBUG, Logger::MAINTENANCE) - << "Ooops, we have a shard for which we believe to be the leader," + << "Ooops, we have a shard for which we believe to be the " + "leader," " but the Plan says otherwise, we do not report in Current " "about this, database: " << dbName << ", shard: " << shName; @@ -923,7 +915,8 @@ arangodb::Result arangodb::maintenance::reportInCurrent( if (!pdbs.hasKey(planPath)) { LOG_TOPIC("65432", DEBUG, Logger::MAINTENANCE) << "Ooops, we have a shard for which we believe that we " - "just resigned, but the Plan does not have it any more," + "just resigned, but the Plan does not have it any " + "more," " we do not report in Current about this, database: " << dbName << ", shard: " << shName; continue; diff --git a/arangod/Cluster/MaintenanceFeature.cpp b/arangod/Cluster/MaintenanceFeature.cpp index ec61671860..25d7f81f9f 100644 --- a/arangod/Cluster/MaintenanceFeature.cpp +++ b/arangod/Cluster/MaintenanceFeature.cpp @@ -57,7 +57,8 @@ bool findNotDoneActions(std::shared_ptr const& action) { MaintenanceFeature::MaintenanceFeature(application_features::ApplicationServer& server) : ApplicationFeature(server, "Maintenance"), _forceActivation(false), - _maintenanceThreadsMax(2) { + _maintenanceThreadsMax(2), + _currentCounter(0) { // the number of threads will be adjusted later. it's just that we want to // initialize all members properly @@ -116,7 +117,8 @@ void MaintenanceFeature::validateOptions(std::shared_ptr options << "Need at least" << minThreadLimit << "maintenance-threads"; _maintenanceThreadsMax = minThreadLimit; } else if (_maintenanceThreadsMax >= maxThreadLimit) { - LOG_TOPIC("8fb0e", WARN, Logger::MAINTENANCE) << "maintenance-threads limited to " << maxThreadLimit; + LOG_TOPIC("8fb0e", WARN, Logger::MAINTENANCE) + << "maintenance-threads limited to " << maxThreadLimit; _maintenanceThreadsMax = maxThreadLimit; } } @@ -129,8 +131,9 @@ void MaintenanceFeature::start() { // _forceActivation is set by the catch tests if (!_forceActivation && (serverState->isAgent() || serverState->isSingleServer())) { - LOG_TOPIC("deb1a", TRACE, Logger::MAINTENANCE) << "Disable maintenance-threads" - << " for single-server or agents."; + LOG_TOPIC("deb1a", TRACE, Logger::MAINTENANCE) + << "Disable maintenance-threads" + << " for single-server or agents."; return; } @@ -442,7 +445,7 @@ std::shared_ptr MaintenanceFeature::findReadyAction(std::unordered_set guard(_currentCounterLock); + return _currentCounter; +} + +void MaintenanceFeature::increaseCurrentCounter() { + std::unique_lock guard(_currentCounterLock); + _currentCounter++; + _currentCounterCondition.notify_all(); +} + +void MaintenanceFeature::waitForLargerCurrentCounter(uint64_t old) { + std::unique_lock guard(_currentCounterLock); + if (_currentCounter > old) { + return; + } + _currentCounterCondition.wait(guard); + TRI_ASSERT(_currentCounter > old); + return; +} \ No newline at end of file diff --git a/arangod/Cluster/MaintenanceFeature.h b/arangod/Cluster/MaintenanceFeature.h index 1716810b44..7681c4fda1 100644 --- a/arangod/Cluster/MaintenanceFeature.h +++ b/arangod/Cluster/MaintenanceFeature.h @@ -36,7 +36,7 @@ namespace arangodb { -template +template struct SharedPtrComparer { bool operator()(std::shared_ptr const& a, std::shared_ptr const& b) { if (a == nullptr || b == nullptr) { @@ -50,8 +50,6 @@ class MaintenanceFeature : public application_features::ApplicationFeature { public: explicit MaintenanceFeature(application_features::ApplicationServer&); - MaintenanceFeature(); - virtual ~MaintenanceFeature() {} struct errors_t { @@ -156,7 +154,8 @@ class MaintenanceFeature : public application_features::ApplicationFeature { * @brief Find and return first found not-done action or nullptr * @param desc Description of sought action */ - std::shared_ptr findFirstNotDoneAction(std::shared_ptr const& desc); + std::shared_ptr findFirstNotDoneAction( + std::shared_ptr const& desc); /** * @brief add index error to bucket @@ -298,19 +297,44 @@ class MaintenanceFeature : public application_features::ApplicationFeature { */ void delShardVersion(std::string const& shardId); + /** + * @brief Get the number of loadCurrent operations. + * NOTE: The Counter functions can be removed + * as soon as we use a push based approach on Plan and Current + * @return The most recent count for getCurrent calls + */ + uint64_t getCurrentCounter() const; + + /** + * @brief increase the counter for loadCurrent operations triggered + * during maintenance. This is used to delay some Actions, that + * require a recent current to continue + */ + void increaseCurrentCounter(); + + /** + * @brief wait until the current counter is larger then the given old one + * the idea here is to first request the `getCurrentCounter`. + * @param old The last number of getCurrentCounter(). This function will + * return only of the recent counter is larger than old. + */ + void waitForLargerCurrentCounter(uint64_t old); + private: /// @brief common code used by multiple constructors void init(); /// @brief Search for first action matching hash and predicate /// @return shared pointer to action object if exists, empty shared_ptr if not - std::shared_ptr findFirstActionHash(size_t hash, - std::function const&)> const& predicate); + std::shared_ptr findFirstActionHash( + size_t hash, + std::function const&)> const& predicate); /// @brief Search for first action matching hash and predicate (with lock already held by caller) /// @return shared pointer to action object if exists, empty shared_ptr if not - std::shared_ptr findFirstActionHashNoLock(size_t hash, - std::function const&)> const& predicate); + std::shared_ptr findFirstActionHashNoLock( + size_t hash, + std::function const&)> const& predicate); /// @brief Search for action by Id /// @return shared pointer to action object if exists, nullptr if not @@ -321,7 +345,6 @@ class MaintenanceFeature : public application_features::ApplicationFeature { std::shared_ptr findActionIdNoLock(uint64_t hash); protected: - /// @brief option for forcing this feature to always be enable - used by the catch tests bool _forceActivation; @@ -365,8 +388,8 @@ class MaintenanceFeature : public application_features::ApplicationFeature { // we need to leave the action in _prioQueue (since we cannot remove anything // but the top from it), and simply put it into a different state. std::priority_queue, - std::vector>, - SharedPtrComparer> _prioQueue; + std::vector>, SharedPtrComparer> + _prioQueue; /// @brief lock to protect _actionRegistry and state changes to MaintenanceActions within mutable arangodb::basics::ReadWriteLock _actionRegistryLock; @@ -404,6 +427,15 @@ class MaintenanceFeature : public application_features::ApplicationFeature { /// @brief shards have versions in order to be able to distinguish between /// independant actions std::unordered_map _shardVersion; + + /// @brief Mutex for the current counter condition variable + mutable std::mutex _currentCounterLock; + + /// @brief Condition variable where Actions can wait on until _currentCounter increased + std::condition_variable _currentCounterCondition; + + /// @brief counter for load_current requests. + uint64_t _currentCounter; }; } // namespace arangodb diff --git a/arangod/Cluster/MaintenanceStrings.h b/arangod/Cluster/MaintenanceStrings.h index 6e9d520084..a0d2c67bf3 100644 --- a/arangod/Cluster/MaintenanceStrings.h +++ b/arangod/Cluster/MaintenanceStrings.h @@ -69,6 +69,7 @@ constexpr char const* THE_LEADER = "theLeader"; constexpr char const* UNDERSCORE = "_"; constexpr char const* UPDATE_COLLECTION = "UpdateCollection"; constexpr char const* WAIT_FOR_SYNC = "waitForSync"; +constexpr char const* OLD_CURRENT_COUNTER = "oldCurrentCounter"; } // namespace maintenance } // namespace arangodb diff --git a/arangod/Cluster/UpdateCollection.cpp b/arangod/Cluster/UpdateCollection.cpp index 8dfd1a80cf..f48a1399c5 100644 --- a/arangod/Cluster/UpdateCollection.cpp +++ b/arangod/Cluster/UpdateCollection.cpp @@ -77,21 +77,36 @@ UpdateCollection::UpdateCollection(MaintenanceFeature& feature, ActionDescriptio } TRI_ASSERT(desc.has(FOLLOWERS_TO_DROP)); + TRI_ASSERT(desc.has(OLD_CURRENT_COUNTER)); + if (!error.str().empty()) { - LOG_TOPIC("a6e4c", ERR, Logger::MAINTENANCE) << "UpdateCollection: " << error.str(); + LOG_TOPIC("a6e4c", ERR, Logger::MAINTENANCE) + << "UpdateCollection: " << error.str(); _result.reset(TRI_ERROR_INTERNAL, error.str()); setState(FAILED); } } void handleLeadership(LogicalCollection& collection, std::string const& localLeader, - std::string const& plannedLeader, std::string const& followersToDrop) { + std::string const& plannedLeader, + std::string const& followersToDrop, std::string const& databaseName, + uint64_t oldCounter, MaintenanceFeature& feature) { auto& followers = collection.followers(); if (plannedLeader.empty()) { // Planned to lead if (!localLeader.empty()) { // We were not leader, assume leadership - followers->setTheLeader(std::string()); - followers->clear(); + // This will block the thread until we fetched a new current version + // in maintenance main thread. + feature.waitForLargerCurrentCounter(oldCounter); + auto currentInfo = ClusterInfo::instance()->getCollectionCurrent( + databaseName, std::to_string(collection.planId())); + if (currentInfo == nullptr) { + // Collection has been dropped we cannot continue here. + return; + } + TRI_ASSERT(currentInfo != nullptr); + auto failoverCandidates = currentInfo->failoverCandidates(collection.name()); + followers->takeOverLeadership(failoverCandidates); transaction::cluster::abortFollowerTransactionsOnShard(collection.id()); } else { // If someone (the Supervision most likely) has thrown @@ -138,6 +153,8 @@ bool UpdateCollection::first() { auto const& localLeader = _description.get(LOCAL_LEADER); auto const& followersToDrop = _description.get(FOLLOWERS_TO_DROP); auto const& props = properties(); + auto const& oldCounterString = _description.get(OLD_CURRENT_COUNTER); + uint64_t oldCounter = basics::StringUtils::uint64(oldCounterString); try { DatabaseGuard guard(database); @@ -152,7 +169,8 @@ bool UpdateCollection::first() { // resignation case is not handled here, since then // ourselves does not appear in shards[shard] but only // "_" + ourselves. - handleLeadership(*coll, localLeader, plannedLeader, followersToDrop); + handleLeadership(*coll, localLeader, plannedLeader, followersToDrop, + vocbase.name(), oldCounter, feature()); _result = Collections::updateProperties(*coll, props, false); // always a full-update if (!_result.ok()) { @@ -173,7 +191,8 @@ bool UpdateCollection::first() { std::stringstream error; error << "action " << _description << " failed with exception " << e.what(); - LOG_TOPIC("79442", WARN, Logger::MAINTENANCE) << "UpdateCollection: " << error.str(); + LOG_TOPIC("79442", WARN, Logger::MAINTENANCE) + << "UpdateCollection: " << error.str(); _result.reset(TRI_ERROR_INTERNAL, error.str()); } diff --git a/arangod/GeneralServer/RequestLane.h b/arangod/GeneralServer/RequestLane.h index f0f5de7b3d..24edc7e738 100644 --- a/arangod/GeneralServer/RequestLane.h +++ b/arangod/GeneralServer/RequestLane.h @@ -65,6 +65,11 @@ enum class RequestLane { // V8 or having high priority. CLUSTER_INTERNAL, + // For requests from the DBserver to the Coordinator or + // from the Coordinator to the DBserver. Using AQL + // these have Medium priority. + CLUSTER_AQL, + // For requests from the from the Coordinator to the // DBserver using V8. CLUSTER_V8, @@ -115,6 +120,8 @@ inline RequestPriority PriorityRequestLane(RequestLane lane) { return RequestPriority::LOW; case RequestLane::CLUSTER_INTERNAL: return RequestPriority::HIGH; + case RequestLane::CLUSTER_AQL: + return RequestPriority::MED; case RequestLane::CLUSTER_V8: return RequestPriority::LOW; case RequestLane::CLUSTER_ADMIN: diff --git a/arangod/InternalRestHandler/InternalRestTraverserHandler.h b/arangod/InternalRestHandler/InternalRestTraverserHandler.h index ee5bfe6a95..068d405b6b 100644 --- a/arangod/InternalRestHandler/InternalRestTraverserHandler.h +++ b/arangod/InternalRestHandler/InternalRestTraverserHandler.h @@ -41,9 +41,7 @@ class InternalRestTraverserHandler : public RestVocbaseBaseHandler { char const* name() const override final { return "InternalRestTraverserHandler"; } - RequestLane lane() const override final { - return RequestLane::CLUSTER_INTERNAL; - } + RequestLane lane() const override final { return RequestLane::CLUSTER_AQL; } private: // @brief create a new Traverser Engine. diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index c6716ed61c..7a4cc946fe 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -82,7 +82,7 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t& vocbase, if (_asyncMode) { LOG_TOPIC("1b1c2", DEBUG, Logger::PREGEL) << "Running in async mode"; } - VPackSlice lazy = _userParams.slice().get( Utils::lazyLoadingKey); + VPackSlice lazy = _userParams.slice().get(Utils::lazyLoadingKey); _lazyLoading = _algorithm->supportsLazyLoading(); _lazyLoading = _lazyLoading && (lazy.isNone() || lazy.getBoolean()); if (_lazyLoading) { @@ -98,8 +98,7 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t& vocbase, } Conductor::~Conductor() { - if (_state != ExecutionState::CANCELED && - _state != ExecutionState::DEFAULT) { + if (_state != ExecutionState::CANCELED && _state != ExecutionState::DEFAULT) { try { this->cancel(); } catch (...) { @@ -120,11 +119,13 @@ void Conductor::start() { _globalSuperstep = 0; _state = ExecutionState::RUNNING; - LOG_TOPIC("3a255", DEBUG, Logger::PREGEL) << "Telling workers to load the data"; + LOG_TOPIC("3a255", DEBUG, Logger::PREGEL) + << "Telling workers to load the data"; int res = _initializeWorkers(Utils::startExecutionPath, VPackSlice()); if (res != TRI_ERROR_NO_ERROR) { _state = ExecutionState::CANCELED; - LOG_TOPIC("30171", ERR, Logger::PREGEL) << "Not all DBServers started the execution"; + LOG_TOPIC("30171", ERR, Logger::PREGEL) + << "Not all DBServers started the execution"; } } @@ -170,7 +171,8 @@ bool Conductor::_startGlobalStep() { _masterContext->_enterNextGSS = false; proceed = _masterContext->postGlobalSuperstep(); if (!proceed) { - LOG_TOPIC("0aa8e", DEBUG, Logger::PREGEL) << "Master context ended execution"; + LOG_TOPIC("0aa8e", DEBUG, Logger::PREGEL) + << "Master context ended execution"; } } @@ -212,7 +214,8 @@ bool Conductor::_startGlobalStep() { res = _sendToAllDBServers(Utils::startGSSPath, b); // call me maybe if (res != TRI_ERROR_NO_ERROR) { _state = ExecutionState::IN_ERROR; - LOG_TOPIC("f34bb", ERR, Logger::PREGEL) << "Conductor could not start GSS " << _globalSuperstep; + LOG_TOPIC("f34bb", ERR, Logger::PREGEL) + << "Conductor could not start GSS " << _globalSuperstep; // the recovery mechanisms should take care od this } else { LOG_TOPIC("411a5", DEBUG, Logger::PREGEL) << "Conductor started new gss " << _globalSuperstep; @@ -236,8 +239,9 @@ void Conductor::finishedWorkerStartup(VPackSlice const& data) { return; } - LOG_TOPIC("76631", INFO, Logger::PREGEL) << "Running pregel with " << _totalVerticesCount - << " vertices, " << _totalEdgesCount << " edges"; + LOG_TOPIC("76631", INFO, Logger::PREGEL) + << "Running pregel with " << _totalVerticesCount << " vertices, " + << _totalEdgesCount << " edges"; if (_masterContext) { _masterContext->_globalSuperstep = 0; _masterContext->_vertexCount = _totalVerticesCount; @@ -356,7 +360,8 @@ void Conductor::finishedRecoveryStep(VPackSlice const& data) { res = _sendToAllDBServers(Utils::continueRecoveryPath, b); } else { - LOG_TOPIC("6ecf2", INFO, Logger::PREGEL) << "Recovery finished. Proceeding normally"; + LOG_TOPIC("6ecf2", INFO, Logger::PREGEL) + << "Recovery finished. Proceeding normally"; // build the message, works for all cases VPackBuilder b; @@ -393,7 +398,8 @@ void Conductor::startRecovery() { if (_state != ExecutionState::RUNNING && _state != ExecutionState::IN_ERROR) { return; // maybe we are already in recovery mode } else if (_algorithm->supportsCompensation() == false) { - LOG_TOPIC("12e0e", ERR, Logger::PREGEL) << "Algorithm does not support recovery"; + LOG_TOPIC("12e0e", ERR, Logger::PREGEL) + << "Algorithm does not support recovery"; cancelNoLock(); return; } @@ -407,14 +413,15 @@ void Conductor::startRecovery() { // let's wait for a final state in the cluster _workHandle = SchedulerFeature::SCHEDULER->queueDelay( - RequestLane::CLUSTER_INTERNAL, std::chrono::seconds(2), [this](bool cancelled) { + RequestLane::CLUSTER_AQL, std::chrono::seconds(2), [this](bool cancelled) { if (cancelled || _state != ExecutionState::RECOVERING) { return; // seems like we are canceled } std::vector goodServers; int res = PregelFeature::instance()->recoveryManager()->filterGoodServers(_dbServers, goodServers); if (res != TRI_ERROR_NO_ERROR) { - LOG_TOPIC("3d08b", ERR, Logger::PREGEL) << "Recovery proceedings failed"; + LOG_TOPIC("3d08b", ERR, Logger::PREGEL) + << "Recovery proceedings failed"; cancelNoLock(); return; } @@ -614,15 +621,15 @@ int Conductor::_initializeWorkers(std::string const& suffix, VPackSlice addition } std::shared_ptr cc = ClusterComm::instance(); - size_t nrGood = cc->performRequests(requests, 5.0 * 60.0, - LogTopic("Pregel Conductor"), false); + size_t nrGood = + cc->performRequests(requests, 5.0 * 60.0, LogTopic("Pregel Conductor"), false); Utils::printResponses(requests); return nrGood == requests.size() ? TRI_ERROR_NO_ERROR : TRI_ERROR_FAILED; } int Conductor::_finalizeWorkers() { _callbackMutex.assertLockedByCurrentThread(); - _finalizationStartTimeSecs = TRI_microtime(); + _finalizationStartTimeSecs = TRI_microtime(); bool store = _state == ExecutionState::DONE; store = store && _storeResults; @@ -651,7 +658,6 @@ int Conductor::_finalizeWorkers() { } void Conductor::finishedWorkerFinalize(VPackSlice data) { - MUTEX_LOCKER(guard, _callbackMutex); _ensureUniqueResponse(data); if (_respondedServers.size() != _dbServers.size()) { @@ -674,9 +680,8 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) { LOG_TOPIC("063b5", INFO, Logger::PREGEL) << "Done. We did " << _globalSuperstep << " rounds"; LOG_TOPIC("3cfa8", INFO, Logger::PREGEL) - << "Startup Time: " << _computationStartTimeSecs - _startTimeSecs << "s"; - LOG_TOPIC("d43cb", INFO, Logger::PREGEL) - << "Computation Time: " << compTime << "s"; + << "Startup Time: " << _computationStartTimeSecs - _startTimeSecs << "s"; + LOG_TOPIC("d43cb", INFO, Logger::PREGEL) << "Computation Time: " << compTime << "s"; LOG_TOPIC("74e05", INFO, Logger::PREGEL) << "Storage Time: " << storeTime << "s"; LOG_TOPIC("06f03", INFO, Logger::PREGEL) << "Overall: " << totalRuntimeSecs() << "s"; LOG_TOPIC("03f2e", DEBUG, Logger::PREGEL) << "Stats: " << debugOut.toString(); @@ -686,7 +691,7 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) { auto* scheduler = SchedulerFeature::SCHEDULER; if (scheduler) { uint64_t exe = _executionNumber; - scheduler->queue(RequestLane::CLUSTER_INTERNAL, [exe] { + scheduler->queue(RequestLane::CLUSTER_AQL, [exe] { auto pf = PregelFeature::instance(); if (pf) { pf->cleanupConductor(exe); @@ -770,8 +775,7 @@ int Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const& if (conductor) { TRI_vocbase_t& vocbase = conductor->_vocbaseGuard.database(); VPackBuilder response; - PregelFeature::handleWorkerRequest(vocbase, path, - message.slice(), response); + PregelFeature::handleWorkerRequest(vocbase, path, message.slice(), response); } }); } @@ -794,9 +798,10 @@ int Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const& requests.emplace_back("server:" + server, rest::RequestType::POST, base + path, body); } - size_t nrGood = cc->performRequests(requests, 5.0 * 60.0, - LogTopic("Pregel Conductor"), false); - LOG_TOPIC("9de62", TRACE, Logger::PREGEL) << "Send " << path << " to " << nrGood << " servers"; + size_t nrGood = + cc->performRequests(requests, 5.0 * 60.0, LogTopic("Pregel Conductor"), false); + LOG_TOPIC("9de62", TRACE, Logger::PREGEL) + << "Send " << path << " to " << nrGood << " servers"; Utils::printResponses(requests); if (handle && nrGood == requests.size()) { for (ClusterCommRequest const& req : requests) { diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index ee7b8e85c5..05f26b8469 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -1147,7 +1147,8 @@ Result RestReplicationHandler::processRestoreCollectionCoordinator( } if (!isValidMinReplFactorSlice) { - if (replFactorSlice.isString() && replFactorSlice.isEqualString("satellite")) { + if (replFactorSlice.isString() && + replFactorSlice.isEqualString("satellite")) { minReplicationFactor = 0; } else if (minReplicationFactor <= 0) { minReplicationFactor = 1; diff --git a/arangod/Scheduler/SupervisedScheduler.cpp b/arangod/Scheduler/SupervisedScheduler.cpp index ef71261b68..67346ef4c2 100644 --- a/arangod/Scheduler/SupervisedScheduler.cpp +++ b/arangod/Scheduler/SupervisedScheduler.cpp @@ -53,9 +53,9 @@ bool isDirectDeadlockLane(RequestLane lane) { // Those tasks can not be executed directly. return lane == RequestLane::TASK_V8 || lane == RequestLane::CLIENT_V8 || lane == RequestLane::CLUSTER_V8 || lane == RequestLane::INTERNAL_LOW || - lane == RequestLane::SERVER_REPLICATION || - lane == RequestLane::CLUSTER_ADMIN || lane == RequestLane::CLUSTER_INTERNAL || - lane == RequestLane::AGENCY_CLUSTER || lane == RequestLane::CLIENT_AQL; + lane == RequestLane::SERVER_REPLICATION || lane == RequestLane::CLUSTER_ADMIN || + lane == RequestLane::CLUSTER_INTERNAL || lane == RequestLane::AGENCY_CLUSTER || + lane == RequestLane::CLIENT_AQL || lane == RequestLane::CLUSTER_AQL; } } // namespace diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index 9b20f5b859..f8fc83975a 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -1720,7 +1720,7 @@ OperationResult transaction::Methods::insertLocal(std::string const& collectionN if (!options.isSynchronousReplicationFrom.empty()) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION, options); } - if (followerInfo->get()->size() + 1 < collection->minReplicationFactor()) { + if (!followerInfo->allowedToWrite()) { // We cannot fulfill minimum replication Factor. // Reject write. LOG_TOPIC("d7306", ERR, Logger::REPLICATION) @@ -2044,7 +2044,7 @@ OperationResult transaction::Methods::modifyLocal(std::string const& collectionN if (!options.isSynchronousReplicationFrom.empty()) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION); } - if (followerInfo->get()->size() + 1 < collection->minReplicationFactor()) { + if (!followerInfo->allowedToWrite()) { // We cannot fulfill minimum replication Factor. // Reject write. LOG_TOPIC("2e35a", ERR, Logger::REPLICATION) @@ -2324,7 +2324,7 @@ OperationResult transaction::Methods::removeLocal(std::string const& collectionN if (!options.isSynchronousReplicationFrom.empty()) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION); } - if (followerInfo->get()->size() + 1 < collection->minReplicationFactor()) { + if (!followerInfo->allowedToWrite()) { // We cannot fulfill minimum replication Factor. // Reject write. LOG_TOPIC("f1f8e", ERR, Logger::REPLICATION) @@ -2559,7 +2559,7 @@ OperationResult transaction::Methods::truncateLocal(std::string const& collectio if (!options.isSynchronousReplicationFrom.empty()) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION); } - if (followerInfo->get()->size() + 1 < collection->minReplicationFactor()) { + if (!followerInfo->allowedToWrite()) { // We cannot fulfill minimum replication Factor. // Reject write. LOG_TOPIC("7c1d4", ERR, Logger::REPLICATION) diff --git a/lib/Basics/StaticStrings.cpp b/lib/Basics/StaticStrings.cpp index 66e67cab12..89b9c1f8b8 100644 --- a/lib/Basics/StaticStrings.cpp +++ b/lib/Basics/StaticStrings.cpp @@ -228,6 +228,7 @@ std::string const StaticStrings::GraphCreateCollection("createCollection"); // Replication std::string const StaticStrings::ReplicationSoftLockOnly("doSoftLockOnly"); +std::string const StaticStrings::FailoverCandidates("failoverCandidates"); // misc strings std::string const StaticStrings::LastValue("lastValue"); diff --git a/lib/Basics/StaticStrings.h b/lib/Basics/StaticStrings.h index d5662b19b4..764473fb7b 100644 --- a/lib/Basics/StaticStrings.h +++ b/lib/Basics/StaticStrings.h @@ -210,6 +210,7 @@ class StaticStrings { // Replication static std::string const ReplicationSoftLockOnly; + static std::string const FailoverCandidates; // misc strings static std::string const LastValue; diff --git a/tests/IResearch/IResearchQueryOptimization-test.cpp b/tests/IResearch/IResearchQueryOptimization-test.cpp index e1e728d2e4..d6bb880d06 100644 --- a/tests/IResearch/IResearchQueryOptimization-test.cpp +++ b/tests/IResearch/IResearchQueryOptimization-test.cpp @@ -252,13 +252,39 @@ class IResearchQueryOptimizationTest : public ::testing::Test { NS_END +static std::vector const EMPTY; + // ----------------------------------------------------------------------------- // --SECTION-- test suite // ----------------------------------------------------------------------------- +void addLinkToCollection(std::shared_ptr& view) { + auto updateJson = VPackParser::fromJson( + "{ \"links\" : {" + "\"collection_1\" : { \"includeAllFields\" : true }" + "}}"); + EXPECT_TRUE((view->properties(updateJson->slice(), true).ok())); + + arangodb::velocypack::Builder builder; + + builder.openObject(); + view->properties(builder, arangodb::LogicalDataSource::makeFlags( + arangodb::LogicalDataSource::Serialize::Detailed)); + builder.close(); + + auto slice = builder.slice(); + EXPECT_TRUE(slice.isObject()); + EXPECT_TRUE(slice.get("name").copyString() == "testView"); + EXPECT_TRUE(slice.get("type").copyString() == + arangodb::iresearch::DATA_SOURCE_TYPE.name()); + EXPECT_TRUE(slice.get("deleted").isNone()); // no system properties + auto tmpSlice = slice.get("links"); + EXPECT_TRUE((true == tmpSlice.isObject() && 1 == tmpSlice.length())); +} + + // dedicated to https://github.com/arangodb/arangodb/issues/8294 TEST_F(IResearchQueryOptimizationTest, test) { - static std::vector const EMPTY; auto createJson = VPackParser::fromJson( "{ \ @@ -285,29 +311,7 @@ TEST_F(IResearchQueryOptimizationTest, test) { ASSERT_TRUE((false == !view)); // add link to collection - { - auto updateJson = VPackParser::fromJson( - "{ \"links\" : {" - "\"collection_1\" : { \"includeAllFields\" : true }" - "}}"); - EXPECT_TRUE((view->properties(updateJson->slice(), true).ok())); - - arangodb::velocypack::Builder builder; - - builder.openObject(); - view->properties(builder, arangodb::LogicalDataSource::makeFlags( - arangodb::LogicalDataSource::Serialize::Detailed)); - builder.close(); - - auto slice = builder.slice(); - EXPECT_TRUE(slice.isObject()); - EXPECT_TRUE(slice.get("name").copyString() == "testView"); - EXPECT_TRUE(slice.get("type").copyString() == - arangodb::iresearch::DATA_SOURCE_TYPE.name()); - EXPECT_TRUE(slice.get("deleted").isNone()); // no system properties - auto tmpSlice = slice.get("links"); - EXPECT_TRUE((true == tmpSlice.isObject() && 1 == tmpSlice.length())); - } + addLinkToCollection(view); std::deque insertedDocs;