mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into engine-api
commit 37b73f7377
@@ -3517,6 +3517,11 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
     return;
   }
 
+  {
+    CONDITION_LOCKER(locker, _condVar);
+    _holdReadLockJobs.emplace(id, false);
+  }
+
   auto trxContext = StandaloneTransactionContext::Create(_vocbase);
   SingleCollectionTransaction trx(trxContext, col->cid(), TRI_TRANSACTION_READ);
   trx.addHint(TRI_TRANSACTION_HINT_LOCK_ENTIRELY, false);
@@ -3530,7 +3535,16 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
 
   {
     CONDITION_LOCKER(locker, _condVar);
-    _holdReadLockJobs.insert(id);
+    auto it = _holdReadLockJobs.find(id);
+    if (it == _holdReadLockJobs.end()) {
+      // Entry has been removed since, so we cancel the whole thing
+      // right away and generate an error:
+      generateError(rest::ResponseCode::SERVER_ERROR,
+                    TRI_ERROR_TRANSACTION_INTERNAL,
+                    "read transaction was cancelled");
+      return;
+    }
+    it->second = true;  // mark the read lock as acquired
   }
 
   double now = TRI_microtime();
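
The pattern introduced here is a small handshake over a condition-variable-protected job table: the POST handler first registers the job id mapped to false, then acquires the collection read lock, and finally flips the entry to true, unless a concurrent cancel has already erased it. A minimal standalone sketch of that handshake, assuming plain standard-library types in place of ArangoDB's ConditionVariable and CONDITION_LOCKER, with illustrative function names:

    #include <condition_variable>
    #include <mutex>
    #include <string>
    #include <unordered_map>

    // Simplified stand-ins for _condVar and _holdReadLockJobs:
    std::mutex jobsMutex;
    std::condition_variable jobsCondVar;
    std::unordered_map<std::string, bool> holdReadLockJobs;  // id -> "lock acquired?"

    // Step 1: announce the job before the (possibly slow) lock acquisition.
    void registerJob(std::string const& id) {
      std::lock_guard<std::mutex> guard(jobsMutex);
      holdReadLockJobs.emplace(id, false);
    }

    // Step 2: after acquiring the collection read lock, mark it as held.
    // Returns false if a concurrent cancel already erased the entry.
    bool markAcquired(std::string const& id) {
      std::lock_guard<std::mutex> guard(jobsMutex);
      auto it = holdReadLockJobs.find(id);
      if (it == holdReadLockJobs.end()) {
        return false;  // cancelled in the meantime: release the lock, report error
      }
      it->second = true;
      return true;
    }

    int main() {
      registerJob("job-1");
      return markAcquired("job-1") ? 0 : 1;  // 0: lock registered as held
    }
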
@@ -3588,6 +3602,8 @@ void RestReplicationHandler::handleCommandCheckHoldReadLockCollection() {
   }
   std::string id = idSlice.copyString();
 
+  bool lockHeld = false;
+
   {
     CONDITION_LOCKER(locker, _condVar);
     auto it = _holdReadLockJobs.find(id);
@@ -3596,12 +3612,17 @@ void RestReplicationHandler::handleCommandCheckHoldReadLockCollection() {
                     "no hold read lock job found for 'id'");
       return;
     }
+    if (it->second) {
+      lockHeld = true;
+    }
+
   }
 
   VPackBuilder b;
   {
     VPackObjectBuilder bb(&b);
     b.add("error", VPackValue(false));
+    b.add("lockHeld", VPackValue(lockHeld));
   }
 
   generateResult(rest::ResponseCode::OK, b.slice());
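
With the lockHeld flag in the PUT response, a caller can now distinguish "the job exists but is still waiting for the lock" from "the read lock is actually held"; the JavaScript polling loop further down relies on exactly this distinction.
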
@@ -3633,11 +3654,19 @@ void RestReplicationHandler::handleCommandCancelHoldReadLockCollection() {
   }
   std::string id = idSlice.copyString();
 
+  bool lockHeld = false;
   {
     CONDITION_LOCKER(locker, _condVar);
     auto it = _holdReadLockJobs.find(id);
     if (it != _holdReadLockJobs.end()) {
+      // Note that this approach works if the lock has been acquired
+      // as well as if we still wait for the read lock, in which case
+      // it will eventually be acquired but immediately released:
+      if (it->second) {
+        lockHeld = true;
+      }
       _holdReadLockJobs.erase(it);
+      _condVar.broadcast();
     }
   }
 
@@ -3645,6 +3674,7 @@ void RestReplicationHandler::handleCommandCancelHoldReadLockCollection() {
   {
     VPackObjectBuilder bb(&b);
     b.add("error", VPackValue(false));
+    b.add("lockHeld", VPackValue(lockHeld));
   }
 
   generateResult(rest::ResponseCode::OK, b.slice());
@@ -3677,4 +3707,4 @@ arangodb::basics::ConditionVariable RestReplicationHandler::_condVar;
 /// the flag is set of the ID of a job, the job is cancelled
 //////////////////////////////////////////////////////////////////////////////
 
-std::unordered_set<std::string> RestReplicationHandler::_holdReadLockJobs;
+std::unordered_map<std::string, bool> RestReplicationHandler::_holdReadLockJobs;

@@ -366,13 +366,18 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
   static arangodb::basics::ConditionVariable _condVar;
 
   //////////////////////////////////////////////////////////////////////////////
-  /// @brief global set of ids of holdReadLockCollection jobs, if
-  /// an id is removed here (under the protection of the mutex of
-  /// condVar) and a broadcast is sent, the job with that id is
-  /// terminated.
+  /// @brief global set of ids of holdReadLockCollection jobs, an
+  /// id mapping to false here indicates that a request to get the
+  /// read lock has been started, the bool is changed to true once
+  /// this read lock is acquired. To cancel the read lock, remove
+  /// the entry here (under the protection of the mutex of
+  /// condVar) and send a broadcast to the condition variable,
+  /// the job with that id is terminated. If it timeouts, then
+  /// the read lock is released automatically and the entry here
+  /// is deleted.
   //////////////////////////////////////////////////////////////////////////////
 
-  static std::unordered_set<std::string> _holdReadLockJobs;
+  static std::unordered_map<std::string, bool> _holdReadLockJobs;
 
 };
 }
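
The rewritten doc comment spells out the full life cycle: an entry is created as false, flipped to true on acquisition, and the lock is torn down either by a cancel (erase plus broadcast) or by TTL expiry. A hedged sketch of the holder-side wait, continuing the simplified standalone names from the sketch above (std::condition_variable instead of arangodb::basics::ConditionVariable, a steady-clock deadline instead of TRI_microtime):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <string>
    #include <unordered_map>

    // Same simplified globals as in the previous sketch:
    std::mutex jobsMutex;
    std::condition_variable jobsCondVar;
    std::unordered_map<std::string, bool> holdReadLockJobs;

    // Hold the read lock until the entry is erased (cancel) or the TTL expires.
    void holdUntilCancelledOrExpired(std::string const& id, double ttlSeconds) {
      auto deadline = std::chrono::steady_clock::now() +
                      std::chrono::duration<double>(ttlSeconds);
      std::unique_lock<std::mutex> guard(jobsMutex);
      while (holdReadLockJobs.count(id) > 0) {   // erased entry == cancellation
        if (jobsCondVar.wait_until(guard, deadline) == std::cv_status::timeout) {
          holdReadLockJobs.erase(id);            // timed out: remove our own entry
          break;
        }
      }
      // in either case the caller now releases the collection read lock
    }
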
@@ -1824,6 +1824,25 @@ OperationResult Transaction::insertCoordinator(std::string const& collectionName
 }
 #endif
 
+//////////////////////////////////////////////////////////////////////////////
+/// @brief choose a timeout for synchronous replication, based on the
+/// number of documents we ship over
+//////////////////////////////////////////////////////////////////////////////
+
+static double chooseTimeout(size_t count) {
+  // We usually assume that a server can process at least 5000 documents
+  // per second (this is a low estimate), and use a low limit of 0.5s
+  // and a high timeout of 120s
+  double timeout = count / 5000;
+  if (timeout < 0.5) {
+    return 0.5;
+  } else if (timeout > 120) {
+    return 120.0;
+  } else {
+    return timeout;
+  }
+}
+
 //////////////////////////////////////////////////////////////////////////////
 /// @brief create one or multiple documents in a collection, local
 /// the single-document variant of this operation will either succeed or,
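
Read literally, the new helper scales the timeout linearly at the assumed 5000 documents per second and clamps it to [0.5 s, 120 s]. One subtlety: count / 5000 divides two integral operands, so the quotient is truncated before it is stored in the double. A small self-contained check of the resulting values (the function body is copied from the hunk above, only condensed; the main() is illustrative):

    #include <cstdio>

    static double chooseTimeout(size_t count) {
      double timeout = count / 5000;  // integer division: 9999/5000 == 1
      if (timeout < 0.5) { return 0.5; }
      if (timeout > 120) { return 120.0; }
      return timeout;
    }

    int main() {
      std::printf("%g\n", chooseTimeout(100));       // 0.5  (clamped low)
      std::printf("%g\n", chooseTimeout(9999));      // 1    (truncated from 1.9998)
      std::printf("%g\n", chooseTimeout(100000));    // 20
      std::printf("%g\n", chooseTimeout(10000000));  // 120  (clamped high)
      return 0;
    }

The three call sites below (insertLocal, modifyLocal, removeLocal) all switch from the fixed TRX_FOLLOWER_TIMEOUT to this per-request chooseTimeout(count).
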
@@ -1981,7 +2000,7 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
   }
   auto cc = arangodb::ClusterComm::instance();
   size_t nrDone = 0;
-  size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT,
+  size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
                                       nrDone, Logger::REPLICATION);
   if (nrGood < followers->size()) {
     // we drop all followers that were not successful:
@@ -2275,6 +2294,7 @@ OperationResult Transaction::modifyLocal(
   };
 
   VPackSlice ourResult = resultBuilder.slice();
+  size_t count = 0;
   if (multiCase) {
     VPackArrayBuilder guard(&payload);
     VPackArrayIterator itValue(newValue);
@@ -2283,6 +2303,7 @@ OperationResult Transaction::modifyLocal(
       TRI_ASSERT((*itResult).isObject());
       if (!(*itResult).hasKey("error")) {
         doOneDoc(itValue.value(), itResult.value());
+        count++;
       }
       itValue.next();
       itResult.next();
@@ -2290,43 +2311,46 @@ OperationResult Transaction::modifyLocal(
   } else {
     VPackArrayBuilder guard(&payload);
     doOneDoc(newValue, ourResult);
+    count++;
   }
-  auto body = std::make_shared<std::string>();
-  *body = payload.slice().toJson();
+  if (count > 0) {
+    auto body = std::make_shared<std::string>();
+    *body = payload.slice().toJson();
 
   // Now prepare the requests:
   std::vector<ClusterCommRequest> requests;
   for (auto const& f : *followers) {
     requests.emplace_back("server:" + f,
                           operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ?
                           arangodb::rest::RequestType::PUT :
                           arangodb::rest::RequestType::PATCH,
                           path, body);
   }
   size_t nrDone = 0;
-  size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT, nrDone,
-                                      Logger::REPLICATION);
+  size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
+                                      nrDone, Logger::REPLICATION);
   if (nrGood < followers->size()) {
     // we drop all followers that were not successful:
     for (size_t i = 0; i < followers->size(); ++i) {
       bool replicationWorked
           = requests[i].done &&
             requests[i].result.status == CL_COMM_RECEIVED &&
             (requests[i].result.answer_code ==
              rest::ResponseCode::ACCEPTED ||
              requests[i].result.answer_code ==
              rest::ResponseCode::OK);
       if (replicationWorked) {
         bool found;
         requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
         replicationWorked = !found;
       }
       if (!replicationWorked) {
         auto const& followerInfo = collection->followers();
         followerInfo->remove((*followers)[i]);
         LOG_TOPIC(ERR, Logger::REPLICATION)
             << "modifyLocal: dropping follower "
             << (*followers)[i] << " for shard " << collectionName;
+      }
       }
     }
   }
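
Besides the adaptive timeout, the new if (count > 0) guard makes the leader skip synchronous follower replication entirely when no document operation succeeded locally, so followers are no longer sent an empty payload array; count tallies only the documents that doOneDoc() actually appended. The same guard and counting appear in removeLocal below.
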
@@ -2524,6 +2548,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
   };
 
   VPackSlice ourResult = resultBuilder.slice();
+  size_t count = 0;
   if (value.isArray()) {
     VPackArrayBuilder guard(&payload);
     VPackArrayIterator itValue(value);
@@ -2532,6 +2557,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
       TRI_ASSERT((*itResult).isObject());
       if (!(*itResult).hasKey("error")) {
         doOneDoc(itValue.value(), itResult.value());
+        count++;
       }
       itValue.next();
       itResult.next();
@@ -2539,41 +2565,44 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
   } else {
     VPackArrayBuilder guard(&payload);
     doOneDoc(value, ourResult);
+    count++;
   }
-  auto body = std::make_shared<std::string>();
-  *body = payload.slice().toJson();
+  if (count > 0) {
+    auto body = std::make_shared<std::string>();
+    *body = payload.slice().toJson();
 
   // Now prepare the requests:
   std::vector<ClusterCommRequest> requests;
   for (auto const& f : *followers) {
     requests.emplace_back("server:" + f,
                           arangodb::rest::RequestType::DELETE_REQ,
                           path, body);
   }
   size_t nrDone = 0;
-  size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT, nrDone,
-                                      Logger::REPLICATION);
+  size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
+                                      nrDone, Logger::REPLICATION);
   if (nrGood < followers->size()) {
     // we drop all followers that were not successful:
     for (size_t i = 0; i < followers->size(); ++i) {
       bool replicationWorked
           = requests[i].done &&
             requests[i].result.status == CL_COMM_RECEIVED &&
             (requests[i].result.answer_code ==
              rest::ResponseCode::ACCEPTED ||
              requests[i].result.answer_code ==
              rest::ResponseCode::OK);
       if (replicationWorked) {
         bool found;
         requests[i].result.answer->header(StaticStrings::ErrorCodes, found);
         replicationWorked = !found;
       }
       if (!replicationWorked) {
         auto const& followerInfo = collection->followers();
         followerInfo->remove((*followers)[i]);
         LOG_TOPIC(ERR, Logger::REPLICATION)
             << "removeLocal: dropping follower "
             << (*followers)[i] << " for shard " << collectionName;
+      }
       }
     }
   }

@@ -65,7 +65,7 @@ var endpointToURL = function (endpoint) {
 function startReadLockOnLeader (endpoint, database, collName, timeout) {
   var url = endpointToURL(endpoint) + '/_db/' + database;
   var r = request({ url: url + '/_api/replication/holdReadLockCollection',
                     method: 'GET' });
   if (r.status !== 200) {
     console.error('startReadLockOnLeader: Could not get ID for shard',
                   collName, r);
@@ -83,30 +83,47 @@ function startReadLockOnLeader (endpoint, database, collName, timeout) {
 
   var body = { 'id': id, 'collection': collName, 'ttl': timeout };
   r = request({ url: url + '/_api/replication/holdReadLockCollection',
                 body: JSON.stringify(body),
-                method: 'POST', headers: {'x-arango-async': 'store'} });
+                method: 'POST', headers: {'x-arango-async': true} });
   if (r.status !== 202) {
     console.error('startReadLockOnLeader: Could not start read lock for shard',
                   collName, r);
     return false;
   }
-  var rr = r; // keep a copy
 
   var count = 0;
   while (++count < 20) { // wait for some time until read lock established:
     // Now check that we hold the read lock:
     r = request({ url: url + '/_api/replication/holdReadLockCollection',
-                  body: JSON.stringify(body),
-                  method: 'PUT' });
+                  body: JSON.stringify(body), method: 'PUT' });
     if (r.status === 200) {
-      return id;
+      let ansBody = {};
+      try {
+        ansBody = JSON.parse(r.body);
+      } catch (err) {
+      }
+      if (ansBody.lockHeld) {
+        return id;
+      } else {
+        console.debug('startReadLockOnLeader: Lock not yet acquired...');
+      }
+    } else {
+      console.debug('startReadLockOnLeader: Do not see read lock yet...');
     }
-    console.debug('startReadLockOnLeader: Do not see read lock yet...');
     wait(0.5);
   }
-  var asyncJobId = rr.headers['x-arango-async-id'];
-  r = request({ url: url + '/_api/job/' + asyncJobId, body: '', method: 'PUT'});
-  console.error('startReadLockOnLeader: giving up, async result:', r);
+  console.error('startReadLockOnLeader: giving up');
+  try {
+    r = request({ url: url + '/_api/replication/holdReadLockCollection',
+                  body: JSON.stringify({'id': id}), method: 'DELETE' });
+  } catch (err2) {
+    console.error('startReadLockOnLeader: expection in cancel:',
+                  JSON.stringify(err2));
+  }
+  if (r.status !== 200) {
+    console.error('startReadLockOnLeader: cancelation error for shard',
+                  collName, r);
+  }
   return false;
 }
 
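
Taken together, the client side of the handshake now has three stages: start the holdReadLockCollection job with an asynchronous POST, poll it with PUT until the response body reports lockHeld: true rather than accepting any HTTP 200, and, when giving up, cancel via DELETE on the same replication endpoint (which erases the server-side entry and broadcasts the condition variable) instead of aborting the async job through /_api/job.
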
@@ -527,8 +544,7 @@ function synchronizeOneShard (database, shard, planId, leader) {
                       'syncCollectionFinalize:', err3);
       }
       finally {
-        if (!cancelReadLockOnLeader(ep, database,
-                                    lockJobId)) {
+        if (!cancelReadLockOnLeader(ep, database, lockJobId)) {
           console.error('synchronizeOneShard: read lock has timed out',
                         'for shard', shard);
           ok = false;
|
||||||
shard);
|
shard);
|
||||||
}
|
}
|
||||||
if (ok) {
|
if (ok) {
|
||||||
console.debug('synchronizeOneShard: synchronization worked for shard',
|
console.info('synchronizeOneShard: synchronization worked for shard',
|
||||||
shard);
|
shard);
|
||||||
} else {
|
} else {
|
||||||
throw 'Did not work for shard ' + shard + '.';
|
throw 'Did not work for shard ' + shard + '.';
|
||||||
|
|
|
@@ -339,7 +339,7 @@ function syncCollectionFinalize (database, collname, from, config) {
       l = l.map(JSON.parse);
     } catch (err) {
       return {error: true, errorMessage: 'could not parse body',
               response: chunk, exception: err};
     }
 
     console.debug('Applying chunk:', l);
@@ -349,7 +349,7 @@ function syncCollectionFinalize (database, collname, from, config) {
       }
     } catch (err2) {
       return {error: true, errorMessage: 'could not replicate body ops',
               response: chunk, exception: err2};
     }
     if (chunk.headers['x-arango-replication-checkmore'] !== 'true') {
       break; // all done