1
0
Fork 0

Fix synchronous replication.

This commit is contained in:
Max Neunhoeffer 2016-04-20 13:20:07 +02:00
parent 5dcd21e23d
commit 10ce1b262f
1 changed files with 33 additions and 11 deletions

View File

@ -1211,6 +1211,8 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
!isLocked(document, TRI_TRANSACTION_WRITE)); !isLocked(document, TRI_TRANSACTION_WRITE));
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
// Error reporting in the babies case is done outside of here,
// in the single document case no body needs to be created at all.
return res; return res;
} }
@ -1232,8 +1234,9 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
}; };
int res = TRI_ERROR_NO_ERROR; int res = TRI_ERROR_NO_ERROR;
bool multiCase = value.isArray();
std::unordered_map<int, size_t> countErrorCodes; std::unordered_map<int, size_t> countErrorCodes;
if (value.isArray()) { if (multiCase) {
VPackArrayBuilder b(&resultBuilder); VPackArrayBuilder b(&resultBuilder);
for (auto const& s : VPackArrayIterator(value)) { for (auto const& s : VPackArrayIterator(value)) {
res = workForOneDocument(s); res = workForOneDocument(s);
@ -1241,14 +1244,18 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
createBabiesError(resultBuilder, countErrorCodes, res); createBabiesError(resultBuilder, countErrorCodes, res);
} }
} }
// With babies the reporting is handled somewhere else. // With babies the reporting is handled in the body of the result
res = TRI_ERROR_NO_ERROR; res = TRI_ERROR_NO_ERROR;
} else { } else {
res = workForOneDocument(value); res = workForOneDocument(value);
} }
if (doingSynchronousReplication) { if (doingSynchronousReplication && res == TRI_ERROR_NO_ERROR) {
// Now replicate the same operation on all followers: // In the multi babies case res is always TRI_ERROR_NO_ERROR if we
// get here, in the single document case, we do not try to replicate
// in case of an error.
// Now replicate the good operations on all followers:
auto cc = arangodb::ClusterComm::instance(); auto cc = arangodb::ClusterComm::instance();
std::string path std::string path
@ -1275,7 +1282,10 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
VPackArrayIterator itValue(value); VPackArrayIterator itValue(value);
VPackArrayIterator itResult(ourResult); VPackArrayIterator itResult(ourResult);
while (itValue.valid() && itResult.valid()) { while (itValue.valid() && itResult.valid()) {
doOneDoc(itValue.value(), itResult.value()); TRI_ASSERT(itResult->isObject());
if (!(*itResult).hasKey("error")) {
doOneDoc(itValue.value(), itResult.value());
}
itValue.next(); itValue.next();
itResult.next(); itResult.next();
} }
@ -1298,12 +1308,19 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
if (nrGood < followers->size()) { if (nrGood < followers->size()) {
// we drop all followers that were not successful: // we drop all followers that were not successful:
for (size_t i = 0; i < followers->size(); ++i) { for (size_t i = 0; i < followers->size(); ++i) {
if (!requests[i].done || bool replicationWorked
requests[i].result.status != CL_COMM_RECEIVED || = requests[i].done &&
(requests[i].result.answer_code != requests[i].result.status == CL_COMM_RECEIVED &&
GeneralResponse::ResponseCode::ACCEPTED && (requests[i].result.answer_code !=
requests[i].result.answer_code != GeneralResponse::ResponseCode::ACCEPTED &&
GeneralResponse::ResponseCode::CREATED)) { requests[i].result.answer_code !=
GeneralResponse::ResponseCode::CREATED);
if (replicationWorked) {
bool found;
requests[i].result.answer->header("x-arango-error-codes", found);
replicationWorked = !found;
}
if (!replicationWorked) {
auto const& followerInfo = document->followers(); auto const& followerInfo = document->followers();
followerInfo->remove((*followers)[i]); followerInfo->remove((*followers)[i]);
LOG_TOPIC(ERR, Logger::REPLICATION) LOG_TOPIC(ERR, Logger::REPLICATION)
@ -1314,6 +1331,11 @@ OperationResult Transaction::insertLocal(std::string const& collectionName,
} }
} }
if (doingSynchronousReplication && options.silent) {
// We needed the results, but do not want to report:
resultBuilder.clear();
}
return OperationResult(resultBuilder.steal(), nullptr, "", res, return OperationResult(resultBuilder.steal(), nullptr, "", res,
options.waitForSync, countErrorCodes); options.waitForSync, countErrorCodes);
} }