1
0
Fork 0

Implement synchronous replication for truncate.

This commit is contained in:
Max Neunhoeffer 2016-04-22 12:21:50 +02:00
parent 45d7ae567f
commit f3cb1e2bc0
1 changed files with 51 additions and 0 deletions

View File

@ -2081,6 +2081,57 @@ OperationResult Transaction::truncateLocal(std::string const& collectionName,
return OperationResult(ex.code());
}
// Now see whether or not we have to do synchronous replication:
if (ServerState::instance()->isDBServer()) {
std::shared_ptr<std::vector<ServerID> const> followers;
// Now replicate the same operation on all followers:
auto const& followerInfo = document->followers();
followers = followerInfo->get();
if (followers->size() > 0) {
// Now replicate the good operations on all followers:
auto cc = arangodb::ClusterComm::instance();
std::string path
= "/_db/" +
arangodb::basics::StringUtils::urlEncode(_vocbase->_name) +
"/_api/collection/" + collectionName + "/truncate";
auto body = std::make_shared<std::string>();
// Now prepare the requests:
std::vector<ClusterCommRequest> requests;
for (auto const& f : *followers) {
requests.emplace_back("server:" + f,
arangodb::GeneralRequest::RequestType::PUT,
path, body);
}
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests, 15.0, nrDone,
Logger::REPLICATION);
if (nrGood < followers->size()) {
// we drop all followers that were not successful:
for (size_t i = 0; i < followers->size(); ++i) {
bool replicationWorked
= requests[i].done &&
requests[i].result.status == CL_COMM_RECEIVED &&
(requests[i].result.answer_code !=
GeneralResponse::ResponseCode::ACCEPTED &&
requests[i].result.answer_code !=
GeneralResponse::ResponseCode::OK);
if (!replicationWorked) {
auto const& followerInfo = document->followers();
followerInfo->remove((*followers)[i]);
LOG_TOPIC(ERR, Logger::REPLICATION)
<< "truncateLocal: dropping follower "
<< (*followers)[i] << " for shard " << collectionName;
}
}
}
}
}
res = unlock(trxCollection(cid), TRI_TRANSACTION_WRITE);
if (res != TRI_ERROR_NO_ERROR) {