diff --git a/arangod/Cluster/Maintenance.cpp b/arangod/Cluster/Maintenance.cpp index 90f9202cf6..aa04016a48 100644 --- a/arangod/Cluster/Maintenance.cpp +++ b/arangod/Cluster/Maintenance.cpp @@ -1079,7 +1079,7 @@ arangodb::Result arangodb::maintenance::syncReplicatedShardsWithLeaders( actions.emplace_back( ActionDescription( std::map { - {NAME, "SynchronizeShard"}, {DATABASE, dbname}, + {NAME, SYNCHRONIZE_SHARD}, {DATABASE, dbname}, {COLLECTION, colname}, {SHARD, shname}, {THE_LEADER, leader}})); } diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index d0df1736a3..27f1b0bf7d 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -79,6 +79,7 @@ std::string const RESTRICT_TYPE("restrictType"); std::string const RESTRICT_COLLECTIONS("restrictCollections"); std::string const SKIP_CREATE_DROP("skipCreateDrop"); std::string const TTL("ttl"); + using namespace std::chrono; SynchronizeShard::SynchronizeShard( @@ -102,10 +103,10 @@ SynchronizeShard::SynchronizeShard( } TRI_ASSERT(desc.has(SHARD)); - if (!desc.has(THE_LEADER)) { - error << "leader must be stecified"; + if (!desc.has(THE_LEADER) || desc.get(THE_LEADER).empty()) { + error << "leader must be specified and must be non-empty"; } - TRI_ASSERT(desc.has(THE_LEADER)); + TRI_ASSERT(desc.has(THE_LEADER) && !desc.get(THE_LEADER).empty()); if (!error.str().empty()) { LOG_TOPIC(ERR, Logger::MAINTENANCE) << "SynchronizeShard: " << error.str(); @@ -114,6 +115,8 @@ SynchronizeShard::SynchronizeShard( } } +SynchronizeShard::~SynchronizeShard() {} + class SynchronizeShardCallback : public arangodb::ClusterCommCallback { public: explicit SynchronizeShardCallback(SynchronizeShard* callie) {}; @@ -122,9 +125,20 @@ public: } }; -SynchronizeShard::~SynchronizeShard() {} +static std::stringstream& AppendShardInformationToMessage( + std::string const& database, std::string const& shard, + std::string const& planId, + std::chrono::system_clock::time_point 
const& startTime, + std::stringstream& msg) { + auto const endTime = system_clock::now(); + msg << "local shard: '" << database << "/" << shard << "', " + << "for central: '" << database << "/" << planId << "', " + << "started: " << timepointToString(startTime) << ", " + << "ended: " << timepointToString(endTime); + return msg; +} -arangodb::Result getReadLockId( +static arangodb::Result getReadLockId ( std::string const& endpoint, std::string const& database, std::string const& clientId, double timeout, uint64_t& id) { @@ -163,7 +177,7 @@ arangodb::Result getReadLockId( return arangodb::Result(); } -arangodb::Result collectionCount( +static arangodb::Result collectionCount( std::shared_ptr const& col, uint64_t& c) { std::string collectionName(col->name()); @@ -196,7 +210,7 @@ arangodb::Result collectionCount( } -arangodb::Result addShardFollower( +static arangodb::Result addShardFollower ( std::string const& endpoint, std::string const& database, std::string const& shard, uint64_t lockJobId, std::string const& clientId, double timeout = 120.0) { @@ -283,48 +297,7 @@ arangodb::Result addShardFollower( } } -arangodb::Result removeShardFollower( - std::string const& endpoint, std::string const& database, - std::string const& shard, std::string const& clientId, double timeout = 120.0) { - - LOG_TOPIC(WARN, Logger::MAINTENANCE) << - "removeShardFollower: tell the leader to take us off the follower list..."; - - auto cc = arangodb::ClusterComm::instance(); - if (cc == nullptr) { // nullptr only happens during controlled shutdown - return arangodb::Result( - TRI_ERROR_SHUTTING_DOWN, "startReadLockOnLeader: Shutting down"); - } - - VPackBuilder body; - { VPackObjectBuilder b(&body); - body.add(SHARD, VPackValue(shard)); - body.add(FOLLOWER_ID, - VPackValue(arangodb::ServerState::instance()->getId())); } - - // Note that we always use the _system database here because the actual - // database might be gone already on the leader and we need to cancel - // the read lock 
under all circumstances. - auto comres = cc->syncRequest( - clientId, 1, endpoint, rest::RequestType::PUT, - DB + database + REPL_REM_FOLLOWER, body.toJson(), - std::unordered_map(), timeout); - - auto result = comres->result; - if (result == nullptr || result->getHttpReturnCode() != 200) { - std::string errorMessage( - "removeShardFollower: could not remove us from the leader's follower list: "); - errorMessage += result->getHttpReturnCode(); - errorMessage += comres->stringifyErrorMessage(); - LOG_TOPIC(ERR, Logger::MAINTENANCE) << errorMessage; - return arangodb::Result(TRI_ERROR_INTERNAL, errorMessage); - } - - LOG_TOPIC(WARN, Logger::MAINTENANCE) << "removeShardFollower: success" ; - return arangodb::Result(); -} - -arangodb::Result cancelReadLockOnLeader( +static arangodb::Result cancelReadLockOnLeader ( std::string const& endpoint, std::string const& database, uint64_t lockJobId, std::string const& clientId, double timeout = 10.0) { @@ -332,7 +305,7 @@ arangodb::Result cancelReadLockOnLeader( auto cc = arangodb::ClusterComm::instance(); if (cc == nullptr) { // nullptr only happens during controlled shutdown return arangodb::Result( - TRI_ERROR_SHUTTING_DOWN, "startReadLockOnLeader: Shutting down"); + TRI_ERROR_SHUTTING_DOWN, "cancelReadLockOnLeader: Shutting down"); } VPackBuilder body; @@ -361,7 +334,7 @@ arangodb::Result cancelReadLockOnLeader( return arangodb::Result(); } -arangodb::Result cancelBarrier( +static arangodb::Result cancelBarrier( std::string const& endpoint, std::string const& database, int64_t barrierId, std::string const& clientId, double timeout = 120.0) { @@ -404,7 +377,7 @@ arangodb::Result cancelBarrier( arangodb::Result SynchronizeShard::getReadLock( std::string const& endpoint, std::string const& database, std::string const& collection, std::string const& clientId, - uint64_t rlid, double timeout) { + uint64_t rlid, bool soft, double timeout) { auto cc = arangodb::ClusterComm::instance(); if (cc == nullptr) { // nullptr only happens 
during controlled shutdown @@ -416,7 +389,9 @@ arangodb::Result SynchronizeShard::getReadLock( { VPackObjectBuilder o(&body); body.add(ID, VPackValue(std::to_string(rlid))); body.add(COLLECTION, VPackValue(collection)); - body.add(TTL, VPackValue(timeout)); } + body.add(TTL, VPackValue(timeout)); + body.add(StaticStrings::ReplicationSoftLockOnly, VPackValue(soft)); + } auto url = DB + database + REPL_HOLD_READ_LOCK; @@ -430,9 +405,10 @@ arangodb::Result SynchronizeShard::getReadLock( // we must make sure that the read lock on the leader is not active! // This is done automatically below. + double sleepTime = 0.5; size_t count = 0; - while (++count < 20) { // wait for some time until read lock established: - + size_t maxTries = static_cast(std::floor(600.0 / sleepTime)); + while (++count < maxTries) { // wait for some time until read lock established: // Now check that we hold the read lock: auto putres = cc->syncRequest( clientId, 1, endpoint, rest::RequestType::PUT, url, body.toJson(), @@ -455,7 +431,7 @@ arangodb::Result SynchronizeShard::getReadLock( << putres->stringifyErrorMessage(); } - std::this_thread::sleep_for(duration(.5)); + std::this_thread::sleep_for(duration(sleepTime)); } LOG_TOPIC(ERR, Logger::MAINTENANCE) << "startReadLockOnLeader: giving up"; @@ -478,14 +454,14 @@ arangodb::Result SynchronizeShard::getReadLock( } -bool isStopping() { +static inline bool isStopping() { return application_features::ApplicationServer::isStopping(); } arangodb::Result SynchronizeShard::startReadLockOnLeader( std::string const& endpoint, std::string const& database, std::string const& collection, std::string const& clientId, - uint64_t& rlid, double timeout) { + uint64_t& rlid, bool soft, double timeout) { // Read lock id rlid = 0; @@ -498,7 +474,7 @@ arangodb::Result SynchronizeShard::startReadLockOnLeader( LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "Got read lock id: " << rlid; } - result = getReadLock(endpoint, database, collection, clientId, rlid, timeout); + result 
= getReadLock(endpoint, database, collection, clientId, rlid, soft, timeout); return result; } @@ -508,7 +484,7 @@ enum ApplierType { APPLIER_GLOBAL }; -arangodb::Result replicationSynchronize( +static arangodb::Result replicationSynchronize( std::shared_ptr const &col, VPackSlice const& config, ApplierType applierType, std::shared_ptr sy) { @@ -589,7 +565,47 @@ arangodb::Result replicationSynchronize( } -arangodb::Result replicationSynchronizeFinalize(VPackSlice const& conf) { +static arangodb::Result replicationSynchronizeCatchup(VPackSlice const& conf, + TRI_voc_tick_t& tickReached) { + + auto const database = conf.get(DATABASE).copyString(); + auto const collection = conf.get(COLLECTION).copyString(); + auto const leaderId = conf.get(LEADER_ID).copyString(); + auto const fromTick = conf.get("from").getNumber(); + + ReplicationApplierConfiguration configuration = + ReplicationApplierConfiguration::fromVelocyPack(conf, database); + // will throw if invalid + configuration.validate(); + + DatabaseGuard guard(database); + DatabaseTailingSyncer syncer(guard.database(), configuration, fromTick, true, 0); + + if (!leaderId.empty()) { + syncer.setLeaderId(leaderId); + } + + Result r; + try { + r = syncer.syncCollectionCatchup(collection, tickReached); + } catch (arangodb::basics::Exception const& ex) { + r = Result(ex.code(), ex.what()); + } catch (std::exception const& ex) { + r = Result(TRI_ERROR_INTERNAL, ex.what()); + } catch (...) 
{ + r = Result(TRI_ERROR_INTERNAL, "unknown exception"); + } + + if (r.fail()) { + LOG_TOPIC(ERR, Logger::REPLICATION) + << "syncCollectionCatchup failed: " << r.errorMessage(); + } + + return r; +} + + +static arangodb::Result replicationSynchronizeFinalize(VPackSlice const& conf) { auto const database = conf.get(DATABASE).copyString(); auto const collection = conf.get(COLLECTION).copyString(); auto const leaderId = conf.get(LEADER_ID).copyString(); @@ -640,7 +656,7 @@ bool SynchronizeShard::first() { auto const ourselves = arangodb::ServerState::instance()->getId(); auto startTime = system_clock::now(); auto const startTimeStr = timepointToString(startTime); - auto const clientId(database + planId + shard + leader); + std::string const clientId(database + planId + shard + leader); // First wait until the leader has created the shard (visible in // Current in the Agency) or we or the shard have vanished from @@ -659,11 +675,9 @@ bool SynchronizeShard::first() { std::find(planned.begin(), planned.end(), ourselves) == planned.end() || planned.front() != leader) { // Things have changed again, simply terminate: - auto const endTime = system_clock::now(); std::stringstream error; - error << "cancelled, " << database << "/" << shard << ", " << database - << "/" << planId << ", started " << startTimeStr << ", ended " - << timepointToString(endTime); + error << "cancelled, "; + AppendShardInformationToMessage(database, shard, planId, startTime, error); LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str(); _result.reset(TRI_ERROR_FAILED, error.str()); return false; @@ -673,14 +687,10 @@ bool SynchronizeShard::first() { try { // ci->getCollection can throw ci = clusterInfo->getCollection(database, planId); } catch(...)
{ - auto const endTime = system_clock::now(); std::stringstream msg; - msg << "exception in getCollection, " << database << "/" - << shard << ", " << database - << "/" << planId << ", started " << startTimeStr << ", ended " - << timepointToString(endTime); - LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: " - << msg.str(); + msg << "exception in getCollection, "; + AppendShardInformationToMessage(database, shard, planId, startTime, msg); + LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: " << msg.str(); _result.reset(TRI_ERROR_FAILED, msg.str()); return false; } @@ -692,20 +702,16 @@ bool SynchronizeShard::first() { std::vector current = cic->servers(shard); if (current.empty()) { - Result(TRI_ERROR_FAILED, - "synchronizeOneShard: cancelled, no servers in 'Current'"); - } - if (current.front() == leader) { + LOG_TOPIC(DEBUG, Logger::MAINTENANCE) + << "synchronizeOneShard: cancelled, no servers in 'Current'"; + } else if (current.front() == leader) { if (std::find(current.begin(), current.end(), ourselves) == current.end()) { break; // start synchronization work } // We are already there, this is rather strange, but never mind: - auto const endTime = system_clock::now(); std::stringstream error; - error - << "already done, " << database << "/" << shard - << ", " << database << "/" << planId << ", started " - << startTimeStr << ", ended " << timepointToString(endTime); + error << "already done, "; + AppendShardInformationToMessage(database, shard, planId, startTime, error); LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str(); _result.reset(TRI_ERROR_FAILED, error.str()); return false; @@ -716,7 +722,6 @@ bool SynchronizeShard::first() { << "/" << shard << ", " << database << "/" << planId; std::this_thread::sleep_for(duration(0.2)); - } // Once we get here, we know that the leader is ready for sync, so we give it a try: @@ -752,16 +757,16 @@ bool SynchronizeShard::first() { database << "/" << shard << "' for 
central '" << database << "/" << planId << "'"; try { - auto asResult = addShardFollower(ep, database, shard, 0, clientId, 60.0); if (asResult.ok()) { - auto const endTime = system_clock::now(); - LOG_TOPIC(DEBUG, Logger::MAINTENANCE) - << "synchronizeOneShard: shortcut worked, done, " << database << "/" - << shard << ", " << database << "/" << planId <<", started: " - << startTimeStr << " ended: " << timepointToString(endTime); + if (Logger::isEnabled(LogLevel::DEBUG, Logger::MAINTENANCE)) { + std::stringstream msg; + msg << "synchronizeOneShard: shortcut worked, done, "; + AppendShardInformationToMessage(database, shard, planId, startTime, msg); + LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << msg.str(); + } collection->followers()->setTheLeader(leader); notify(); return false; @@ -774,29 +779,28 @@ bool SynchronizeShard::first() { << "/" << shard << "' for central '" << database << "/" << planId << "'"; try { + // From here on we perform a number of steps, each of which can + // fail. If it fails with an exception, it is caught, but this + // should usually not happen. If it fails without an exception, + // we log and use return. + + Result res; // used multiple times for intermediate results + // First once without a read transaction: if (isStopping()) { - _result.reset(TRI_ERROR_INTERNAL, "server is shutting down"); + std::string errorMessage( + "synchronizeOneShard: synchronization failed for shard "); + errorMessage += shard + ": shutdown in progress, giving up"; + LOG_TOPIC(INFO, Logger::MAINTENANCE) << errorMessage; + _result.reset(TRI_ERROR_INTERNAL, errorMessage); + return false; } - // Mark us as follower for this leader such that we begin - // accepting replication operations, note that this is also - // used for the initial synchronization: - + // This is necessary to accept replications from the leader which can + // happen as soon as we are in sync. 
collection->followers()->setTheLeader(leader); - if (leader.empty()) { - collection->followers()->clear(); - } - - // do not reset followers when we resign at this time...we are - // still the only source of truth to trust, in particular, in the - // planned leader resignation, we will shortly after the call to - // this function here report the controlled resignation to the - // agency. This report must still contain the correct follower list - // or else the supervision is super angry with us. - startTime = system_clock::now(); VPackBuilder config; @@ -815,143 +819,86 @@ bool SynchronizeShard::first() { auto details = std::make_shared(); - Result syncRes = replicationSynchronize( + res = replicationSynchronize( collection, config.slice(), APPLIER_DATABASE, details); auto sy = details->slice(); auto const endTime = system_clock::now(); - bool longSync = false; - // Long shard sync initialisation if (endTime-startTime > seconds(5)) { - LOG_TOPIC(WARN, Logger::MAINTENANCE) + LOG_TOPIC(INFO, Logger::MAINTENANCE) << "synchronizeOneShard: long call to syncCollection for shard" - << shard << " " << syncRes.errorMessage() << " start time: " - << timepointToString(startTime) << "end time: " + << shard << " " << res.errorMessage() << " start time: " + << timepointToString(startTime) << ", end time: " << timepointToString(system_clock::now()); - longSync = true; } - // - if (!syncRes.ok()) { + // If this did not work, then we cannot go on: + if (!res.ok()) { std::stringstream error; error << "could not initially synchronize shard " << shard << ": " - << syncRes.errorMessage(); + << res.errorMessage(); LOG_TOPIC(ERR, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str(); _result.reset(TRI_ERROR_INTERNAL, error.str()); return false; - } else { - - VPackSlice collections = sy.get(COLLECTIONS); - - if (collections.length() == 0 || - collections[0].get("name").copyString() != shard) { - - if (longSync) { - LOG_TOPIC(ERR, Logger::MAINTENANCE) - << "synchronizeOneShard: 
long sync, before cancelBarrier" - << timepointToString(system_clock::now()); - } - cancelBarrier(ep, database, sy.get(BARRIER_ID).getNumber(), clientId); - if (longSync) { - LOG_TOPIC(ERR, Logger::MAINTENANCE) - << "synchronizeOneShard: long sync, after cancelBarrier" - << timepointToString(system_clock::now()); - } - - std::stringstream error; - error << "shard " << shard << " seems to be gone from leader!"; - LOG_TOPIC(ERR, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str(); - _result.reset(TRI_ERROR_INTERNAL, error.str()); - return false; - - } else { - - // Now start a read transaction to stop writes: - uint64_t lockJobId = 0; - LOG_TOPIC(DEBUG, Logger::MAINTENANCE) - << "synchronizeOneShard: startReadLockOnLeader: " << ep << ":" - << database << ":" << collection->name(); - Result result = startReadLockOnLeader( - ep, database, collection->name(), clientId, lockJobId); - if (result.ok()) { - LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId; - } else { - LOG_TOPIC(ERR, Logger::MAINTENANCE) - << "synchronizeOneShard: error in startReadLockOnLeader:" - << result.errorMessage(); - } - - cancelBarrier(ep, database, sy.get("barrierId").getNumber(), clientId); - - if (lockJobId != 0) { - - VPackBuilder builder; - { VPackObjectBuilder o(&builder); - builder.add(ENDPOINT, VPackValue(ep)); - builder.add(DATABASE, VPackValue(database)); - builder.add(COLLECTION, VPackValue(shard)); - builder.add(LEADER_ID, VPackValue(leader)); - builder.add("from", sy.get(LAST_LOG_TICK)); - builder.add("requestTimeout", VPackValue(60.0)); - builder.add("connectTimeout", VPackValue(60.0)); - } - - Result fres = replicationSynchronizeFinalize (builder.slice()); - - if (fres.ok()) { - result = addShardFollower(ep, database, shard, lockJobId, clientId, 60.0); - - if (!result.ok()) { - LOG_TOPIC(ERR, Logger::MAINTENANCE) - << "synchronizeOneShard: failed to add follower" - << result.errorMessage(); - } - } else { - std::string errorMessage( - 
"synchronizeOneshard: error in syncCollectionFinalize: ") ; - errorMessage += fres.errorMessage(); - result = Result(TRI_ERROR_INTERNAL, errorMessage); - } - - // This result is unused, only in logs - Result lockResult = cancelReadLockOnLeader(ep, database, lockJobId, clientId, 60.0); - if (!lockResult.ok()) { - LOG_TOPIC(ERR, Logger::MAINTENANCE) - << "synchronizeOneShard: read lock has timed out for shard " << shard; - } - } else { - LOG_TOPIC(ERR, Logger::MAINTENANCE) - << "synchronizeOneShard: lockJobId was false for shard" << shard; - } - - if (result.ok()) { - LOG_TOPIC(DEBUG, Logger::MAINTENANCE) - << "synchronizeOneShard: synchronization worked for shard " << shard; - _result.reset(TRI_ERROR_NO_ERROR); - } else { - LOG_TOPIC(ERR, Logger::MAINTENANCE) - << "synchronizeOneShard: synchronization failed for shard " << shard; - std::string errorMessage( - "synchronizeOneShard: synchronization failed for shard " - + shard + ":" + result.errorMessage()); - _result = Result(TRI_ERROR_INTERNAL, errorMessage);; - } - } } + + // From here on, we have to call `cancelBarrier` in case of errors + // as well as in the success case! 
+ int64_t barrierId = sy.get(BARRIER_ID).getNumber(); + TRI_DEFER(cancelBarrier(ep, database, barrierId, clientId)); + + VPackSlice collections = sy.get(COLLECTIONS); + + if (collections.length() == 0 || + collections[0].get("name").copyString() != shard) { + + std::stringstream error; + error << "shard " << shard << " seems to be gone from leader, this " + "can happen if a collection was dropped during synchronization!"; + LOG_TOPIC(WARN, Logger::MAINTENANCE) << "SynchronizeOneShard: " + << error.str(); + _result.reset(TRI_ERROR_INTERNAL, error.str()); + return false; + + } + + auto lastTick = arangodb::basics::VelocyPackHelper::readNumericValue(sy, LAST_LOG_TICK, 0); + VPackBuilder builder; + + ResultT tickResult = catchupWithReadLock(ep, database, *collection, clientId, + shard, leader, lastTick, builder); + if (!tickResult.ok()) { + LOG_TOPIC(INFO, Logger::MAINTENANCE) << tickResult.errorMessage(); + _result.reset(tickResult); + return false; + } + lastTick = tickResult.get(); + + // Now start an exclusive transaction to stop writes: + res = catchupWithExclusiveLock(ep, database, *collection, clientId, + shard, leader, lastTick, builder); + if (!res.ok()) { + LOG_TOPIC(INFO, Logger::MAINTENANCE) << res.errorMessage(); + _result.reset(res); + return false; + } + } catch (std::exception const& e) { - auto const endTime = system_clock::now(); std::stringstream error; - error << "synchronization of local shard '" << database << "/" << shard - << "' for central '" << database << "/" << planId << "' failed: " - << e.what() << timepointToString(endTime); + error << "synchronization of "; + AppendShardInformationToMessage(database, shard, planId, startTime, error); + error << " failed: " << e.what(); LOG_TOPIC(ERR, Logger::MAINTENANCE) << error.str(); _result.reset(TRI_ERROR_INTERNAL, e.what()); return false; } + // Validate that HARDLOCK only works!
} catch (std::exception const& e) { + // This catches the case that we could not even find the collection + // locally, because the DatabaseGuard constructor threw. LOG_TOPIC(WARN, Logger::MAINTENANCE) << "action " << _description << " failed with exception " << e.what(); _result.reset(TRI_ERROR_INTERNAL, e.what()); @@ -959,13 +906,169 @@ bool SynchronizeShard::first() { } // Tell others that we are done: - auto const endTime = system_clock::now(); - LOG_TOPIC(INFO, Logger::MAINTENANCE) - << "synchronizeOneShard: done, " << database << "/" << shard << ", " - << database << "/" << planId << ", started: " - << timepointToString(startTime) << ", ended: " << timepointToString(endTime); - + if (Logger::isEnabled(LogLevel::INFO, Logger::MAINTENANCE)) { + // This wrap is just to not write the stream if not needed. + std::stringstream msg; + AppendShardInformationToMessage(database, shard, planId, startTime, msg); + LOG_TOPIC(INFO, Logger::MAINTENANCE) + << "synchronizeOneShard: done, " << msg.str(); + } notify(); - return false;; + return false; } +ResultT SynchronizeShard::catchupWithReadLock( + std::string const& ep, + std::string const& database, + LogicalCollection const& collection, + std::string const& clientId, + std::string const& shard, + std::string const& leader, + TRI_voc_tick_t lastLogTick, + VPackBuilder& builder) { + // Now ask for a "soft stop" on the leader, in case of mmfiles, this + // will be a hard stop, but for rocksdb, this is a no-op: + uint64_t lockJobId = 0; + LOG_TOPIC(DEBUG, Logger::MAINTENANCE) + << "synchronizeOneShard: startReadLockOnLeader (soft): " << ep << ":" + << database << ":" << collection.name(); + Result res = startReadLockOnLeader( + ep, database, collection.name(), clientId, lockJobId, true); + if (!res.ok()) { + std::string errorMessage = + "synchronizeOneShard: error in startReadLockOnLeader (soft):" + + res.errorMessage(); + return ResultT::error(TRI_ERROR_INTERNAL, errorMessage); + } + auto readLockGuard = 
arangodb::scopeGuard([&]() { + // Always cancel the read lock when this scope is left early. + // A failure to cancel is only logged here (reported separately). + auto res = cancelReadLockOnLeader(ep, database, lockJobId, clientId, 60.0); + if (!res.ok()) { + LOG_TOPIC(INFO, Logger::MAINTENANCE) + << "Could not cancel soft read lock on leader: " + << res.errorMessage(); + } + }); + + LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId; + + // From now on, we need to cancel the read lock on the leader regardless + // of whether things go wrong or right! + + // Do a first try of a catch up with the WAL. In case of RocksDB, + // this has not yet stopped the writes, so we have to be content + // with nearly reaching the end of the WAL, which is a "soft" catchup. + builder.clear(); + { VPackObjectBuilder o(&builder); + builder.add(ENDPOINT, VPackValue(ep)); + builder.add(DATABASE, VPackValue(database)); + builder.add(COLLECTION, VPackValue(shard)); + builder.add(LEADER_ID, VPackValue(leader)); + builder.add("from", VPackValue(lastLogTick)); + builder.add("requestTimeout", VPackValue(600.0)); + builder.add("connectTimeout", VPackValue(60.0)); + } + + TRI_voc_tick_t tickReached = 0; + res = replicationSynchronizeCatchup(builder.slice(), tickReached); + + if (!res.ok()) { + std::string errorMessage( + "synchronizeOneshard: error in syncCollectionCatchup: ") ; + errorMessage += res.errorMessage(); + return ResultT::error(TRI_ERROR_INTERNAL, errorMessage); + } + + // Release the read lock again: + res = cancelReadLockOnLeader(ep, database, lockJobId, + clientId, 60.0); + // The read lock was cancelled explicitly above, so disarm the guard + readLockGuard.cancel(); + if (!res.ok()) { + std::string errorMessage + = "synchronizeOneShard: error when cancelling soft read lock: " + + res.errorMessage(); + LOG_TOPIC(INFO, Logger::MAINTENANCE) << errorMessage; + _result.reset(TRI_ERROR_INTERNAL, errorMessage); + return ResultT::error(TRI_ERROR_INTERNAL, errorMessage); + } + return ResultT::success(tickReached); +} + +Result SynchronizeShard::catchupWithExclusiveLock(std::string const& ep, +
std::string const& database, + LogicalCollection const& collection, + std::string const& clientId, + std::string const& shard, + std::string const& leader, + TRI_voc_tick_t lastLogTick, + VPackBuilder& builder) { + uint64_t lockJobId = 0; + LOG_TOPIC(DEBUG, Logger::MAINTENANCE) + << "synchronizeOneShard: startReadLockOnLeader: " << ep << ":" + << database << ":" << collection.name(); + Result res = startReadLockOnLeader( + ep, database, collection.name(), clientId, lockJobId, false); + if (!res.ok()) { + std::string errorMessage = + "synchronizeOneShard: error in startReadLockOnLeader (hard):" + + res.errorMessage(); + return {TRI_ERROR_INTERNAL, errorMessage}; + } + auto readLockGuard = arangodb::scopeGuard([&]() { + // Always cancel the read lock when this scope is left early. + // A failure to cancel is only logged here (reported separately). + auto res = cancelReadLockOnLeader(ep, database, lockJobId, clientId, 60.0); + if (!res.ok()) { + LOG_TOPIC(INFO, Logger::MAINTENANCE) + << "Could not cancel hard read lock on leader: " + << res.errorMessage(); + } + }); + + LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId; + + builder.clear(); + { VPackObjectBuilder o(&builder); + builder.add(ENDPOINT, VPackValue(ep)); + builder.add(DATABASE, VPackValue(database)); + builder.add(COLLECTION, VPackValue(shard)); + builder.add(LEADER_ID, VPackValue(leader)); + builder.add("from", VPackValue(lastLogTick)); + builder.add("requestTimeout", VPackValue(600.0)); + builder.add("connectTimeout", VPackValue(60.0)); + } + + res = replicationSynchronizeFinalize(builder.slice()); + + if (!res.ok()) { + std::string errorMessage( + "synchronizeOneshard: error in syncCollectionFinalize: ") ; + errorMessage += res.errorMessage(); + return {TRI_ERROR_INTERNAL, errorMessage}; + } + + res = addShardFollower(ep, database, shard, lockJobId, clientId, 60.0); + + if (!res.ok()) { + std::string errorMessage( + "synchronizeOneshard: error in addShardFollower: ") ; + errorMessage += res.errorMessage(); + return {TRI_ERROR_INTERNAL, errorMessage}; + } +
+ // This result is unused, only in logs + res = cancelReadLockOnLeader(ep, database, lockJobId, clientId, 60.0); + readLockGuard.cancel(); + if (!res.ok()) { + LOG_TOPIC(INFO, Logger::MAINTENANCE) + << "synchronizeOneShard: read lock has timed out for shard " << shard; + } + + // Report success: + LOG_TOPIC(DEBUG, Logger::MAINTENANCE) + << "synchronizeOneShard: synchronization worked for shard " << shard; + _result.reset(TRI_ERROR_NO_ERROR); + return {TRI_ERROR_NO_ERROR}; +} diff --git a/arangod/Cluster/SynchronizeShard.h b/arangod/Cluster/SynchronizeShard.h index bb83f81c32..587a0da494 100644 --- a/arangod/Cluster/SynchronizeShard.h +++ b/arangod/Cluster/SynchronizeShard.h @@ -27,11 +27,15 @@ #include "ActionBase.h" #include "ActionDescription.h" +#include "Cluster/ResultT.h" +#include "VocBase/voc-types.h" #include namespace arangodb { +class LogicalCollection; + class MaintenanceAction; namespace maintenance { @@ -50,13 +54,31 @@ private: arangodb::Result getReadLock( std::string const& endpoint, std::string const& database, std::string const& collection, std::string const& clientId, uint64_t rlid, - double timeout = 300.0); + bool soft, double timeout = 300.0); arangodb::Result startReadLockOnLeader( std::string const& endpoint, std::string const& database, std::string const& collection, std::string const& clientId, uint64_t& rlid, - double timeout = 300.0); + bool soft, double timeout = 300.0); + + arangodb::ResultT catchupWithReadLock(std::string const& ep, + std::string const& database, + LogicalCollection const& collection, + std::string const& clientId, + std::string const& shard, + std::string const& leader, + TRI_voc_tick_t lastLogTick, + VPackBuilder& builder); + + arangodb::Result catchupWithExclusiveLock(std::string const& ep, + std::string const& database, + LogicalCollection const& collection, + std::string const& clientId, + std::string const& shard, + std::string const& leader, + TRI_voc_tick_t lastLogTick, + VPackBuilder& builder); }; }} diff 
--git a/arangod/Replication/DatabaseTailingSyncer.cpp b/arangod/Replication/DatabaseTailingSyncer.cpp index 427e9b7af5..1202811c86 100644 --- a/arangod/Replication/DatabaseTailingSyncer.cpp +++ b/arangod/Replication/DatabaseTailingSyncer.cpp @@ -101,8 +101,9 @@ Result DatabaseTailingSyncer::saveApplierState() { /// @brief finalize the synchronization of a collection by tailing the WAL /// and filtering on the collection name until no more data is available -Result DatabaseTailingSyncer::syncCollectionFinalize( - std::string const& collectionName) { +Result DatabaseTailingSyncer::syncCollectionCatchupInternal( + std::string const& collectionName, bool hard, TRI_voc_tick_t& until) { + setAborted(false); // fetch master state just once Result r = _state.master.getState(_state.connection, _state.isChildSyncer); @@ -117,9 +118,19 @@ Result DatabaseTailingSyncer::syncCollectionFinalize( TRI_voc_tick_t fromTick = _initialTick; TRI_voc_tick_t lastScannedTick = fromTick; - LOG_TOPIC(DEBUG, Logger::REPLICATION) - << "starting syncCollectionFinalize:" << collectionName << ", fromTick " - << fromTick; + + if (hard) { + LOG_TOPIC(DEBUG, Logger::REPLICATION) + << "starting syncCollectionFinalize:" << collectionName << ", fromTick " + << fromTick; + } else { + LOG_TOPIC(DEBUG, Logger::REPLICATION) + << "starting syncCollectionCatchup:" << collectionName << ", fromTick " + << fromTick; + } + + auto clock = std::chrono::steady_clock(); + auto startTime = clock.now(); while (true) { if (application_features::ApplicationServer::isStopping()) { @@ -141,11 +152,13 @@ Result DatabaseTailingSyncer::syncCollectionFinalize( }); if (replutils::hasFailed(response.get())) { + until = fromTick; return replutils::buildHttpError(response.get(), url, _state.connection); } if (response->getHttpReturnCode() == 204) { // HTTP 204 No content: this means we are done + until = fromTick; return Result(); } @@ -166,6 +179,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize( header = 
response->getHeaderField( StaticStrings::ReplicationHeaderLastIncluded, found); if (!found) { + until = fromTick; return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE, std::string("got invalid response from master at ") + _state.master.endpoint + ": required header " + @@ -182,6 +196,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize( fromIncluded = StringUtils::boolean(header); } if (!fromIncluded && fromTick > 0) { + until = fromTick; return Result( TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT, std::string("required follow tick value '") + @@ -198,6 +213,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize( Result r = applyLog(response.get(), fromTick, processedMarkers, ignoreCount); if (r.fail()) { + until = fromTick; return r; } @@ -213,8 +229,28 @@ Result DatabaseTailingSyncer::syncCollectionFinalize( << "this indicates we're at the end"; } + // If this is non-hard, we employ some heuristics to stop early: + if (!hard) { + if (clock.now() - startTime > std::chrono::seconds(1) && _ongoingTransactions.empty()) { + checkMore = false; + } else { + TRI_voc_tick_t lastTick = 0; + header = response->getHeaderField( + StaticStrings::ReplicationHeaderLastTick, found); + if (found) { + lastTick = StringUtils::uint64(header); + if (_ongoingTransactions.empty() && + lastTick > lastIncludedTick && // just to make sure! + lastTick - lastIncludedTick < 1000) { + checkMore = false; + } + } + } + } + if (!checkMore) { // done! 
+ until = fromTick; return Result(); } LOG_TOPIC(DEBUG, Logger::REPLICATION) diff --git a/arangod/Replication/DatabaseTailingSyncer.h b/arangod/Replication/DatabaseTailingSyncer.h index 3367f718a3..20102b6e8b 100644 --- a/arangod/Replication/DatabaseTailingSyncer.h +++ b/arangod/Replication/DatabaseTailingSyncer.h @@ -50,10 +50,30 @@ class DatabaseTailingSyncer final : public TailingSyncer { /// @brief finalize the synchronization of a collection by tailing the WAL /// and filtering on the collection name until no more data is available - Result syncCollectionFinalize(std::string const& collectionName); + Result syncCollectionFinalize(std::string const& collectionName) { + TRI_voc_tick_t dummy = 0; + return syncCollectionCatchupInternal(collectionName, true, dummy); + } + + /// @brief catch up with changes in a leader shard by doing the same + /// as in syncCollectionFinalize, but potentially stopping earlier. + /// This function uses some heuristics to stop early, once most + /// of the catching up is already done. In that case, the replication + /// ends and stores the tick it has reached in the argument `until`. + /// The idea is that one can use `syncCollectionCatchup` without stopping + /// writes on the leader to do most of the catching up. Then one can stop + /// writes by getting an exclusive lock on the leader and use + /// `syncCollectionFinalize` to finish off the rest. + /// Internally, both use `syncCollectionCatchupInternal`.
+ Result syncCollectionCatchup(std::string const& collectionName, + TRI_voc_tick_t& until) { + return syncCollectionCatchupInternal(collectionName, false, until); + } protected: + Result syncCollectionCatchupInternal(std::string const& collectionName, + bool hard, TRI_voc_tick_t& until); /// @brief save the current applier state Result saveApplierState() override; diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 64e804de16..51031aaa28 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -2143,7 +2143,7 @@ void RestReplicationHandler::handleCommandAddFollower() { if (!checksumSlice.isEqualString(referenceChecksum.get())) { const std::string checksum = checksumSlice.copyString(); LOG_TOPIC(WARN, Logger::REPLICATION) << "Cannot add follower, mismatching checksums. " - << "Expected: " << referenceChecksum << " Actual: " << checksum; + << "Expected: " << referenceChecksum.get() << " Actual: " << checksum; generateError(rest::ResponseCode::BAD, TRI_ERROR_REPLICATION_WRONG_CHECKSUM, "'checksum' is wrong. Expected: " + referenceChecksum.get() diff --git a/arangod/RocksDBEngine/RocksDBWalAccess.cpp b/arangod/RocksDBEngine/RocksDBWalAccess.cpp index 613a912b9c..035e37609b 100644 --- a/arangod/RocksDBEngine/RocksDBWalAccess.cpp +++ b/arangod/RocksDBEngine/RocksDBWalAccess.cpp @@ -744,6 +744,11 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize, // pre 3.4 breaking up write batches is not supported size_t maxTrxChunkSize = filter.tickLastScanned > 0 ? 
chunkSize : SIZE_MAX; + /* + maxTrxChunkSize = 16 * 16 * 16; + chunkSize = 16 * 16 * 16; + */ + MyWALDumper dumper(filter, func, maxTrxChunkSize); const uint64_t since = dumper.safeBeginTick(); TRI_ASSERT(since <= filter.tickStart); @@ -790,6 +795,7 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize, //LOG_TOPIC(INFO, Logger::ENGINES) << "found batch-seq: " << batch.sequence; lastScannedTick = batch.sequence; // start of the batch + if (batch.sequence < since) { iterator->Next(); // skip continue; diff --git a/lib/Basics/StaticStrings.cpp b/lib/Basics/StaticStrings.cpp index 27217702c9..147e4313dd 100644 --- a/lib/Basics/StaticStrings.cpp +++ b/lib/Basics/StaticStrings.cpp @@ -189,6 +189,9 @@ std::string const StaticStrings::GraphDropCollection("dropCollection"); std::string const StaticStrings::GraphCreateCollections("createCollections"); std::string const StaticStrings::GraphCreateCollection("createCollection"); +// Replication +std::string const StaticStrings::ReplicationSoftLockOnly("doSoftLockOnly"); + // misc strings std::string const StaticStrings::LastValue("lastValue"); std::string const StaticStrings::checksumFileJs("JS_SHA1SUM.txt"); diff --git a/lib/Basics/StaticStrings.h b/lib/Basics/StaticStrings.h index 68ba65c8ea..dbc2b6f6e9 100644 --- a/lib/Basics/StaticStrings.h +++ b/lib/Basics/StaticStrings.h @@ -171,6 +171,9 @@ class StaticStrings { static std::string const GraphInitialCid; static std::string const GraphName; + // Replication + static std::string const ReplicationSoftLockOnly; + // misc strings static std::string const LastValue; static std::string const checksumFileJs; diff --git a/scripts/startLocalCluster.sh b/scripts/startLocalCluster.sh index 1a4a541505..efddb66ede 100755 --- a/scripts/startLocalCluster.sh +++ b/scripts/startLocalCluster.sh @@ -201,6 +201,7 @@ start() { --javascript.module-directory $SRC_DIR/enterprise/js \ --javascript.app-path cluster/apps$PORT \ --log.force-direct true \ + --log.thread true 
\ --log.level $LOG_LEVEL_CLUSTER \ --javascript.allow-admin-execute true \ $STORAGE_ENGINE \