1
0
Fork 0

[3.4] Fast Leader Change (#9642)

* Intermediate state.

* Finished.
This commit is contained in:
Lars Maier 2019-08-09 14:02:04 +02:00 committed by Michael Hackstein
parent 508abc311e
commit d8bc3685d3
13 changed files with 324 additions and 18 deletions

View File

@ -1,8 +1,11 @@
v3.4.8 (XXXX-XX-XX)
-------------------
* Keep followers in sync if the old leader resigned and stopped writes.
* Abort a FailedLeader job when its _to_ server fails.
* Re-added warning for kernel setting "overcommit_memory=2" if jemalloc is also enabled.
* Significantly reduced "random" 1 second delays in some arangosearch DDL operations.

View File

@ -217,6 +217,8 @@ DBServerAgencySyncResult DBServerAgencySync::execute() {
LOG_TOPIC(TRACE, Logger::MAINTENANCE)
<< "DBServerAgencySync::phaseTwo - current state: " << current->toJson();
mfeature->increaseCurrentCounter();
local.clear();
glc = getLocalCollections(local);
// We intentionally refetch local collections here, such that phase 2
@ -264,14 +266,14 @@ DBServerAgencySyncResult DBServerAgencySync::execute() {
AgencyPrecondition(
precondition.keyAt(0).copyString(), AgencyPrecondition::Type::VALUE, precondition.valueAt(0)));
}
if (op == "set") {
auto const value = ao.value.get("payload");
operations.push_back(AgencyOperation(key, AgencyValueOperationType::SET, value));
} else if (op == "delete") {
operations.push_back(AgencyOperation(key, AgencySimpleOperationType::DELETE_OP));
}
}
operations.push_back(AgencyOperation("Current/Version",
AgencySimpleOperationType::INCREMENT_OP));

View File

@ -358,3 +358,20 @@ void FollowerInfo::clear() {
auto v = std::make_shared<std::vector<ServerID>>();
_followers = v; // will cast to std::vector<ServerID> const
}
////////////////////////////////////////////////////////////////////////////////
/// @brief Take over leadership for this shard.
/// Also inject information of a insync followers that we knew about
/// before a failover to this server has happened
////////////////////////////////////////////////////////////////////////////////
void FollowerInfo::takeOverLeadership(std::shared_ptr<std::vector<ServerID>> realInsyncFollowers) {
// This function copies over the information taken from the last CURRENT into a local vector.
// Where we remove the old leader and ourself from the list of followers
WRITE_LOCKER(writeLocker, _dataLock);
// Reset local structures, if we take over leadership we do not know anything!
_followers = realInsyncFollowers ? realInsyncFollowers : std::make_shared<std::vector<ServerID>>();
// Take over leadership
_theLeader = "";
_theLeaderTouched = true;
}

View File

@ -114,6 +114,16 @@ class FollowerInfo {
READ_LOCKER(readLocker, _dataLock);
return _theLeaderTouched;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief Take over leadership for this shard.
/// Also inject information of a insync followers that we knew about
/// before a failover to this server has happened
/// The second parameter may be nullptr. It is an additional list
/// of declared to be insync followers. If it is nullptr the follower
/// list is initialised empty.
////////////////////////////////////////////////////////////////////////////////
void takeOverLeadership(std::shared_ptr<std::vector<ServerID>> realInsyncFollowers);
};
} // end namespace arangodb

View File

@ -245,7 +245,8 @@ void handlePlanShard(VPackSlice const& cprops, VPackSlice const& ldb,
{THE_LEADER, shouldBeLeading ? std::string() : leaderId},
{SERVER_ID, serverId},
{LOCAL_LEADER, lcol.get(THE_LEADER).copyString()},
{FOLLOWERS_TO_DROP, followersToDropString}},
{FOLLOWERS_TO_DROP, followersToDropString},
{OLD_CURRENT_COUNTER, std::to_string(feature.getCurrentCounter())}},
HIGHER_PRIORITY, properties));
} else {
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
@ -624,7 +625,7 @@ arangodb::Result arangodb::maintenance::phaseOne(VPackSlice const& plan,
VPackBuilder& report) {
arangodb::Result result;
report.add(VPackValue(PHASE_ONE));
{

View File

@ -369,7 +369,7 @@ std::shared_ptr<Action> MaintenanceFeature::findFirstActionHash(size_t hash,
READ_LOCKER(rLock, _actionRegistryLock);
return findFirstActionHashNoLock(hash, predicate);
}
}
std::shared_ptr<Action> MaintenanceFeature::findFirstActionHashNoLock(size_t hash,
std::function<bool(std::shared_ptr<maintenance::Action> const&)> const& predicate) {
@ -387,7 +387,7 @@ std::shared_ptr<Action> MaintenanceFeature::findActionId(uint64_t id) {
READ_LOCKER(rLock, _actionRegistryLock);
return findActionIdNoLock(id);
}
}
std::shared_ptr<Action> MaintenanceFeature::findActionIdNoLock(uint64_t id) {
// assert to test lock held?
@ -426,7 +426,7 @@ std::shared_ptr<Action> MaintenanceFeature::findReadyAction(std::unordered_set<s
// contain any fast track, so we can idle.
break;
}
// When we get here, there is currently nothing to do, so we might
// as well clean up those jobs in the _actionRegistry, which are
// in state DONE:
@ -451,7 +451,7 @@ std::shared_ptr<Action> MaintenanceFeature::findReadyAction(std::unordered_set<s
return ret_ptr;
}
}
VPackBuilder MaintenanceFeature::toVelocyPack() const {
VPackBuilder vb;
@ -734,3 +734,45 @@ void MaintenanceFeature::delShardVersion(std::string const& shname) {
_shardVersion.erase(it);
}
}
uint64_t MaintenanceFeature::getCurrentCounter() const {
// It is guaranteed that getCurrentCounter is not executed
// concurrent to increase / wait.
// This guarantee is created by the following:
// 1) There is one inifinite loop that will call
// PhaseOne and PhaseTwo in exactly this ordering.
// It is guaranteed that only one thread at a time is
// in this loop.
// Between PhaseOne and PhaseTwo the increaseCurrentCounter is called
// Within PhaseOne this getCurrentCounter is called, but never after.
// so getCurrentCounter and increaseCurrentCounter are strictily serialized.
// 2) waitForLargerCurrentCounter can be called in concurrent threads at any time.
// It is read-only, so it is save to have it concurrent to getCurrentCounter
// without any locking.
// However we need locking for increase and waitFor in order to guarantee
// it's functionallity.
// For now we actually do not need this guard, but as this is NOT performance
// critical we can simply get it, just to be save for later use.
std::unique_lock<std::mutex> guard(_currentCounterLock);
return _currentCounter;
}
void MaintenanceFeature::increaseCurrentCounter() {
std::unique_lock<std::mutex> guard(_currentCounterLock);
_currentCounter++;
_currentCounterCondition.notify_all();
}
void MaintenanceFeature::waitForLargerCurrentCounter(uint64_t old) {
std::unique_lock<std::mutex> guard(_currentCounterLock);
// Just to be sure we get not woken up for other reasons.
while (_currentCounter <= old) {
// We might miss a shutdown check here.
// This is ok, as we will not be able to do much anyways.
if (_isShuttingDown) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
_currentCounterCondition.wait(guard);
}
TRI_ASSERT(_currentCounter > old);
}

View File

@ -298,6 +298,29 @@ class MaintenanceFeature : public application_features::ApplicationFeature {
*/
void delShardVersion(std::string const& shardId);
/**
* @brief Get the number of loadCurrent operations.
* NOTE: The Counter functions can be removed
* as soon as we use a push based approach on Plan and Current
* @return The most recent count for getCurrent calls
*/
uint64_t getCurrentCounter() const;
/**
* @brief increase the counter for loadCurrent operations triggered
* during maintenance. This is used to delay some Actions, that
* require a recent current to continue
*/
void increaseCurrentCounter();
/**
* @brief wait until the current counter is larger then the given old one
* the idea here is to first request the `getCurrentCounter`.
* @param old The last number of getCurrentCounter(). This function will
* return only of the recent counter is larger than old.
*/
void waitForLargerCurrentCounter(uint64_t old);
private:
/// @brief common code used by multiple constructors
void init();
@ -404,6 +427,15 @@ class MaintenanceFeature : public application_features::ApplicationFeature {
/// @brief shards have versions in order to be able to distinguish between
/// independant actions
std::unordered_map<std::string, size_t> _shardVersion;
/// @brief Mutex for the current counter condition variable
mutable std::mutex _currentCounterLock;
/// @brief Condition variable where Actions can wait on until _currentCounter increased
std::condition_variable _currentCounterCondition;
/// @brief counter for load_current requests.
uint64_t _currentCounter;
};
} // namespace arangodb

View File

@ -69,6 +69,7 @@ constexpr char const* THE_LEADER = "theLeader";
constexpr char const* UNDERSCORE = "_";
constexpr char const* UPDATE_COLLECTION = "UpdateCollection";
constexpr char const* WAIT_FOR_SYNC = "waitForSync";
constexpr char const* OLD_CURRENT_COUNTER = "oldCurrentCounter";
} // namespace maintenance
} // namespace arangodb

View File

@ -118,7 +118,7 @@ bool ResignShardLeadership::first() {
// for now but we will not accept any replication operation from any
// leader, until we have negotiated a deal with it. Then the actual
// name of the leader will be set.
col->followers()->setTheLeader("LEADER_NOT_YET_KNOWN"); // resign
col->followers()->setTheLeader(LeaderNotYetKnownString); // resign
} catch (std::exception const& e) {
std::stringstream error;
@ -131,3 +131,5 @@ bool ResignShardLeadership::first() {
notify();
return false;
}
std::string const ResignShardLeadership::LeaderNotYetKnownString = "LEADER_NOT_YET_KNOWN";

View File

@ -40,6 +40,8 @@ class ResignShardLeadership : public ActionBase {
virtual ~ResignShardLeadership();
virtual bool first() override final;
static std::string const LeaderNotYetKnownString;
};
} // namespace maintenance

View File

@ -26,6 +26,7 @@
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/VelocyPackHelper.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/FollowerInfo.h"
#include "Cluster/MaintenanceFeature.h"
@ -74,6 +75,7 @@ UpdateCollection::UpdateCollection(MaintenanceFeature& feature, ActionDescriptio
error << "followersToDrop must be specified. ";
}
TRI_ASSERT(desc.has(FOLLOWERS_TO_DROP));
TRI_ASSERT(desc.has(OLD_CURRENT_COUNTER));
if (!error.str().empty()) {
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "UpdateCollection: " << error.str();
@ -82,14 +84,90 @@ UpdateCollection::UpdateCollection(MaintenanceFeature& feature, ActionDescriptio
}
}
void sendLeaderChangeRequests(std::vector<ServerID> const& currentServers,
std::shared_ptr<std::vector<ServerID>>& realInsyncFollowers,
std::string const& databaseName, ShardID const& shardID,
std::string const& oldLeader) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return;
}
std::string const& sid = arangodb::ServerState::instance()->getId();
VPackBuilder bodyBuilder;
{
VPackObjectBuilder ob(&bodyBuilder);
bodyBuilder.add("leaderId", VPackValue(sid));
bodyBuilder.add("oldLeaderId", VPackValue(oldLeader));
bodyBuilder.add("shard", VPackValue(shardID));
}
std::string const url = "/_db/" + databaseName + "/_api/replication/set-the-leader";
std::vector<ClusterCommRequest> requests;
auto body = std::make_shared<std::string>(bodyBuilder.toJson());
for (auto const& srv : currentServers) {
if (srv == sid) {
continue; // ignore ourself
}
requests.emplace_back("server:" + srv, RequestType::PUT, url, body);
}
size_t nrDone;
cc->performRequests(requests, 3.0, nrDone, Logger::COMMUNICATION, false);
// This code intentionally ignores all errors
realInsyncFollowers = std::make_shared<std::vector<ServerID>>();
for (auto const& req : requests) {
ClusterCommResult const& result = req.result;
if (result.status == CL_COMM_RECEIVED && result.errorCode == TRI_ERROR_NO_ERROR) {
if (result.result && result.result->getHttpReturnCode() == 200) {
realInsyncFollowers->push_back(result.serverID);
}
}
}
}
void handleLeadership(LogicalCollection& collection, std::string const& localLeader,
std::string const& plannedLeader, std::string const& followersToDrop) {
std::string const& plannedLeader, std::string const& followersToDrop,
std::string const& databaseName,
uint64_t oldCounter, MaintenanceFeature& feature) {
auto& followers = collection.followers();
if (plannedLeader.empty()) { // Planned to lead
if (!localLeader.empty()) { // We were not leader, assume leadership
followers->setTheLeader(std::string());
followers->clear();
// This will block the thread until we fetched a new current version
// in maintenance main thread.
feature.waitForLargerCurrentCounter(oldCounter);
auto currentInfo = ClusterInfo::instance()->getCollectionCurrent(
databaseName, std::to_string(collection.planId()));
if (currentInfo == nullptr) {
// Collection has been dropped we cannot continue here.
return;
}
std::vector<ServerID> currentServers = currentInfo->servers(collection.name());
std::shared_ptr<std::vector<ServerID>> realInsyncFollowers;
if (currentServers.size() > 0) {
std::string& oldLeader = currentServers.at(0);
// Check if the old leader has resigned and stopped all write
// (if so, we can assume that all servers are still in sync)
if (oldLeader.at(0) == '_') {
// remove the underscore from the list as it is useless anyway
oldLeader = oldLeader.substr(1);
// Update all follower and tell them that we are the leader now
sendLeaderChangeRequests(currentServers, realInsyncFollowers, databaseName, collection.name(), oldLeader);
}
}
followers->takeOverLeadership(realInsyncFollowers);
} else {
// If someone (the Supervision most likely) has thrown
// out a follower from the plan, then the leader
@ -134,6 +212,8 @@ bool UpdateCollection::first() {
auto const& localLeader = _description.get(LOCAL_LEADER);
auto const& followersToDrop = _description.get(FOLLOWERS_TO_DROP);
auto const& props = properties();
auto const& oldCounterString = _description.get(OLD_CURRENT_COUNTER);
uint64_t oldCounter = basics::StringUtils::uint64(oldCounterString);
try {
DatabaseGuard guard(database);
@ -149,7 +229,8 @@ bool UpdateCollection::first() {
// resignation case is not handled here, since then
// ourselves does not appear in shards[shard] but only
// "_" + ourselves.
handleLeadership(*coll, localLeader, plannedLeader, followersToDrop);
handleLeadership(*coll, localLeader, plannedLeader, followersToDrop,
vocbase->name(), oldCounter, feature());
_result = Collections::updateProperties(*coll, props, false); // always a full-update
if (!_result.ok()) {

View File

@ -36,6 +36,7 @@
#include "Cluster/ClusterHelpers.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/FollowerInfo.h"
#include "Cluster/ResignShardLeadership.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "Indexes/Index.h"
#include "Replication/DatabaseInitialSyncer.h"
@ -107,6 +108,40 @@ static bool ignoreHiddenEnterpriseCollection(std::string const& name, bool force
return false;
}
static Result checkPlanLeaderDirect(std::shared_ptr<LogicalCollection> const& col,
std::string const& claimLeaderId) {
std::vector<std::string> agencyPath = {
"Plan",
"Collections",
col->vocbase().name(),
std::to_string(col->planId()),
"shards",
col->name()
};
std::string shardAgencyPathString = StringUtils::join(agencyPath, '/');
AgencyComm ac;
AgencyCommResult res = ac.getValues(shardAgencyPathString);
if (res.successful()) {
// This is bullshit. Why does the *fancy* AgencyComm Manager
// prepend the agency url with `arango` but in the end returns an object
// that is prepended by `arango`! WTF!?
VPackSlice plan = res.slice().at(0).get(AgencyCommManager::path()).get(agencyPath);
TRI_ASSERT(plan.isArray() && plan.length() > 0);
VPackSlice leaderSlice = plan.at(0);
TRI_ASSERT(leaderSlice.isString());
if (leaderSlice.isEqualString(claimLeaderId)) {
return Result{};
}
}
return Result{TRI_ERROR_FORBIDDEN};
}
static Result restoreDataParser(char const* ptr, char const* pos,
std::string const& collectionName, int line,
std::string& key, VPackBuilder& builder,
@ -466,6 +501,15 @@ RestStatus RestReplicationHandler::execute() {
} else {
handleCommandRemoveFollower();
}
} else if (command == "set-the-leader") {
if (type != rest::RequestType::PUT) {
goto BAD_CALL;
}
if (!ServerState::instance()->isDBServer()) {
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_CLUSTER_ONLY_ON_DBSERVER);
} else {
handleCommandSetTheLeader();
}
} else if (command == "holdReadLockCollection") {
if (!ServerState::instance()->isDBServer()) {
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_CLUSTER_ONLY_ON_DBSERVER);
@ -501,6 +545,69 @@ BAD_CALL:
return RestStatus::DONE;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief update the leader of a shard
//////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandSetTheLeader() {
TRI_ASSERT(ServerState::instance()->isDBServer());
bool success = false;
VPackSlice const body = this->parseVPackBody(success);
if (!success) {
// error already created
return;
}
if (!body.isObject()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"body needs to be an object with attributes 'leaderId' "
"and 'shard'");
return;
}
VPackSlice const leaderIdSlice = body.get("leaderId");
VPackSlice const oldLeaderIdSlice = body.get("oldLeaderId");
VPackSlice const shard = body.get("shard");
if (!leaderIdSlice.isString() || !shard.isString() || !oldLeaderIdSlice.isString()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"'leaderId' and 'shard' attributes must be strings");
return;
}
std::string leaderId = leaderIdSlice.copyString();
auto col = _vocbase.lookupCollection(shard.copyString());
if (col == nullptr) {
generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND,
"did not find collection");
return;
}
Result res = checkPlanLeaderDirect(col, leaderId);
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
std::string currentLeader = col->followers()->getLeader();
if (currentLeader == arangodb::maintenance::ResignShardLeadership::LeaderNotYetKnownString) {
// We have resigned, check that we are the old leader
currentLeader = ServerState::instance()->getId();
}
if (!oldLeaderIdSlice.isEqualString(currentLeader)) {
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN, "old leader not as expected");
return;
}
col->followers()->setTheLeader(leaderId);
VPackBuilder b;
{
VPackObjectBuilder bb(&b);
b.add(StaticStrings::Error, VPackValue(false));
}
generateResult(rest::ResponseCode::OK, b.slice());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock JSF_put_api_replication_makeSlave
////////////////////////////////////////////////////////////////////////////////
@ -1105,7 +1212,7 @@ Result RestReplicationHandler::processRestoreCollectionCoordinator(
// when in the community version, we need to turn off specific attributes
// because they are only supported in enterprise mode
// watch out for "isSmart" -> we need to set this to false in the community version
VPackSlice s = parameters.get(StaticStrings::GraphIsSmart);
if (s.isBoolean() && s.getBoolean()) {
@ -1113,7 +1220,7 @@ Result RestReplicationHandler::processRestoreCollectionCoordinator(
toMerge.add(StaticStrings::GraphIsSmart, VPackValue(false));
changes.push_back("changed 'isSmart' attribute value to false");
}
// "smartGraphAttribute" needs to be set to be removed too
s = parameters.get(StaticStrings::GraphSmartGraphAttribute);
if (s.isString() && !s.copyString().empty()) {
@ -1121,7 +1228,7 @@ Result RestReplicationHandler::processRestoreCollectionCoordinator(
toMerge.add(StaticStrings::GraphSmartGraphAttribute, VPackSlice::nullSlice());
changes.push_back("removed 'smartGraphAttribute' attribute value");
}
// same for "smartJoinAttribute"
s = parameters.get(StaticStrings::SmartJoinAttribute);
if (s.isString() && !s.copyString().empty()) {
@ -1129,7 +1236,7 @@ Result RestReplicationHandler::processRestoreCollectionCoordinator(
toMerge.add(StaticStrings::SmartJoinAttribute, VPackSlice::nullSlice());
changes.push_back("removed 'smartJoinAttribute' attribute value");
}
// finally rewrite all enterprise sharding strategies to a simple hash-based strategy
s = parameters.get("shardingStrategy");
if (s.isString() && s.copyString().find("enterprise") != std::string::npos) {
@ -1142,7 +1249,7 @@ Result RestReplicationHandler::processRestoreCollectionCoordinator(
if (s.isString() && s.copyString() == "satellite") {
// set "satellite" replicationFactor to the default replication factor
ClusterFeature* cl = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster");
uint32_t replicationFactor = cl->systemReplicationFactor();
toMerge.add(StaticStrings::ReplicationFactor, VPackValue(replicationFactor));
changes.push_back(std::string("changed 'replicationFactor' attribute value to ") + std::to_string(replicationFactor));;

View File

@ -173,6 +173,12 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
void handleCommandRemoveFollower();
//////////////////////////////////////////////////////////////////////////////
/// @brief update the leader of a shard
//////////////////////////////////////////////////////////////////////////////
void handleCommandSetTheLeader();
//////////////////////////////////////////////////////////////////////////////
/// @brief hold a read lock on a collection to stop writes temporarily
//////////////////////////////////////////////////////////////////////////////