mirror of https://gitee.com/bigwinds/arangodb
Bug fix 3.4/shorter foot in door (#7084)
* Implement `syncCollectionCatchup` in DatabaseTailingSyncer. First stab, might not even compile. * Fixed a typo. * Fix a typo and a compilation problem. * Further compilation fix. * Implement two stage catchup. * Two small corrections. * Unified error messages in Synchronize shard job. * Improved a code comment. * Fixed autocasting bool->double and double->bool issue. That is truely one of the best features ever invented... </irony> * Renamed doHardLock => toSoftLockOnly and inverted default value * Merged soft/hard foot logic with Transaction splits * Use scopeguards to cancel readlocks
This commit is contained in:
parent
78132bc2e5
commit
e05880895a
|
@ -1079,7 +1079,7 @@ arangodb::Result arangodb::maintenance::syncReplicatedShardsWithLeaders(
|
|||
actions.emplace_back(
|
||||
ActionDescription(
|
||||
std::map<std::string,std::string> {
|
||||
{NAME, "SynchronizeShard"}, {DATABASE, dbname},
|
||||
{NAME, SYNCHRONIZE_SHARD}, {DATABASE, dbname},
|
||||
{COLLECTION, colname}, {SHARD, shname}, {THE_LEADER, leader}}));
|
||||
|
||||
}
|
||||
|
|
|
@ -79,6 +79,7 @@ std::string const RESTRICT_TYPE("restrictType");
|
|||
std::string const RESTRICT_COLLECTIONS("restrictCollections");
|
||||
std::string const SKIP_CREATE_DROP("skipCreateDrop");
|
||||
std::string const TTL("ttl");
|
||||
|
||||
using namespace std::chrono;
|
||||
|
||||
SynchronizeShard::SynchronizeShard(
|
||||
|
@ -102,10 +103,10 @@ SynchronizeShard::SynchronizeShard(
|
|||
}
|
||||
TRI_ASSERT(desc.has(SHARD));
|
||||
|
||||
if (!desc.has(THE_LEADER)) {
|
||||
error << "leader must be stecified";
|
||||
if (!desc.has(THE_LEADER) || desc.get(THE_LEADER).empty()) {
|
||||
error << "leader must be specified and must be non-empty";
|
||||
}
|
||||
TRI_ASSERT(desc.has(THE_LEADER));
|
||||
TRI_ASSERT(desc.has(THE_LEADER) && !desc.get(THE_LEADER).empty());
|
||||
|
||||
if (!error.str().empty()) {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "SynchronizeShard: " << error.str();
|
||||
|
@ -114,6 +115,8 @@ SynchronizeShard::SynchronizeShard(
|
|||
}
|
||||
}
|
||||
|
||||
SynchronizeShard::~SynchronizeShard() {}
|
||||
|
||||
class SynchronizeShardCallback : public arangodb::ClusterCommCallback {
|
||||
public:
|
||||
explicit SynchronizeShardCallback(SynchronizeShard* callie) {};
|
||||
|
@ -122,9 +125,20 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
SynchronizeShard::~SynchronizeShard() {}
|
||||
static std::stringstream& AppendShardInformationToMessage(
|
||||
std::string const& database, std::string const& shard,
|
||||
std::string const& planId,
|
||||
std::chrono::system_clock::time_point const& startTime,
|
||||
std::stringstream& msg) {
|
||||
auto const endTime = system_clock::now();
|
||||
msg << "local shard: '" << database << "/" << shard << "', "
|
||||
<< "for central: '" << database << "/" << planId << "', "
|
||||
<< "started: " << timepointToString(startTime) << ", "
|
||||
<< "ended: " << timepointToString(endTime);
|
||||
return msg;
|
||||
}
|
||||
|
||||
arangodb::Result getReadLockId(
|
||||
static arangodb::Result getReadLockId (
|
||||
std::string const& endpoint, std::string const& database,
|
||||
std::string const& clientId, double timeout, uint64_t& id) {
|
||||
|
||||
|
@ -163,7 +177,7 @@ arangodb::Result getReadLockId(
|
|||
return arangodb::Result();
|
||||
}
|
||||
|
||||
arangodb::Result collectionCount(
|
||||
static arangodb::Result collectionCount(
|
||||
std::shared_ptr<arangodb::LogicalCollection> const& col, uint64_t& c) {
|
||||
|
||||
std::string collectionName(col->name());
|
||||
|
@ -196,7 +210,7 @@ arangodb::Result collectionCount(
|
|||
|
||||
}
|
||||
|
||||
arangodb::Result addShardFollower(
|
||||
static arangodb::Result addShardFollower (
|
||||
std::string const& endpoint, std::string const& database,
|
||||
std::string const& shard, uint64_t lockJobId,
|
||||
std::string const& clientId, double timeout = 120.0) {
|
||||
|
@ -283,48 +297,7 @@ arangodb::Result addShardFollower(
|
|||
}
|
||||
}
|
||||
|
||||
arangodb::Result removeShardFollower(
|
||||
std::string const& endpoint, std::string const& database,
|
||||
std::string const& shard, std::string const& clientId, double timeout = 120.0) {
|
||||
|
||||
LOG_TOPIC(WARN, Logger::MAINTENANCE) <<
|
||||
"removeShardFollower: tell the leader to take us off the follower list...";
|
||||
|
||||
auto cc = arangodb::ClusterComm::instance();
|
||||
if (cc == nullptr) { // nullptr only happens during controlled shutdown
|
||||
return arangodb::Result(
|
||||
TRI_ERROR_SHUTTING_DOWN, "startReadLockOnLeader: Shutting down");
|
||||
}
|
||||
|
||||
VPackBuilder body;
|
||||
{ VPackObjectBuilder b(&body);
|
||||
body.add(SHARD, VPackValue(shard));
|
||||
body.add(FOLLOWER_ID,
|
||||
VPackValue(arangodb::ServerState::instance()->getId())); }
|
||||
|
||||
// Note that we always use the _system database here because the actual
|
||||
// database might be gone already on the leader and we need to cancel
|
||||
// the read lock under all circumstances.
|
||||
auto comres = cc->syncRequest(
|
||||
clientId, 1, endpoint, rest::RequestType::PUT,
|
||||
DB + database + REPL_REM_FOLLOWER, body.toJson(),
|
||||
std::unordered_map<std::string, std::string>(), timeout);
|
||||
|
||||
auto result = comres->result;
|
||||
if (result == nullptr || result->getHttpReturnCode() != 200) {
|
||||
std::string errorMessage(
|
||||
"removeShardFollower: could not remove us from the leader's follower list: ");
|
||||
errorMessage += result->getHttpReturnCode();
|
||||
errorMessage += comres->stringifyErrorMessage();
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << errorMessage;
|
||||
return arangodb::Result(TRI_ERROR_INTERNAL, errorMessage);
|
||||
}
|
||||
|
||||
LOG_TOPIC(WARN, Logger::MAINTENANCE) << "removeShardFollower: success" ;
|
||||
return arangodb::Result();
|
||||
}
|
||||
|
||||
arangodb::Result cancelReadLockOnLeader(
|
||||
static arangodb::Result cancelReadLockOnLeader (
|
||||
std::string const& endpoint, std::string const& database,
|
||||
uint64_t lockJobId, std::string const& clientId,
|
||||
double timeout = 10.0) {
|
||||
|
@ -332,7 +305,7 @@ arangodb::Result cancelReadLockOnLeader(
|
|||
auto cc = arangodb::ClusterComm::instance();
|
||||
if (cc == nullptr) { // nullptr only happens during controlled shutdown
|
||||
return arangodb::Result(
|
||||
TRI_ERROR_SHUTTING_DOWN, "startReadLockOnLeader: Shutting down");
|
||||
TRI_ERROR_SHUTTING_DOWN, "cancelReadLockOnLeader: Shutting down");
|
||||
}
|
||||
|
||||
VPackBuilder body;
|
||||
|
@ -361,7 +334,7 @@ arangodb::Result cancelReadLockOnLeader(
|
|||
return arangodb::Result();
|
||||
}
|
||||
|
||||
arangodb::Result cancelBarrier(
|
||||
static arangodb::Result cancelBarrier(
|
||||
std::string const& endpoint, std::string const& database,
|
||||
int64_t barrierId, std::string const& clientId,
|
||||
double timeout = 120.0) {
|
||||
|
@ -404,7 +377,7 @@ arangodb::Result cancelBarrier(
|
|||
arangodb::Result SynchronizeShard::getReadLock(
|
||||
std::string const& endpoint, std::string const& database,
|
||||
std::string const& collection, std::string const& clientId,
|
||||
uint64_t rlid, double timeout) {
|
||||
uint64_t rlid, bool soft, double timeout) {
|
||||
|
||||
auto cc = arangodb::ClusterComm::instance();
|
||||
if (cc == nullptr) { // nullptr only happens during controlled shutdown
|
||||
|
@ -416,7 +389,9 @@ arangodb::Result SynchronizeShard::getReadLock(
|
|||
{ VPackObjectBuilder o(&body);
|
||||
body.add(ID, VPackValue(std::to_string(rlid)));
|
||||
body.add(COLLECTION, VPackValue(collection));
|
||||
body.add(TTL, VPackValue(timeout)); }
|
||||
body.add(TTL, VPackValue(timeout));
|
||||
body.add(StaticStrings::ReplicationSoftLockOnly, VPackValue(soft));
|
||||
}
|
||||
|
||||
auto url = DB + database + REPL_HOLD_READ_LOCK;
|
||||
|
||||
|
@ -430,9 +405,10 @@ arangodb::Result SynchronizeShard::getReadLock(
|
|||
// we must make sure that the read lock on the leader is not active!
|
||||
// This is done automatically below.
|
||||
|
||||
double sleepTime = 0.5;
|
||||
size_t count = 0;
|
||||
while (++count < 20) { // wait for some time until read lock established:
|
||||
|
||||
size_t maxTries = static_cast<size_t>(std::floor(600.0 / sleepTime));
|
||||
while (++count < maxTries) { // wait for some time until read lock established:
|
||||
// Now check that we hold the read lock:
|
||||
auto putres = cc->syncRequest(
|
||||
clientId, 1, endpoint, rest::RequestType::PUT, url, body.toJson(),
|
||||
|
@ -455,7 +431,7 @@ arangodb::Result SynchronizeShard::getReadLock(
|
|||
<< putres->stringifyErrorMessage();
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(duration<double>(.5));
|
||||
std::this_thread::sleep_for(duration<double>(sleepTime));
|
||||
}
|
||||
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "startReadLockOnLeader: giving up";
|
||||
|
@ -478,14 +454,14 @@ arangodb::Result SynchronizeShard::getReadLock(
|
|||
|
||||
}
|
||||
|
||||
bool isStopping() {
|
||||
static inline bool isStopping() {
|
||||
return application_features::ApplicationServer::isStopping();
|
||||
}
|
||||
|
||||
arangodb::Result SynchronizeShard::startReadLockOnLeader(
|
||||
std::string const& endpoint, std::string const& database,
|
||||
std::string const& collection, std::string const& clientId,
|
||||
uint64_t& rlid, double timeout) {
|
||||
uint64_t& rlid, bool soft, double timeout) {
|
||||
|
||||
// Read lock id
|
||||
rlid = 0;
|
||||
|
@ -498,7 +474,7 @@ arangodb::Result SynchronizeShard::startReadLockOnLeader(
|
|||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "Got read lock id: " << rlid;
|
||||
}
|
||||
|
||||
result = getReadLock(endpoint, database, collection, clientId, rlid, timeout);
|
||||
result = getReadLock(endpoint, database, collection, clientId, rlid, soft, timeout);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -508,7 +484,7 @@ enum ApplierType {
|
|||
APPLIER_GLOBAL
|
||||
};
|
||||
|
||||
arangodb::Result replicationSynchronize(
|
||||
static arangodb::Result replicationSynchronize(
|
||||
std::shared_ptr<arangodb::LogicalCollection> const &col, VPackSlice const& config,
|
||||
ApplierType applierType, std::shared_ptr<VPackBuilder> sy) {
|
||||
|
||||
|
@ -589,7 +565,47 @@ arangodb::Result replicationSynchronize(
|
|||
}
|
||||
|
||||
|
||||
arangodb::Result replicationSynchronizeFinalize(VPackSlice const& conf) {
|
||||
static arangodb::Result replicationSynchronizeCatchup(VPackSlice const& conf,
|
||||
TRI_voc_tick_t& tickReached) {
|
||||
|
||||
auto const database = conf.get(DATABASE).copyString();
|
||||
auto const collection = conf.get(COLLECTION).copyString();
|
||||
auto const leaderId = conf.get(LEADER_ID).copyString();
|
||||
auto const fromTick = conf.get("from").getNumber<uint64_t>();
|
||||
|
||||
ReplicationApplierConfiguration configuration =
|
||||
ReplicationApplierConfiguration::fromVelocyPack(conf, database);
|
||||
// will throw if invalid
|
||||
configuration.validate();
|
||||
|
||||
DatabaseGuard guard(database);
|
||||
DatabaseTailingSyncer syncer(guard.database(), configuration, fromTick, true, 0);
|
||||
|
||||
if (!leaderId.empty()) {
|
||||
syncer.setLeaderId(leaderId);
|
||||
}
|
||||
|
||||
Result r;
|
||||
try {
|
||||
r = syncer.syncCollectionCatchup(collection, tickReached);
|
||||
} catch (arangodb::basics::Exception const& ex) {
|
||||
r = Result(ex.code(), ex.what());
|
||||
} catch (std::exception const& ex) {
|
||||
r = Result(TRI_ERROR_INTERNAL, ex.what());
|
||||
} catch (...) {
|
||||
r = Result(TRI_ERROR_INTERNAL, "unknown exception");
|
||||
}
|
||||
|
||||
if (r.fail()) {
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION)
|
||||
<< "syncCollectionFinalize failed: " << r.errorMessage();
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
|
||||
static arangodb::Result replicationSynchronizeFinalize(VPackSlice const& conf) {
|
||||
auto const database = conf.get(DATABASE).copyString();
|
||||
auto const collection = conf.get(COLLECTION).copyString();
|
||||
auto const leaderId = conf.get(LEADER_ID).copyString();
|
||||
|
@ -640,7 +656,7 @@ bool SynchronizeShard::first() {
|
|||
auto const ourselves = arangodb::ServerState::instance()->getId();
|
||||
auto startTime = system_clock::now();
|
||||
auto const startTimeStr = timepointToString(startTime);
|
||||
auto const clientId(database + planId + shard + leader);
|
||||
std::string const clientId(database + planId + shard + leader);
|
||||
|
||||
// First wait until the leader has created the shard (visible in
|
||||
// Current in the Agency) or we or the shard have vanished from
|
||||
|
@ -659,11 +675,9 @@ bool SynchronizeShard::first() {
|
|||
std::find(planned.begin(), planned.end(), ourselves) == planned.end() ||
|
||||
planned.front() != leader) {
|
||||
// Things have changed again, simply terminate:
|
||||
auto const endTime = system_clock::now();
|
||||
std::stringstream error;
|
||||
error << "cancelled, " << database << "/" << shard << ", " << database
|
||||
<< "/" << planId << ", started " << startTimeStr << ", ended "
|
||||
<< timepointToString(endTime);
|
||||
error << "cancelled, ";
|
||||
AppendShardInformationToMessage(database, shard, planId, startTime, error);
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str();
|
||||
_result.reset(TRI_ERROR_FAILED, error.str());
|
||||
return false;
|
||||
|
@ -673,14 +687,10 @@ bool SynchronizeShard::first() {
|
|||
try { // ci->getCollection can throw
|
||||
ci = clusterInfo->getCollection(database, planId);
|
||||
} catch(...) {
|
||||
auto const endTime = system_clock::now();
|
||||
std::stringstream msg;
|
||||
msg << "exception in getCollection, " << database << "/"
|
||||
<< shard << ", " << database
|
||||
<< "/" << planId << ", started " << startTimeStr << ", ended "
|
||||
<< timepointToString(endTime);
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: "
|
||||
<< msg.str();
|
||||
msg << "exception in getCollection, ";
|
||||
AppendShardInformationToMessage(database, shard, planId, startTime, msg);
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: " << msg.str();
|
||||
_result.reset(TRI_ERROR_FAILED, msg.str());
|
||||
return false;
|
||||
}
|
||||
|
@ -692,20 +702,16 @@ bool SynchronizeShard::first() {
|
|||
std::vector<std::string> current = cic->servers(shard);
|
||||
|
||||
if (current.empty()) {
|
||||
Result(TRI_ERROR_FAILED,
|
||||
"synchronizeOneShard: cancelled, no servers in 'Current'");
|
||||
}
|
||||
if (current.front() == leader) {
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: cancelled, no servers in 'Current'";
|
||||
} else if (current.front() == leader) {
|
||||
if (std::find(current.begin(), current.end(), ourselves) == current.end()) {
|
||||
break; // start synchronization work
|
||||
}
|
||||
// We are already there, this is rather strange, but never mind:
|
||||
auto const endTime = system_clock::now();
|
||||
std::stringstream error;
|
||||
error
|
||||
<< "already done, " << database << "/" << shard
|
||||
<< ", " << database << "/" << planId << ", started "
|
||||
<< startTimeStr << ", ended " << timepointToString(endTime);
|
||||
error << "already done, ";
|
||||
AppendShardInformationToMessage(database, shard, planId, startTime, error);
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str();
|
||||
_result.reset(TRI_ERROR_FAILED, error.str());
|
||||
return false;
|
||||
|
@ -716,7 +722,6 @@ bool SynchronizeShard::first() {
|
|||
<< "/" << shard << ", " << database << "/" << planId;
|
||||
|
||||
std::this_thread::sleep_for(duration<double>(0.2));
|
||||
|
||||
}
|
||||
|
||||
// Once we get here, we know that the leader is ready for sync, so we give it a try:
|
||||
|
@ -752,16 +757,16 @@ bool SynchronizeShard::first() {
|
|||
database << "/" << shard << "' for central '" << database << "/" <<
|
||||
planId << "'";
|
||||
try {
|
||||
|
||||
auto asResult = addShardFollower(ep, database, shard, 0, clientId, 60.0);
|
||||
|
||||
if (asResult.ok()) {
|
||||
|
||||
auto const endTime = system_clock::now();
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: shortcut worked, done, " << database << "/"
|
||||
<< shard << ", " << database << "/" << planId <<", started: "
|
||||
<< startTimeStr << " ended: " << timepointToString(endTime);
|
||||
if (Logger::isEnabled(LogLevel::DEBUG, Logger::MAINTENANCE)) {
|
||||
std::stringstream msg;
|
||||
msg << "synchronizeOneShard: shortcut worked, done, ";
|
||||
AppendShardInformationToMessage(database, shard, planId, startTime, msg);
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << msg.str();
|
||||
}
|
||||
collection->followers()->setTheLeader(leader);
|
||||
notify();
|
||||
return false;
|
||||
|
@ -774,29 +779,28 @@ bool SynchronizeShard::first() {
|
|||
<< "/" << shard << "' for central '" << database << "/" << planId << "'";
|
||||
|
||||
try {
|
||||
// From here on we perform a number of steps, each of which can
|
||||
// fail. If it fails with an exception, it is caught, but this
|
||||
// should usually not happen. If it fails without an exception,
|
||||
// we log and use return.
|
||||
|
||||
Result res; // used multiple times for intermediate results
|
||||
|
||||
// First once without a read transaction:
|
||||
|
||||
if (isStopping()) {
|
||||
_result.reset(TRI_ERROR_INTERNAL, "server is shutting down");
|
||||
std::string errorMessage(
|
||||
"synchronizeOneShard: synchronization failed for shard ");
|
||||
errorMessage += shard + ": shutdown in progress, giving up";
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE) << errorMessage;
|
||||
_result.reset(TRI_ERROR_INTERNAL, errorMessage);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Mark us as follower for this leader such that we begin
|
||||
// accepting replication operations, note that this is also
|
||||
// used for the initial synchronization:
|
||||
|
||||
// This is necessary to accept replications from the leader which can
|
||||
// happen as soon as we are in sync.
|
||||
collection->followers()->setTheLeader(leader);
|
||||
|
||||
if (leader.empty()) {
|
||||
collection->followers()->clear();
|
||||
}
|
||||
|
||||
// do not reset followers when we resign at this time...we are
|
||||
// still the only source of truth to trust, in particular, in the
|
||||
// planned leader resignation, we will shortly after the call to
|
||||
// this function here report the controlled resignation to the
|
||||
// agency. This report must still contain the correct follower list
|
||||
// or else the supervision is super angry with us.
|
||||
|
||||
startTime = system_clock::now();
|
||||
|
||||
VPackBuilder config;
|
||||
|
@ -815,143 +819,86 @@ bool SynchronizeShard::first() {
|
|||
|
||||
auto details = std::make_shared<VPackBuilder>();
|
||||
|
||||
Result syncRes = replicationSynchronize(
|
||||
res = replicationSynchronize(
|
||||
collection, config.slice(), APPLIER_DATABASE, details);
|
||||
|
||||
auto sy = details->slice();
|
||||
auto const endTime = system_clock::now();
|
||||
bool longSync = false;
|
||||
|
||||
// Long shard sync initialisation
|
||||
if (endTime-startTime > seconds(5)) {
|
||||
LOG_TOPIC(WARN, Logger::MAINTENANCE)
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: long call to syncCollection for shard"
|
||||
<< shard << " " << syncRes.errorMessage() << " start time: "
|
||||
<< timepointToString(startTime) << "end time: "
|
||||
<< shard << " " << res.errorMessage() << " start time: "
|
||||
<< timepointToString(startTime) << ", end time: "
|
||||
<< timepointToString(system_clock::now());
|
||||
longSync = true;
|
||||
}
|
||||
|
||||
//
|
||||
if (!syncRes.ok()) {
|
||||
// If this did not work, then we cannot go on:
|
||||
if (!res.ok()) {
|
||||
|
||||
std::stringstream error;
|
||||
error << "could not initially synchronize shard " << shard << ": "
|
||||
<< syncRes.errorMessage();
|
||||
<< res.errorMessage();
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str();
|
||||
_result.reset(TRI_ERROR_INTERNAL, error.str());
|
||||
return false;
|
||||
|
||||
} else {
|
||||
|
||||
VPackSlice collections = sy.get(COLLECTIONS);
|
||||
|
||||
if (collections.length() == 0 ||
|
||||
collections[0].get("name").copyString() != shard) {
|
||||
|
||||
if (longSync) {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: long sync, before cancelBarrier"
|
||||
<< timepointToString(system_clock::now());
|
||||
}
|
||||
cancelBarrier(ep, database, sy.get(BARRIER_ID).getNumber<int64_t>(), clientId);
|
||||
if (longSync) {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: long sync, after cancelBarrier"
|
||||
<< timepointToString(system_clock::now());
|
||||
}
|
||||
|
||||
std::stringstream error;
|
||||
error << "shard " << shard << " seems to be gone from leader!";
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str();
|
||||
_result.reset(TRI_ERROR_INTERNAL, error.str());
|
||||
return false;
|
||||
|
||||
} else {
|
||||
|
||||
// Now start a read transaction to stop writes:
|
||||
uint64_t lockJobId = 0;
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: startReadLockOnLeader: " << ep << ":"
|
||||
<< database << ":" << collection->name();
|
||||
Result result = startReadLockOnLeader(
|
||||
ep, database, collection->name(), clientId, lockJobId);
|
||||
if (result.ok()) {
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId;
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: error in startReadLockOnLeader:"
|
||||
<< result.errorMessage();
|
||||
}
|
||||
|
||||
cancelBarrier(ep, database, sy.get("barrierId").getNumber<int64_t>(), clientId);
|
||||
|
||||
if (lockJobId != 0) {
|
||||
|
||||
VPackBuilder builder;
|
||||
{ VPackObjectBuilder o(&builder);
|
||||
builder.add(ENDPOINT, VPackValue(ep));
|
||||
builder.add(DATABASE, VPackValue(database));
|
||||
builder.add(COLLECTION, VPackValue(shard));
|
||||
builder.add(LEADER_ID, VPackValue(leader));
|
||||
builder.add("from", sy.get(LAST_LOG_TICK));
|
||||
builder.add("requestTimeout", VPackValue(60.0));
|
||||
builder.add("connectTimeout", VPackValue(60.0));
|
||||
}
|
||||
|
||||
Result fres = replicationSynchronizeFinalize (builder.slice());
|
||||
|
||||
if (fres.ok()) {
|
||||
result = addShardFollower(ep, database, shard, lockJobId, clientId, 60.0);
|
||||
|
||||
if (!result.ok()) {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: failed to add follower"
|
||||
<< result.errorMessage();
|
||||
}
|
||||
} else {
|
||||
std::string errorMessage(
|
||||
"synchronizeOneshard: error in syncCollectionFinalize: ") ;
|
||||
errorMessage += fres.errorMessage();
|
||||
result = Result(TRI_ERROR_INTERNAL, errorMessage);
|
||||
}
|
||||
|
||||
// This result is unused, only in logs
|
||||
Result lockResult = cancelReadLockOnLeader(ep, database, lockJobId, clientId, 60.0);
|
||||
if (!lockResult.ok()) {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: read lock has timed out for shard " << shard;
|
||||
}
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: lockJobId was false for shard" << shard;
|
||||
}
|
||||
|
||||
if (result.ok()) {
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: synchronization worked for shard " << shard;
|
||||
_result.reset(TRI_ERROR_NO_ERROR);
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: synchronization failed for shard " << shard;
|
||||
std::string errorMessage(
|
||||
"synchronizeOneShard: synchronization failed for shard "
|
||||
+ shard + ":" + result.errorMessage());
|
||||
_result = Result(TRI_ERROR_INTERNAL, errorMessage);;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// From here on, we have to call `cancelBarrier` in case of errors
|
||||
// as well as in the success case!
|
||||
int64_t barrierId = sy.get(BARRIER_ID).getNumber<int64_t>();
|
||||
TRI_DEFER(cancelBarrier(ep, database, barrierId, clientId));
|
||||
|
||||
VPackSlice collections = sy.get(COLLECTIONS);
|
||||
|
||||
if (collections.length() == 0 ||
|
||||
collections[0].get("name").copyString() != shard) {
|
||||
|
||||
std::stringstream error;
|
||||
error << "shard " << shard << " seems to be gone from leader, this "
|
||||
"can happen if a collection was dropped during synchronization!";
|
||||
LOG_TOPIC(WARN, Logger::MAINTENANCE) << "SynchronizeOneShard: "
|
||||
<< error.str();
|
||||
_result.reset(TRI_ERROR_INTERNAL, error.str());
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
auto lastTick = arangodb::basics::VelocyPackHelper::readNumericValue<TRI_voc_tick_t>(sy, LAST_LOG_TICK, 0);
|
||||
VPackBuilder builder;
|
||||
|
||||
ResultT<TRI_voc_tick_t> tickResult = catchupWithReadLock(ep, database, *collection, clientId,
|
||||
shard, leader, lastTick, builder);
|
||||
if (!res.ok()) {
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE) << res.errorMessage();
|
||||
_result.reset(tickResult);
|
||||
return false;
|
||||
}
|
||||
lastTick = tickResult.get();
|
||||
|
||||
// Now start a exclusive transaction to stop writes:
|
||||
res = catchupWithExclusiveLock(ep, database, *collection, clientId,
|
||||
shard, leader, lastTick, builder);
|
||||
if (!res.ok()) {
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE) << res.errorMessage();
|
||||
_result.reset(res);
|
||||
return false;
|
||||
}
|
||||
|
||||
} catch (std::exception const& e) {
|
||||
auto const endTime = system_clock::now();
|
||||
std::stringstream error;
|
||||
error << "synchronization of local shard '" << database << "/" << shard
|
||||
<< "' for central '" << database << "/" << planId << "' failed: "
|
||||
<< e.what() << timepointToString(endTime);
|
||||
error << "synchronization of";
|
||||
AppendShardInformationToMessage(database, shard, planId, startTime, error);
|
||||
error << " failed: " << e.what();
|
||||
LOG_TOPIC(ERR, Logger::MAINTENANCE) << error.str();
|
||||
_result.reset(TRI_ERROR_INTERNAL, e.what());
|
||||
return false;
|
||||
}
|
||||
// Validate that HARDLOCK only works!
|
||||
} catch (std::exception const& e) {
|
||||
// This catches the case that we could not even find the collection
|
||||
// locally, because the DatabaseGuard constructor threw.
|
||||
LOG_TOPIC(WARN, Logger::MAINTENANCE)
|
||||
<< "action " << _description << " failed with exception " << e.what();
|
||||
_result.reset(TRI_ERROR_INTERNAL, e.what());
|
||||
|
@ -959,13 +906,169 @@ bool SynchronizeShard::first() {
|
|||
}
|
||||
|
||||
// Tell others that we are done:
|
||||
auto const endTime = system_clock::now();
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: done, " << database << "/" << shard << ", "
|
||||
<< database << "/" << planId << ", started: "
|
||||
<< timepointToString(startTime) << ", ended: " << timepointToString(endTime);
|
||||
|
||||
if (Logger::isEnabled(LogLevel::INFO, Logger::MAINTENANCE)) {
|
||||
// This wrap is just to not write the stream if not needed.
|
||||
std::stringstream msg;
|
||||
AppendShardInformationToMessage(database, shard, planId, startTime, msg);
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: done, " << msg.str();
|
||||
}
|
||||
notify();
|
||||
return false;;
|
||||
return false;
|
||||
|
||||
}
|
||||
ResultT<TRI_voc_tick_t> SynchronizeShard::catchupWithReadLock(
|
||||
std::string const& ep,
|
||||
std::string const& database,
|
||||
LogicalCollection const& collection,
|
||||
std::string const& clientId,
|
||||
std::string const& shard,
|
||||
std::string const& leader,
|
||||
TRI_voc_tick_t lastLogTick,
|
||||
VPackBuilder& builder) {
|
||||
// Now ask for a "soft stop" on the leader, in case of mmfiles, this
|
||||
// will be a hard stop, but for rocksdb, this is a no-op:
|
||||
uint64_t lockJobId = 0;
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: startReadLockOnLeader (soft): " << ep << ":"
|
||||
<< database << ":" << collection.name();
|
||||
Result res = startReadLockOnLeader(
|
||||
ep, database, collection.name(), clientId, lockJobId, true);
|
||||
if (!res.ok()) {
|
||||
std::string errorMessage =
|
||||
"synchronizeOneShard: error in startReadLockOnLeader (soft):"
|
||||
+ res.errorMessage();
|
||||
return ResultT<TRI_voc_tick_t>::error(TRI_ERROR_INTERNAL, errorMessage);
|
||||
}
|
||||
auto readLockGuard = arangodb::scopeGuard([&]() {
|
||||
// Always cancel the read lock.
|
||||
// Reported seperately
|
||||
auto res = cancelReadLockOnLeader(ep, database, lockJobId, clientId, 60.0);
|
||||
if (!res.ok()) {
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE)
|
||||
<< "Could not cancel soft read lock on leader: "
|
||||
<< res.errorMessage();
|
||||
}
|
||||
});
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId;
|
||||
|
||||
// From now on, we need to cancel the read lock on the leader regardless
|
||||
// if things go wrong or right!
|
||||
|
||||
// Do a first try of a catch up with the WAL. In case of RocksDB,
|
||||
// this has not yet stopped the writes, so we have to be content
|
||||
// with nearly reaching the end of the WAL, which is a "soft" catchup.
|
||||
builder.clear();
|
||||
{ VPackObjectBuilder o(&builder);
|
||||
builder.add(ENDPOINT, VPackValue(ep));
|
||||
builder.add(DATABASE, VPackValue(database));
|
||||
builder.add(COLLECTION, VPackValue(shard));
|
||||
builder.add(LEADER_ID, VPackValue(leader));
|
||||
builder.add("from", VPackValue(lastLogTick));
|
||||
builder.add("requestTimeout", VPackValue(600.0));
|
||||
builder.add("connectTimeout", VPackValue(60.0));
|
||||
}
|
||||
|
||||
TRI_voc_tick_t tickReached = 0;
|
||||
res = replicationSynchronizeCatchup(builder.slice(), tickReached);
|
||||
|
||||
if (!res.ok()) {
|
||||
std::string errorMessage(
|
||||
"synchronizeOneshard: error in syncCollectionCatchup: ") ;
|
||||
errorMessage += res.errorMessage();
|
||||
return ResultT<TRI_voc_tick_t>::error(TRI_ERROR_INTERNAL, errorMessage);
|
||||
}
|
||||
|
||||
// Stop the read lock again:
|
||||
res = cancelReadLockOnLeader(ep, database, lockJobId,
|
||||
clientId, 60.0);
|
||||
// We removed the readlock
|
||||
readLockGuard.cancel();
|
||||
if (!res.ok()) {
|
||||
std::string errorMessage
|
||||
= "synchronizeOneShard: error when cancelling soft read lock: "
|
||||
+ res.errorMessage();
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE) << errorMessage;
|
||||
_result.reset(TRI_ERROR_INTERNAL, errorMessage);
|
||||
return ResultT<TRI_voc_tick_t>::error(TRI_ERROR_INTERNAL, errorMessage);
|
||||
}
|
||||
return ResultT<TRI_voc_tick_t>::success(tickReached);
|
||||
}
|
||||
|
||||
Result SynchronizeShard::catchupWithExclusiveLock(std::string const& ep,
|
||||
std::string const& database,
|
||||
LogicalCollection const& collection,
|
||||
std::string const& clientId,
|
||||
std::string const& shard,
|
||||
std::string const& leader,
|
||||
TRI_voc_tick_t lastLogTick,
|
||||
VPackBuilder& builder) {
|
||||
uint64_t lockJobId = 0;
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: startReadLockOnLeader: " << ep << ":"
|
||||
<< database << ":" << collection.name();
|
||||
Result res = startReadLockOnLeader(
|
||||
ep, database, collection.name(), clientId, lockJobId, false);
|
||||
if (!res.ok()) {
|
||||
std::string errorMessage =
|
||||
"synchronizeOneShard: error in startReadLockOnLeader (hard):"
|
||||
+ res.errorMessage();
|
||||
return {TRI_ERROR_INTERNAL, errorMessage};
|
||||
}
|
||||
auto readLockGuard = arangodb::scopeGuard([&]() {
|
||||
// Always cancel the read lock.
|
||||
// Reported seperately
|
||||
auto res = cancelReadLockOnLeader(ep, database, lockJobId, clientId, 60.0);
|
||||
if (!res.ok()) {
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE)
|
||||
<< "Could not cancel hard read lock on leader: "
|
||||
<< res.errorMessage();
|
||||
}
|
||||
});
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId;
|
||||
|
||||
builder.clear();
|
||||
{ VPackObjectBuilder o(&builder);
|
||||
builder.add(ENDPOINT, VPackValue(ep));
|
||||
builder.add(DATABASE, VPackValue(database));
|
||||
builder.add(COLLECTION, VPackValue(shard));
|
||||
builder.add(LEADER_ID, VPackValue(leader));
|
||||
builder.add("from", VPackValue(lastLogTick));
|
||||
builder.add("requestTimeout", VPackValue(600.0));
|
||||
builder.add("connectTimeout", VPackValue(60.0));
|
||||
}
|
||||
|
||||
res = replicationSynchronizeFinalize(builder.slice());
|
||||
|
||||
if (!res.ok()) {
|
||||
std::string errorMessage(
|
||||
"synchronizeOneshard: error in syncCollectionFinalize: ") ;
|
||||
errorMessage += res.errorMessage();
|
||||
return {TRI_ERROR_INTERNAL, errorMessage};
|
||||
}
|
||||
|
||||
res = addShardFollower(ep, database, shard, lockJobId, clientId, 60.0);
|
||||
|
||||
if (!res.ok()) {
|
||||
std::string errorMessage(
|
||||
"synchronizeOneshard: error in addShardFollower: ") ;
|
||||
errorMessage += res.errorMessage();
|
||||
return {TRI_ERROR_INTERNAL, errorMessage};
|
||||
}
|
||||
|
||||
// This result is unused, only in logs
|
||||
res = cancelReadLockOnLeader(ep, database, lockJobId, clientId, 60.0);
|
||||
readLockGuard.cancel();
|
||||
if (!res.ok()) {
|
||||
LOG_TOPIC(INFO, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: read lock has timed out for shard " << shard;
|
||||
}
|
||||
|
||||
// Report success:
|
||||
LOG_TOPIC(DEBUG, Logger::MAINTENANCE)
|
||||
<< "synchronizeOneShard: synchronization worked for shard " << shard;
|
||||
_result.reset(TRI_ERROR_NO_ERROR);
|
||||
return {TRI_ERROR_NO_ERROR};
|
||||
}
|
||||
|
|
|
@ -27,11 +27,15 @@
|
|||
|
||||
#include "ActionBase.h"
|
||||
#include "ActionDescription.h"
|
||||
#include "Cluster/ResultT.h"
|
||||
#include "VocBase/voc-types.h"
|
||||
|
||||
#include <chrono>
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class LogicalCollection;
|
||||
|
||||
class MaintenanceAction;
|
||||
|
||||
namespace maintenance {
|
||||
|
@ -50,13 +54,31 @@ private:
|
|||
arangodb::Result getReadLock(
|
||||
std::string const& endpoint, std::string const& database,
|
||||
std::string const& collection, std::string const& clientId, uint64_t rlid,
|
||||
double timeout = 300.0);
|
||||
bool soft, double timeout = 300.0);
|
||||
|
||||
arangodb::Result startReadLockOnLeader(
|
||||
std::string const& endpoint, std::string const& database,
|
||||
std::string const& collection, std::string const& clientId, uint64_t& rlid,
|
||||
double timeout = 300.0);
|
||||
bool soft, double timeout = 300.0);
|
||||
|
||||
|
||||
arangodb::ResultT<TRI_voc_tick_t> catchupWithReadLock(std::string const& ep,
|
||||
std::string const& database,
|
||||
LogicalCollection const& collection,
|
||||
std::string const& clientId,
|
||||
std::string const& shard,
|
||||
std::string const& leader,
|
||||
TRI_voc_tick_t lastLogTick,
|
||||
VPackBuilder& builder);
|
||||
|
||||
arangodb::Result catchupWithExclusiveLock(std::string const& ep,
|
||||
std::string const& database,
|
||||
LogicalCollection const& collection,
|
||||
std::string const& clientId,
|
||||
std::string const& shard,
|
||||
std::string const& leader,
|
||||
TRI_voc_tick_t lastLogTick,
|
||||
VPackBuilder& builder);
|
||||
};
|
||||
|
||||
}}
|
||||
|
|
|
@ -101,8 +101,9 @@ Result DatabaseTailingSyncer::saveApplierState() {
|
|||
|
||||
/// @brief finalize the synchronization of a collection by tailing the WAL
|
||||
/// and filtering on the collection name until no more data is available
|
||||
Result DatabaseTailingSyncer::syncCollectionFinalize(
|
||||
std::string const& collectionName) {
|
||||
Result DatabaseTailingSyncer::syncCollectionCatchupInternal(
|
||||
std::string const& collectionName, bool hard, TRI_voc_tick_t& until) {
|
||||
|
||||
setAborted(false);
|
||||
// fetch master state just once
|
||||
Result r = _state.master.getState(_state.connection, _state.isChildSyncer);
|
||||
|
@ -117,9 +118,19 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
|||
|
||||
TRI_voc_tick_t fromTick = _initialTick;
|
||||
TRI_voc_tick_t lastScannedTick = fromTick;
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION)
|
||||
<< "starting syncCollectionFinalize:" << collectionName << ", fromTick "
|
||||
<< fromTick;
|
||||
|
||||
if (hard) {
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION)
|
||||
<< "starting syncCollectionFinalize:" << collectionName << ", fromTick "
|
||||
<< fromTick;
|
||||
} else {
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION)
|
||||
<< "starting syncCollectionCatchup:" << collectionName << ", fromTick "
|
||||
<< fromTick;
|
||||
}
|
||||
|
||||
auto clock = std::chrono::steady_clock();
|
||||
auto startTime = clock.now();
|
||||
|
||||
while (true) {
|
||||
if (application_features::ApplicationServer::isStopping()) {
|
||||
|
@ -141,11 +152,13 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
|||
});
|
||||
|
||||
if (replutils::hasFailed(response.get())) {
|
||||
until = fromTick;
|
||||
return replutils::buildHttpError(response.get(), url, _state.connection);
|
||||
}
|
||||
|
||||
if (response->getHttpReturnCode() == 204) {
|
||||
// HTTP 204 No content: this means we are done
|
||||
until = fromTick;
|
||||
return Result();
|
||||
}
|
||||
|
||||
|
@ -166,6 +179,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
|||
header = response->getHeaderField(
|
||||
StaticStrings::ReplicationHeaderLastIncluded, found);
|
||||
if (!found) {
|
||||
until = fromTick;
|
||||
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,
|
||||
std::string("got invalid response from master at ") +
|
||||
_state.master.endpoint + ": required header " +
|
||||
|
@ -182,6 +196,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
|||
fromIncluded = StringUtils::boolean(header);
|
||||
}
|
||||
if (!fromIncluded && fromTick > 0) {
|
||||
until = fromTick;
|
||||
return Result(
|
||||
TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT,
|
||||
std::string("required follow tick value '") +
|
||||
|
@ -198,6 +213,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
|||
Result r =
|
||||
applyLog(response.get(), fromTick, processedMarkers, ignoreCount);
|
||||
if (r.fail()) {
|
||||
until = fromTick;
|
||||
return r;
|
||||
}
|
||||
|
||||
|
@ -213,8 +229,28 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
|||
<< "this indicates we're at the end";
|
||||
}
|
||||
|
||||
// If this is non-hard, we employ some heuristics to stop early:
|
||||
if (!hard) {
|
||||
if (clock.now() - startTime > std::chrono::seconds(1) && _ongoingTransactions.empty()) {
|
||||
checkMore = false;
|
||||
} else {
|
||||
TRI_voc_tick_t lastTick = 0;
|
||||
header = response->getHeaderField(
|
||||
StaticStrings::ReplicationHeaderLastTick, found);
|
||||
if (found) {
|
||||
lastTick = StringUtils::uint64(header);
|
||||
if (_ongoingTransactions.empty() &&
|
||||
lastTick > lastIncludedTick && // just to make sure!
|
||||
lastTick - lastIncludedTick < 1000) {
|
||||
checkMore = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!checkMore) {
|
||||
// done!
|
||||
until = fromTick;
|
||||
return Result();
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION)
|
||||
|
|
|
@ -50,10 +50,30 @@ class DatabaseTailingSyncer final : public TailingSyncer {
|
|||
|
||||
/// @brief finalize the synchronization of a collection by tailing the WAL
|
||||
/// and filtering on the collection name until no more data is available
|
||||
Result syncCollectionFinalize(std::string const& collectionName);
|
||||
Result syncCollectionFinalize(std::string const& collectionName) {
|
||||
TRI_voc_tick_t dummy = 0;
|
||||
return syncCollectionCatchupInternal(collectionName, true, dummy);
|
||||
}
|
||||
|
||||
/// @brief catch up with changes in a leader shard by doing the same
|
||||
/// as in syncCollectionFinalize, but potentially stopping earlier.
|
||||
/// This function will use some heuristics to stop early, when most
|
||||
/// of the catching up is already done. In this case, the replication
|
||||
/// will end and store the tick to which it got in the argument `until`.
|
||||
/// The idea is that one can use `syncCollectionCatchup` without stopping
|
||||
/// writes on the leader to catch up mostly. Then one can stop writes
|
||||
/// by getting an exclusive lock on the leader and use
|
||||
/// `syncCollectionFinalize` to finish off the rest.
|
||||
/// Internally, both use `syncCollectionCatchupInternal`.
|
||||
Result syncCollectionCatchup(std::string const& collectionName,
|
||||
TRI_voc_tick_t& until) {
|
||||
return syncCollectionCatchupInternal(collectionName, false, until);
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
Result syncCollectionCatchupInternal(std::string const& collectionName,
|
||||
bool hard, TRI_voc_tick_t& until);
|
||||
/// @brief save the current applier state
|
||||
Result saveApplierState() override;
|
||||
|
||||
|
|
|
@ -2143,7 +2143,7 @@ void RestReplicationHandler::handleCommandAddFollower() {
|
|||
if (!checksumSlice.isEqualString(referenceChecksum.get())) {
|
||||
const std::string checksum = checksumSlice.copyString();
|
||||
LOG_TOPIC(WARN, Logger::REPLICATION) << "Cannot add follower, mismatching checksums. "
|
||||
<< "Expected: " << referenceChecksum << " Actual: " << checksum;
|
||||
<< "Expected: " << referenceChecksum.get() << " Actual: " << checksum;
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_REPLICATION_WRONG_CHECKSUM,
|
||||
"'checksum' is wrong. Expected: "
|
||||
+ referenceChecksum.get()
|
||||
|
|
|
@ -744,6 +744,11 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
|
|||
// pre 3.4 breaking up write batches is not supported
|
||||
size_t maxTrxChunkSize = filter.tickLastScanned > 0 ? chunkSize : SIZE_MAX;
|
||||
|
||||
/*
|
||||
maxTrxChunkSize = 16 * 16 * 16;
|
||||
chunkSize = 16 * 16 * 16;
|
||||
*/
|
||||
|
||||
MyWALDumper dumper(filter, func, maxTrxChunkSize);
|
||||
const uint64_t since = dumper.safeBeginTick();
|
||||
TRI_ASSERT(since <= filter.tickStart);
|
||||
|
@ -790,6 +795,7 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
|
|||
|
||||
//LOG_TOPIC(INFO, Logger::ENGINES) << "found batch-seq: " << batch.sequence;
|
||||
lastScannedTick = batch.sequence; // start of the batch
|
||||
|
||||
if (batch.sequence < since) {
|
||||
iterator->Next(); // skip
|
||||
continue;
|
||||
|
|
|
@ -189,6 +189,9 @@ std::string const StaticStrings::GraphDropCollection("dropCollection");
|
|||
std::string const StaticStrings::GraphCreateCollections("createCollections");
|
||||
std::string const StaticStrings::GraphCreateCollection("createCollection");
|
||||
|
||||
// Replication
|
||||
std::string const StaticStrings::ReplicationSoftLockOnly("doSoftLockOnly");
|
||||
|
||||
// misc strings
|
||||
std::string const StaticStrings::LastValue("lastValue");
|
||||
std::string const StaticStrings::checksumFileJs("JS_SHA1SUM.txt");
|
||||
|
|
|
@ -171,6 +171,9 @@ class StaticStrings {
|
|||
static std::string const GraphInitialCid;
|
||||
static std::string const GraphName;
|
||||
|
||||
// Replication
|
||||
static std::string const ReplicationSoftLockOnly;
|
||||
|
||||
// misc strings
|
||||
static std::string const LastValue;
|
||||
static std::string const checksumFileJs;
|
||||
|
|
|
@ -201,6 +201,7 @@ start() {
|
|||
--javascript.module-directory $SRC_DIR/enterprise/js \
|
||||
--javascript.app-path cluster/apps$PORT \
|
||||
--log.force-direct true \
|
||||
--log.thread true \
|
||||
--log.level $LOG_LEVEL_CLUSTER \
|
||||
--javascript.allow-admin-execute true \
|
||||
$STORAGE_ENGINE \
|
||||
|
|
Loading…
Reference in New Issue