mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel
This commit is contained in:
commit
7be2a951a6
|
@ -41,6 +41,8 @@
|
||||||
using namespace arangodb::basics;
|
using namespace arangodb::basics;
|
||||||
using namespace arangodb::rest;
|
using namespace arangodb::rest;
|
||||||
|
|
||||||
|
static size_t const CL_DEFAULT_TIMEOUT = 60.0;
|
||||||
|
|
||||||
namespace arangodb {
|
namespace arangodb {
|
||||||
|
|
||||||
static int handleGeneralCommErrors(ClusterCommResult const* res) {
|
static int handleGeneralCommErrors(ClusterCommResult const* res) {
|
||||||
|
@ -326,15 +328,14 @@ static int distributeBabyOnShards(
|
||||||
template <typename T>
|
template <typename T>
|
||||||
static void collectResultsFromAllShards(
|
static void collectResultsFromAllShards(
|
||||||
std::unordered_map<ShardID, std::vector<T>> const& shardMap,
|
std::unordered_map<ShardID, std::vector<T>> const& shardMap,
|
||||||
ClusterComm* cc, CoordTransactionID const& coordTransactionID,
|
std::vector<ClusterCommRequest>& requests,
|
||||||
std::unordered_map<int, size_t>& errorCounter,
|
std::unordered_map<int, size_t>& errorCounter,
|
||||||
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>>& resultMap,
|
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>>& resultMap,
|
||||||
GeneralResponse::ResponseCode& responseCode) {
|
GeneralResponse::ResponseCode& responseCode) {
|
||||||
size_t count;
|
|
||||||
// If none of the shards responds we return a SERVER_ERROR;
|
// If none of the shards responds we return a SERVER_ERROR;
|
||||||
responseCode = GeneralResponse::ResponseCode::SERVER_ERROR;
|
responseCode = GeneralResponse::ResponseCode::SERVER_ERROR;
|
||||||
for (count = shardMap.size(); count > 0; count--) {
|
for (auto const& req : requests) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
auto res = req.result;
|
||||||
if (res.status == CL_COMM_RECEIVED) {
|
if (res.status == CL_COMM_RECEIVED) {
|
||||||
int commError = handleGeneralCommErrors(&res);
|
int commError = handleGeneralCommErrors(&res);
|
||||||
if (commError != TRI_ERROR_NO_ERROR) {
|
if (commError != TRI_ERROR_NO_ERROR) {
|
||||||
|
@ -771,7 +772,6 @@ int countOnCoordinator(std::string const& dbname, std::string const& collname,
|
||||||
int createDocumentOnCoordinator(
|
int createDocumentOnCoordinator(
|
||||||
std::string const& dbname, std::string const& collname,
|
std::string const& dbname, std::string const& collname,
|
||||||
arangodb::OperationOptions const& options, VPackSlice const& slice,
|
arangodb::OperationOptions const& options, VPackSlice const& slice,
|
||||||
std::map<std::string, std::string> const& headers,
|
|
||||||
arangodb::GeneralResponse::ResponseCode& responseCode,
|
arangodb::GeneralResponse::ResponseCode& responseCode,
|
||||||
std::unordered_map<int, size_t>& errorCounter,
|
std::unordered_map<int, size_t>& errorCounter,
|
||||||
std::shared_ptr<VPackBuilder>& resultBody) {
|
std::shared_ptr<VPackBuilder>& resultBody) {
|
||||||
|
@ -820,8 +820,9 @@ int createDocumentOnCoordinator(
|
||||||
(options.returnOld ? "true" : "false");
|
(options.returnOld ? "true" : "false");
|
||||||
|
|
||||||
VPackBuilder reqBuilder;
|
VPackBuilder reqBuilder;
|
||||||
CoordTransactionID coordTransactionID = TRI_NewTickServer();
|
|
||||||
|
|
||||||
|
// Now prepare the requests:
|
||||||
|
std::vector<ClusterCommRequest> requests;
|
||||||
auto body = std::make_shared<std::string>();
|
auto body = std::make_shared<std::string>();
|
||||||
for (auto const& it : shardMap) {
|
for (auto const& it : shardMap) {
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
|
@ -853,21 +854,24 @@ int createDocumentOnCoordinator(
|
||||||
reqBuilder.close();
|
reqBuilder.close();
|
||||||
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
||||||
}
|
}
|
||||||
auto headersCopy =
|
|
||||||
std::make_unique<std::map<std::string, std::string>>(headers);
|
requests.emplace_back(
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + it.first,
|
"shard:" + it.first, arangodb::GeneralRequest::RequestType::POST,
|
||||||
arangodb::GeneralRequest::RequestType::POST,
|
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, body);
|
||||||
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
|
|
||||||
body, headersCopy, nullptr, 60.0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform the requests
|
||||||
|
size_t nrDone = 0;
|
||||||
|
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::REQUESTS);
|
||||||
|
|
||||||
// Now listen to the results:
|
// Now listen to the results:
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
TRI_ASSERT(requests.size() == 1);
|
||||||
|
auto const& req = requests[0];
|
||||||
int commError = handleGeneralCommErrors(&res);
|
auto res = req.result;
|
||||||
if (commError != TRI_ERROR_NO_ERROR) {
|
if (nrDone == 0) {
|
||||||
return commError;
|
// There has been Communcation error. Handle and return it.
|
||||||
|
return handleGeneralCommErrors(&res);
|
||||||
}
|
}
|
||||||
|
|
||||||
responseCode = res.answer_code;
|
responseCode = res.answer_code;
|
||||||
|
@ -880,7 +884,7 @@ int createDocumentOnCoordinator(
|
||||||
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
||||||
|
|
||||||
collectResultsFromAllShards<std::pair<VPackValueLength, std::string>>(
|
collectResultsFromAllShards<std::pair<VPackValueLength, std::string>>(
|
||||||
shardMap, cc, coordTransactionID, errorCounter, resultMap, responseCode);
|
shardMap, requests, errorCounter, resultMap, responseCode);
|
||||||
|
|
||||||
responseCode =
|
responseCode =
|
||||||
(options.waitForSync ? GeneralResponse::ResponseCode::CREATED
|
(options.waitForSync ? GeneralResponse::ResponseCode::CREATED
|
||||||
|
@ -900,7 +904,6 @@ int deleteDocumentOnCoordinator(
|
||||||
std::string const& dbname, std::string const& collname,
|
std::string const& dbname, std::string const& collname,
|
||||||
VPackSlice const slice,
|
VPackSlice const slice,
|
||||||
arangodb::OperationOptions const& options,
|
arangodb::OperationOptions const& options,
|
||||||
std::unique_ptr<std::map<std::string, std::string>>& headers,
|
|
||||||
arangodb::GeneralResponse::ResponseCode& responseCode,
|
arangodb::GeneralResponse::ResponseCode& responseCode,
|
||||||
std::unordered_map<int, size_t>& errorCounter,
|
std::unordered_map<int, size_t>& errorCounter,
|
||||||
std::shared_ptr<arangodb::velocypack::Builder>& resultBody) {
|
std::shared_ptr<arangodb::velocypack::Builder>& resultBody) {
|
||||||
|
@ -928,7 +931,6 @@ int deleteDocumentOnCoordinator(
|
||||||
|
|
||||||
|
|
||||||
VPackBuilder reqBuilder;
|
VPackBuilder reqBuilder;
|
||||||
CoordTransactionID coordTransactionID = TRI_NewTickServer();
|
|
||||||
|
|
||||||
if (useDefaultSharding) {
|
if (useDefaultSharding) {
|
||||||
// fastpath we know which server is responsible.
|
// fastpath we know which server is responsible.
|
||||||
|
@ -994,6 +996,8 @@ int deleteDocumentOnCoordinator(
|
||||||
|
|
||||||
// We sorted the shards correctly.
|
// We sorted the shards correctly.
|
||||||
|
|
||||||
|
// Now prepare the requests:
|
||||||
|
std::vector<ClusterCommRequest> requests;
|
||||||
auto body = std::make_shared<std::string>();
|
auto body = std::make_shared<std::string>();
|
||||||
for (auto const& it : shardMap) {
|
for (auto const& it : shardMap) {
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
|
@ -1008,23 +1012,24 @@ int deleteDocumentOnCoordinator(
|
||||||
reqBuilder.close();
|
reqBuilder.close();
|
||||||
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
||||||
}
|
}
|
||||||
auto headersCopy =
|
requests.emplace_back(
|
||||||
std::make_unique<std::map<std::string, std::string>>(*headers);
|
"shard:" + it.first,
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + it.first,
|
arangodb::GeneralRequest::RequestType::DELETE_REQ,
|
||||||
arangodb::GeneralRequest::RequestType::DELETE_REQ,
|
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, body);
|
||||||
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
|
|
||||||
body, headersCopy, nullptr, 60.0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform the requests
|
||||||
|
size_t nrDone = 0;
|
||||||
|
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::REQUESTS);
|
||||||
|
|
||||||
// Now listen to the results:
|
// Now listen to the results:
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
TRI_ASSERT(requests.size() == 1);
|
||||||
|
auto const& req = requests[0];
|
||||||
int commError = handleGeneralCommErrors(&res);
|
auto res = req.result;
|
||||||
if (commError != TRI_ERROR_NO_ERROR) {
|
if (nrDone == 0) {
|
||||||
return commError;
|
return handleGeneralCommErrors(&res);
|
||||||
}
|
}
|
||||||
|
|
||||||
responseCode = res.answer_code;
|
responseCode = res.answer_code;
|
||||||
TRI_ASSERT(res.answer != nullptr);
|
TRI_ASSERT(res.answer != nullptr);
|
||||||
auto parsedResult = res.answer->toVelocyPack(&VPackOptions::Defaults);
|
auto parsedResult = res.answer->toVelocyPack(&VPackOptions::Defaults);
|
||||||
|
@ -1034,7 +1039,7 @@ int deleteDocumentOnCoordinator(
|
||||||
|
|
||||||
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
||||||
collectResultsFromAllShards<VPackValueLength>(
|
collectResultsFromAllShards<VPackValueLength>(
|
||||||
shardMap, cc, coordTransactionID, errorCounter, resultMap, responseCode);
|
shardMap, requests, errorCounter, resultMap, responseCode);
|
||||||
mergeResults(reverseMapping, resultMap, resultBody);
|
mergeResults(reverseMapping, resultMap, resultBody);
|
||||||
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
|
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
|
||||||
// the DBserver could have reported an error.
|
// the DBserver could have reported an error.
|
||||||
|
@ -1051,23 +1056,26 @@ int deleteDocumentOnCoordinator(
|
||||||
// if (!skipped) => insert NOT_FOUND
|
// if (!skipped) => insert NOT_FOUND
|
||||||
|
|
||||||
auto body = std::make_shared<std::string>(slice.toJson());
|
auto body = std::make_shared<std::string>(slice.toJson());
|
||||||
|
std::vector<ClusterCommRequest> requests;
|
||||||
auto shardList = ci->getShardList(collid);
|
auto shardList = ci->getShardList(collid);
|
||||||
for (auto const& shard : *shardList) {
|
for (auto const& shard : *shardList) {
|
||||||
auto headersCopy =
|
requests.emplace_back(
|
||||||
std::make_unique<std::map<std::string, std::string>>(*headers);
|
"shard:" + shard, arangodb::GeneralRequest::RequestType::DELETE_REQ,
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + shard,
|
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, body);
|
||||||
arangodb::GeneralRequest::RequestType::DELETE_REQ,
|
|
||||||
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
|
|
||||||
body, headersCopy, nullptr, 60.0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform the requests
|
||||||
|
size_t nrDone = 0;
|
||||||
|
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::REQUESTS);
|
||||||
|
|
||||||
// Now listen to the results:
|
// Now listen to the results:
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
// Only one can answer, we react a bit differently
|
// Only one can answer, we react a bit differently
|
||||||
int count;
|
size_t count;
|
||||||
int nrok = 0;
|
int nrok = 0;
|
||||||
for (count = (int)shardList->size(); count > 0; count--) {
|
for (count = requests.size(); count > 0; count--) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
auto const& req = requests[count - 1];
|
||||||
|
auto res = req.result;
|
||||||
if (res.status == CL_COMM_RECEIVED) {
|
if (res.status == CL_COMM_RECEIVED) {
|
||||||
if (res.answer_code !=
|
if (res.answer_code !=
|
||||||
arangodb::GeneralResponse::ResponseCode::NOT_FOUND ||
|
arangodb::GeneralResponse::ResponseCode::NOT_FOUND ||
|
||||||
|
@ -1095,12 +1103,10 @@ int deleteDocumentOnCoordinator(
|
||||||
allResults.reserve(shardList->size());
|
allResults.reserve(shardList->size());
|
||||||
// If no server responds we return 500
|
// If no server responds we return 500
|
||||||
responseCode = GeneralResponse::ResponseCode::SERVER_ERROR;
|
responseCode = GeneralResponse::ResponseCode::SERVER_ERROR;
|
||||||
for (size_t i = 0; i < shardList->size(); ++i) {
|
for (auto const& req : requests) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
auto res = req.result;
|
||||||
int error = handleGeneralCommErrors(&res);
|
int error = handleGeneralCommErrors(&res);
|
||||||
if (error != TRI_ERROR_NO_ERROR) {
|
if (error != TRI_ERROR_NO_ERROR) {
|
||||||
// Cluster is in bad state. Just report. Drop other results.
|
|
||||||
cc->drop("", coordTransactionID, 0, "");
|
|
||||||
// Local data structores are automatically freed
|
// Local data structores are automatically freed
|
||||||
return error;
|
return error;
|
||||||
}
|
}
|
||||||
|
@ -1252,24 +1258,26 @@ int getDocumentOnCoordinator(
|
||||||
// Contact all shards directly with the correct information.
|
// Contact all shards directly with the correct information.
|
||||||
|
|
||||||
VPackBuilder reqBuilder;
|
VPackBuilder reqBuilder;
|
||||||
|
|
||||||
|
// Now prepare the requests:
|
||||||
|
std::vector<ClusterCommRequest> requests;
|
||||||
auto body = std::make_shared<std::string>();
|
auto body = std::make_shared<std::string>();
|
||||||
for (auto const& it : shardMap) {
|
for (auto const& it : shardMap) {
|
||||||
auto headersCopy =
|
|
||||||
std::make_unique<std::map<std::string, std::string>>(*headers);
|
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
TRI_ASSERT(it.second.size() == 1);
|
TRI_ASSERT(it.second.size() == 1);
|
||||||
if (!options.ignoreRevs && slice.hasKey(TRI_VOC_ATTRIBUTE_REV)) {
|
if (!options.ignoreRevs && slice.hasKey(TRI_VOC_ATTRIBUTE_REV)) {
|
||||||
headersCopy->emplace("if-match", slice.get(TRI_VOC_ATTRIBUTE_REV).copyString());
|
headers->emplace("if-match", slice.get(TRI_VOC_ATTRIBUTE_REV).copyString());
|
||||||
}
|
}
|
||||||
|
|
||||||
// We send to single endpoint
|
// We send to single endpoint
|
||||||
cc->asyncRequest(
|
requests.emplace_back(
|
||||||
"", coordTransactionID, "shard:" + it.first, reqType,
|
"shard:" + it.first, reqType,
|
||||||
baseUrl + StringUtils::urlEncode(it.first) + "/" +
|
baseUrl + StringUtils::urlEncode(it.first) + "/" +
|
||||||
StringUtils::urlEncode(
|
StringUtils::urlEncode(
|
||||||
slice.get(TRI_VOC_ATTRIBUTE_KEY).copyString()) +
|
slice.get(TRI_VOC_ATTRIBUTE_KEY).copyString()) +
|
||||||
optsUrlPart,
|
optsUrlPart,
|
||||||
body, headersCopy, nullptr, 60.0);
|
body);
|
||||||
|
requests[0].setHeaders(headers);
|
||||||
} else {
|
} else {
|
||||||
reqBuilder.clear();
|
reqBuilder.clear();
|
||||||
reqBuilder.openArray();
|
reqBuilder.openArray();
|
||||||
|
@ -1279,15 +1287,21 @@ int getDocumentOnCoordinator(
|
||||||
reqBuilder.close();
|
reqBuilder.close();
|
||||||
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
||||||
// We send to Babies endpoint
|
// We send to Babies endpoint
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + it.first, reqType,
|
requests.emplace_back(
|
||||||
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
|
"shard:" + it.first, reqType,
|
||||||
body, headersCopy, nullptr, 60.0);
|
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform the requests
|
||||||
|
size_t nrDone = 0;
|
||||||
|
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::REQUESTS);
|
||||||
|
|
||||||
// Now listen to the results:
|
// Now listen to the results:
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
TRI_ASSERT(requests.size() == 1);
|
||||||
|
auto const& req = requests[0];
|
||||||
|
auto res = req.result;
|
||||||
|
|
||||||
int commError = handleGeneralCommErrors(&res);
|
int commError = handleGeneralCommErrors(&res);
|
||||||
if (commError != TRI_ERROR_NO_ERROR) {
|
if (commError != TRI_ERROR_NO_ERROR) {
|
||||||
|
@ -1303,7 +1317,7 @@ int getDocumentOnCoordinator(
|
||||||
|
|
||||||
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
||||||
collectResultsFromAllShards<VPackValueLength>(
|
collectResultsFromAllShards<VPackValueLength>(
|
||||||
shardMap, cc, coordTransactionID, errorCounter, resultMap, responseCode);
|
shardMap, requests, errorCounter, resultMap, responseCode);
|
||||||
|
|
||||||
mergeResults(reverseMapping, resultMap, resultBody);
|
mergeResults(reverseMapping, resultMap, resultBody);
|
||||||
|
|
||||||
|
@ -1315,6 +1329,7 @@ int getDocumentOnCoordinator(
|
||||||
// Not all shard keys are known in all documents.
|
// Not all shard keys are known in all documents.
|
||||||
// We contact all shards with the complete body and ignore NOT_FOUND
|
// We contact all shards with the complete body and ignore NOT_FOUND
|
||||||
|
|
||||||
|
std::vector<ClusterCommRequest> requests;
|
||||||
auto shardList = ci->getShardList(collid);
|
auto shardList = ci->getShardList(collid);
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
|
|
||||||
|
@ -1322,33 +1337,39 @@ int getDocumentOnCoordinator(
|
||||||
headers->emplace("if-match", slice.get(TRI_VOC_ATTRIBUTE_REV).copyString());
|
headers->emplace("if-match", slice.get(TRI_VOC_ATTRIBUTE_REV).copyString());
|
||||||
}
|
}
|
||||||
for (auto const& shard : *shardList) {
|
for (auto const& shard : *shardList) {
|
||||||
|
ClusterCommRequest req(
|
||||||
|
"shard:" + shard, reqType,
|
||||||
|
baseUrl + StringUtils::urlEncode(shard) + "/" +
|
||||||
|
StringUtils::urlEncode(
|
||||||
|
slice.get(TRI_VOC_ATTRIBUTE_KEY).copyString()) +
|
||||||
|
optsUrlPart,
|
||||||
|
nullptr);
|
||||||
auto headersCopy =
|
auto headersCopy =
|
||||||
std::make_unique<std::map<std::string, std::string>>(*headers);
|
std::make_unique<std::map<std::string, std::string>>(*headers);
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + shard, reqType,
|
req.setHeaders(headersCopy);
|
||||||
baseUrl + StringUtils::urlEncode(shard) + "/" +
|
requests.emplace_back(std::move(req));
|
||||||
StringUtils::urlEncode(
|
|
||||||
slice.get(TRI_VOC_ATTRIBUTE_KEY).copyString()) +
|
|
||||||
optsUrlPart,
|
|
||||||
nullptr, headersCopy, nullptr, 60.0);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
auto body = std::make_shared<std::string>(slice.toJson());
|
auto body = std::make_shared<std::string>(slice.toJson());
|
||||||
for (auto const& shard : *shardList) {
|
for (auto const& shard : *shardList) {
|
||||||
auto headersCopy =
|
requests.emplace_back(
|
||||||
std::make_unique<std::map<std::string, std::string>>(*headers);
|
"shard:" + shard, reqType,
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + shard, reqType,
|
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, body);
|
||||||
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
|
|
||||||
body, headersCopy, nullptr, 60.0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform the requests
|
||||||
|
size_t nrDone = 0;
|
||||||
|
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::REQUESTS);
|
||||||
|
|
||||||
// Now listen to the results:
|
// Now listen to the results:
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
// Only one can answer, we react a bit differently
|
// Only one can answer, we react a bit differently
|
||||||
int count;
|
size_t count;
|
||||||
int nrok = 0;
|
int nrok = 0;
|
||||||
for (count = (int)shardList->size(); count > 0; count--) {
|
for (count = requests.size(); count > 0; count--) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
auto const& req = requests[count - 1];
|
||||||
|
auto res = req.result;
|
||||||
if (res.status == CL_COMM_RECEIVED) {
|
if (res.status == CL_COMM_RECEIVED) {
|
||||||
if (res.answer_code !=
|
if (res.answer_code !=
|
||||||
arangodb::GeneralResponse::ResponseCode::NOT_FOUND ||
|
arangodb::GeneralResponse::ResponseCode::NOT_FOUND ||
|
||||||
|
@ -1375,8 +1396,8 @@ int getDocumentOnCoordinator(
|
||||||
allResults.reserve(shardList->size());
|
allResults.reserve(shardList->size());
|
||||||
// If no server responds we return 500
|
// If no server responds we return 500
|
||||||
responseCode = GeneralResponse::ResponseCode::SERVER_ERROR;
|
responseCode = GeneralResponse::ResponseCode::SERVER_ERROR;
|
||||||
for (size_t i = 0; i < shardList->size(); ++i) {
|
for (auto const& req : requests) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
auto res = req.result;
|
||||||
int error = handleGeneralCommErrors(&res);
|
int error = handleGeneralCommErrors(&res);
|
||||||
if (error != TRI_ERROR_NO_ERROR) {
|
if (error != TRI_ERROR_NO_ERROR) {
|
||||||
// Cluster is in bad state. Just report. Drop other results.
|
// Cluster is in bad state. Just report. Drop other results.
|
||||||
|
@ -1461,7 +1482,6 @@ static void insertIntoShardMap(
|
||||||
int getFilteredDocumentsOnCoordinator(
|
int getFilteredDocumentsOnCoordinator(
|
||||||
std::string const& dbname,
|
std::string const& dbname,
|
||||||
std::vector<traverser::TraverserExpression*> const& expressions,
|
std::vector<traverser::TraverserExpression*> const& expressions,
|
||||||
std::unique_ptr<std::map<std::string, std::string>>& headers,
|
|
||||||
std::unordered_set<std::string>& documentIds,
|
std::unordered_set<std::string>& documentIds,
|
||||||
std::unordered_map<std::string, std::shared_ptr<VPackBuffer<uint8_t>>>& result) {
|
std::unordered_map<std::string, std::shared_ptr<VPackBuffer<uint8_t>>>& result) {
|
||||||
// Set a few variables needed for our work:
|
// Set a few variables needed for our work:
|
||||||
|
@ -1477,11 +1497,9 @@ int getFilteredDocumentsOnCoordinator(
|
||||||
// We do not have to care for shard attributes esp. shard by key.
|
// We do not have to care for shard attributes esp. shard by key.
|
||||||
// If it is by key the key was only added to one key list, if not
|
// If it is by key the key was only added to one key list, if not
|
||||||
// it is contained multiple times.
|
// it is contained multiple times.
|
||||||
CoordTransactionID coordTransactionID = TRI_NewTickServer();
|
std::vector<ClusterCommRequest> requests;
|
||||||
VPackBuilder bodyBuilder;
|
VPackBuilder bodyBuilder;
|
||||||
for (auto const& shard : shardRequestMap) {
|
for (auto const& shard : shardRequestMap) {
|
||||||
std::unique_ptr<std::map<std::string, std::string>> headersCopy(
|
|
||||||
new std::map<std::string, std::string>(*headers));
|
|
||||||
bodyBuilder.clear();
|
bodyBuilder.clear();
|
||||||
bodyBuilder.openObject();
|
bodyBuilder.openObject();
|
||||||
bodyBuilder.add("collection", VPackValue(shard.first));
|
bodyBuilder.add("collection", VPackValue(shard.first));
|
||||||
|
@ -1500,18 +1518,21 @@ int getFilteredDocumentsOnCoordinator(
|
||||||
bodyBuilder.close(); // Object
|
bodyBuilder.close(); // Object
|
||||||
|
|
||||||
auto bodyString = std::make_shared<std::string>(bodyBuilder.toJson());
|
auto bodyString = std::make_shared<std::string>(bodyBuilder.toJson());
|
||||||
|
requests.emplace_back("shard:" + shard.first,
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + shard.first,
|
arangodb::GeneralRequest::RequestType::PUT,
|
||||||
arangodb::GeneralRequest::RequestType::PUT,
|
"/_db/" + StringUtils::urlEncode(dbname) +
|
||||||
"/_db/" + StringUtils::urlEncode(dbname) +
|
"/_api/simple/lookup-by-keys",
|
||||||
"/_api/simple/lookup-by-keys",
|
bodyString);
|
||||||
bodyString, headersCopy, nullptr, 60.0);
|
|
||||||
}
|
}
|
||||||
// All requests send, now collect results.
|
|
||||||
for (size_t i = 0; i < shardRequestMap.size(); ++i) {
|
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
|
||||||
if (res.status == CL_COMM_RECEIVED) {
|
|
||||||
|
|
||||||
|
// Perform the requests
|
||||||
|
size_t nrDone = 0;
|
||||||
|
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::REQUESTS);
|
||||||
|
|
||||||
|
// All requests send, now collect results.
|
||||||
|
for (auto const& req : requests) {
|
||||||
|
auto res = req.result;
|
||||||
|
if (res.status == CL_COMM_RECEIVED) {
|
||||||
std::shared_ptr<VPackBuilder> resultBody = res.answer->toVelocyPack(&VPackOptions::Defaults);
|
std::shared_ptr<VPackBuilder> resultBody = res.answer->toVelocyPack(&VPackOptions::Defaults);
|
||||||
VPackSlice resSlice = resultBody->slice();
|
VPackSlice resSlice = resultBody->slice();
|
||||||
|
|
||||||
|
@ -1783,26 +1804,23 @@ int modifyDocumentOnCoordinator(
|
||||||
optsUrlPart += "&returnOld=true";
|
optsUrlPart += "&returnOld=true";
|
||||||
}
|
}
|
||||||
|
|
||||||
CoordTransactionID coordTransactionID = TRI_NewTickServer();
|
|
||||||
if (canUseFastPath) {
|
if (canUseFastPath) {
|
||||||
// All shard keys are known in all documents.
|
// All shard keys are known in all documents.
|
||||||
// Contact all shards directly with the correct information.
|
// Contact all shards directly with the correct information.
|
||||||
|
std::vector<ClusterCommRequest> requests;
|
||||||
VPackBuilder reqBuilder;
|
VPackBuilder reqBuilder;
|
||||||
auto body = std::make_shared<std::string>();
|
auto body = std::make_shared<std::string>();
|
||||||
for (auto const& it : shardMap) {
|
for (auto const& it : shardMap) {
|
||||||
auto headersCopy =
|
|
||||||
std::make_unique<std::map<std::string, std::string>>(*headers);
|
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
TRI_ASSERT(it.second.size() == 1);
|
TRI_ASSERT(it.second.size() == 1);
|
||||||
body = std::make_shared<std::string>(slice.toJson());
|
body = std::make_shared<std::string>(slice.toJson());
|
||||||
|
|
||||||
// We send to single endpoint
|
// We send to single endpoint
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + it.first, reqType,
|
requests.emplace_back(
|
||||||
baseUrl + StringUtils::urlEncode(it.first) + "/" +
|
"shard:" + it.first, reqType,
|
||||||
slice.get(TRI_VOC_ATTRIBUTE_KEY).copyString() +
|
baseUrl + StringUtils::urlEncode(it.first) + "/" +
|
||||||
optsUrlPart,
|
slice.get(TRI_VOC_ATTRIBUTE_KEY).copyString() + optsUrlPart,
|
||||||
body, headersCopy, nullptr, 60.0);
|
body);
|
||||||
} else {
|
} else {
|
||||||
reqBuilder.clear();
|
reqBuilder.clear();
|
||||||
reqBuilder.openArray();
|
reqBuilder.openArray();
|
||||||
|
@ -1812,15 +1830,20 @@ int modifyDocumentOnCoordinator(
|
||||||
reqBuilder.close();
|
reqBuilder.close();
|
||||||
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
||||||
// We send to Babies endpoint
|
// We send to Babies endpoint
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + it.first, reqType,
|
requests.emplace_back(
|
||||||
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
|
"shard:" + it.first, reqType,
|
||||||
body, headersCopy, nullptr, 60.0);
|
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform the requests
|
||||||
|
size_t nrDone = 0;
|
||||||
|
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::REQUESTS);
|
||||||
|
|
||||||
// Now listen to the results:
|
// Now listen to the results:
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
TRI_ASSERT(requests.size() == 1);
|
||||||
|
auto res = requests[0].result;
|
||||||
|
|
||||||
int commError = handleGeneralCommErrors(&res);
|
int commError = handleGeneralCommErrors(&res);
|
||||||
if (commError != TRI_ERROR_NO_ERROR) {
|
if (commError != TRI_ERROR_NO_ERROR) {
|
||||||
|
@ -1836,7 +1859,7 @@ int modifyDocumentOnCoordinator(
|
||||||
|
|
||||||
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
||||||
collectResultsFromAllShards<VPackValueLength>(
|
collectResultsFromAllShards<VPackValueLength>(
|
||||||
shardMap, cc, coordTransactionID, errorCounter, resultMap, responseCode);
|
shardMap, requests, errorCounter, resultMap, responseCode);
|
||||||
|
|
||||||
mergeResults(reverseMapping, resultMap, resultBody);
|
mergeResults(reverseMapping, resultMap, resultBody);
|
||||||
|
|
||||||
|
@ -1848,35 +1871,36 @@ int modifyDocumentOnCoordinator(
|
||||||
// Not all shard keys are known in all documents.
|
// Not all shard keys are known in all documents.
|
||||||
// We contact all shards with the complete body and ignore NOT_FOUND
|
// We contact all shards with the complete body and ignore NOT_FOUND
|
||||||
|
|
||||||
|
std::vector<ClusterCommRequest> requests;
|
||||||
auto body = std::make_shared<std::string>(slice.toJson());
|
auto body = std::make_shared<std::string>(slice.toJson());
|
||||||
auto shardList = ci->getShardList(collid);
|
auto shardList = ci->getShardList(collid);
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
|
std::string key = slice.get(TRI_VOC_ATTRIBUTE_KEY).copyString();
|
||||||
for (auto const& shard : *shardList) {
|
for (auto const& shard : *shardList) {
|
||||||
auto headersCopy =
|
requests.emplace_back(
|
||||||
std::make_unique<std::map<std::string, std::string>>(*headers);
|
"shard:" + shard, reqType,
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + shard, reqType,
|
baseUrl + StringUtils::urlEncode(shard) + "/" + key + optsUrlPart,
|
||||||
baseUrl + StringUtils::urlEncode(shard) + "/" +
|
body);
|
||||||
slice.get(TRI_VOC_ATTRIBUTE_KEY).copyString() +
|
|
||||||
optsUrlPart,
|
|
||||||
body, headersCopy, nullptr, 60.0);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (auto const& shard : *shardList) {
|
for (auto const& shard : *shardList) {
|
||||||
auto headersCopy =
|
requests.emplace_back(
|
||||||
std::make_unique<std::map<std::string, std::string>>(*headers);
|
"shard:" + shard, reqType,
|
||||||
cc->asyncRequest("", coordTransactionID, "shard:" + shard, reqType,
|
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, body);
|
||||||
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
|
|
||||||
body, headersCopy, nullptr, 60.0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform the requests
|
||||||
|
size_t nrDone = 0;
|
||||||
|
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, nrDone, Logger::REQUESTS);
|
||||||
|
|
||||||
// Now listen to the results:
|
// Now listen to the results:
|
||||||
if (!useMultiple) {
|
if (!useMultiple) {
|
||||||
// Only one can answer, we react a bit differently
|
// Only one can answer, we react a bit differently
|
||||||
int count;
|
|
||||||
int nrok = 0;
|
int nrok = 0;
|
||||||
for (count = (int)shardList->size(); count > 0; count--) {
|
for (size_t count = shardList->size(); count > 0; count--) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
auto const& req = requests[count - 1];
|
||||||
|
auto res = req.result;
|
||||||
if (res.status == CL_COMM_RECEIVED) {
|
if (res.status == CL_COMM_RECEIVED) {
|
||||||
if (res.answer_code !=
|
if (res.answer_code !=
|
||||||
arangodb::GeneralResponse::ResponseCode::NOT_FOUND ||
|
arangodb::GeneralResponse::ResponseCode::NOT_FOUND ||
|
||||||
|
@ -1895,19 +1919,17 @@ int modifyDocumentOnCoordinator(
|
||||||
}
|
}
|
||||||
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
|
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
|
||||||
// the DBserver could have reported an error.
|
// the DBserver could have reported an error.
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
responseCode = GeneralResponse::ResponseCode::SERVER_ERROR;
|
responseCode = GeneralResponse::ResponseCode::SERVER_ERROR;
|
||||||
// We select all results from all shards an merge them back again.
|
// We select all results from all shards an merge them back again.
|
||||||
std::vector<std::shared_ptr<VPackBuilder>> allResults;
|
std::vector<std::shared_ptr<VPackBuilder>> allResults;
|
||||||
allResults.reserve(shardList->size());
|
allResults.reserve(requests.size());
|
||||||
for (size_t i = 0; i < shardList->size(); ++i) {
|
for (auto const& req : requests) {
|
||||||
auto res = cc->wait("", coordTransactionID, 0, "", 0.0);
|
auto res = req.result;
|
||||||
int error = handleGeneralCommErrors(&res);
|
int error = handleGeneralCommErrors(&res);
|
||||||
if (error != TRI_ERROR_NO_ERROR) {
|
if (error != TRI_ERROR_NO_ERROR) {
|
||||||
// Cluster is in bad state. Just report. Drop other results.
|
// Cluster is in bad state. Just report.
|
||||||
cc->drop("", coordTransactionID, 0, "");
|
|
||||||
// Local data structores are automatically freed
|
// Local data structores are automatically freed
|
||||||
return error;
|
return error;
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,6 @@ int countOnCoordinator(std::string const& dbname, std::string const& collname,
|
||||||
int createDocumentOnCoordinator(
|
int createDocumentOnCoordinator(
|
||||||
std::string const& dbname, std::string const& collname,
|
std::string const& dbname, std::string const& collname,
|
||||||
OperationOptions const& options, arangodb::velocypack::Slice const& slice,
|
OperationOptions const& options, arangodb::velocypack::Slice const& slice,
|
||||||
std::map<std::string, std::string> const& headers,
|
|
||||||
arangodb::GeneralResponse::ResponseCode& responseCode,
|
arangodb::GeneralResponse::ResponseCode& responseCode,
|
||||||
std::unordered_map<int, size_t>& errorCounters,
|
std::unordered_map<int, size_t>& errorCounters,
|
||||||
std::shared_ptr<arangodb::velocypack::Builder>& resultBody);
|
std::shared_ptr<arangodb::velocypack::Builder>& resultBody);
|
||||||
|
@ -110,7 +109,6 @@ int createDocumentOnCoordinator(
|
||||||
int deleteDocumentOnCoordinator(
|
int deleteDocumentOnCoordinator(
|
||||||
std::string const& dbname, std::string const& collname,
|
std::string const& dbname, std::string const& collname,
|
||||||
VPackSlice const slice, OperationOptions const& options,
|
VPackSlice const slice, OperationOptions const& options,
|
||||||
std::unique_ptr<std::map<std::string, std::string>>& headers,
|
|
||||||
arangodb::GeneralResponse::ResponseCode& responseCode,
|
arangodb::GeneralResponse::ResponseCode& responseCode,
|
||||||
std::unordered_map<int, size_t>& errorCounters,
|
std::unordered_map<int, size_t>& errorCounters,
|
||||||
std::shared_ptr<arangodb::velocypack::Builder>& resultBody);
|
std::shared_ptr<arangodb::velocypack::Builder>& resultBody);
|
||||||
|
@ -137,7 +135,6 @@ int getDocumentOnCoordinator(
|
||||||
int getFilteredDocumentsOnCoordinator(
|
int getFilteredDocumentsOnCoordinator(
|
||||||
std::string const& dbname,
|
std::string const& dbname,
|
||||||
std::vector<traverser::TraverserExpression*> const& expressions,
|
std::vector<traverser::TraverserExpression*> const& expressions,
|
||||||
std::unique_ptr<std::map<std::string, std::string>>& headers,
|
|
||||||
std::unordered_set<std::string>& documentIds,
|
std::unordered_set<std::string>& documentIds,
|
||||||
std::unordered_map<std::string, std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>>>& result);
|
std::unordered_map<std::string, std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>>>& result);
|
||||||
|
|
||||||
|
|
|
@ -240,8 +240,6 @@ void ClusterTraverser::setStartVertex(std::string const& id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterTraverser::fetchVertices(std::unordered_set<std::string>& verticesToFetch, size_t depth) {
|
void ClusterTraverser::fetchVertices(std::unordered_set<std::string>& verticesToFetch, size_t depth) {
|
||||||
std::unique_ptr<std::map<std::string, std::string>> headers(
|
|
||||||
new std::map<std::string, std::string>());
|
|
||||||
_readDocuments += verticesToFetch.size();
|
_readDocuments += verticesToFetch.size();
|
||||||
|
|
||||||
std::vector<TraverserExpression*> expVertices;
|
std::vector<TraverserExpression*> expVertices;
|
||||||
|
@ -250,7 +248,7 @@ void ClusterTraverser::fetchVertices(std::unordered_set<std::string>& verticesTo
|
||||||
expVertices = found->second;
|
expVertices = found->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
int res = getFilteredDocumentsOnCoordinator(_dbname, expVertices, headers,
|
int res = getFilteredDocumentsOnCoordinator(_dbname, expVertices,
|
||||||
verticesToFetch, _vertices);
|
verticesToFetch, _vertices);
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
THROW_ARANGO_EXCEPTION(res);
|
THROW_ARANGO_EXCEPTION(res);
|
||||||
|
|
|
@ -1148,14 +1148,13 @@ OperationResult Transaction::insertCoordinator(std::string const& collectionName
|
||||||
VPackSlice const value,
|
VPackSlice const value,
|
||||||
OperationOptions& options) {
|
OperationOptions& options) {
|
||||||
|
|
||||||
std::map<std::string, std::string> headers;
|
|
||||||
GeneralResponse::ResponseCode responseCode;
|
GeneralResponse::ResponseCode responseCode;
|
||||||
|
|
||||||
std::unordered_map<int, size_t> errorCounter;
|
std::unordered_map<int, size_t> errorCounter;
|
||||||
auto resultBody = std::make_shared<VPackBuilder>();
|
auto resultBody = std::make_shared<VPackBuilder>();
|
||||||
|
|
||||||
int res = arangodb::createDocumentOnCoordinator(
|
int res = arangodb::createDocumentOnCoordinator(
|
||||||
_vocbase->_name, collectionName, options, value, headers, responseCode,
|
_vocbase->_name, collectionName, options, value, responseCode,
|
||||||
errorCounter, resultBody);
|
errorCounter, resultBody);
|
||||||
|
|
||||||
if (res == TRI_ERROR_NO_ERROR) {
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
|
@ -1717,14 +1716,13 @@ OperationResult Transaction::removeCoordinator(std::string const& collectionName
|
||||||
VPackSlice const value,
|
VPackSlice const value,
|
||||||
OperationOptions& options) {
|
OperationOptions& options) {
|
||||||
|
|
||||||
auto headers = std::make_unique<std::map<std::string, std::string>>();
|
|
||||||
GeneralResponse::ResponseCode responseCode;
|
GeneralResponse::ResponseCode responseCode;
|
||||||
std::unordered_map<int, size_t> errorCounter;
|
std::unordered_map<int, size_t> errorCounter;
|
||||||
auto resultBody = std::make_shared<VPackBuilder>();
|
auto resultBody = std::make_shared<VPackBuilder>();
|
||||||
|
|
||||||
int res = arangodb::deleteDocumentOnCoordinator(
|
int res = arangodb::deleteDocumentOnCoordinator(
|
||||||
_vocbase->_name, collectionName, value, options, headers,
|
_vocbase->_name, collectionName, value, options, responseCode,
|
||||||
responseCode, errorCounter, resultBody);
|
errorCounter, resultBody);
|
||||||
|
|
||||||
if (res == TRI_ERROR_NO_ERROR) {
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
if (responseCode == GeneralResponse::ResponseCode::OK ||
|
if (responseCode == GeneralResponse::ResponseCode::OK ||
|
||||||
|
|
Loading…
Reference in New Issue