1
0
Fork 0

Make count, figures, revision, and index warmup use non-blocking communication (#10048)

This commit is contained in:
Dan Larkin-York 2019-09-27 03:54:01 -04:00 committed by Jan
parent 176cff3eac
commit dc23896a01
16 changed files with 579 additions and 452 deletions

View File

@ -115,74 +115,73 @@ T addFigures(VPackSlice const& v1, VPackSlice const& v2,
return value;
}
void recursiveAdd(VPackSlice const& value, std::shared_ptr<VPackBuilder>& builder) {
void recursiveAdd(VPackSlice const& value, VPackBuilder& builder) {
TRI_ASSERT(value.isObject());
TRI_ASSERT(builder->slice().isObject());
TRI_ASSERT(builder->isClosed());
TRI_ASSERT(builder.slice().isObject());
TRI_ASSERT(builder.isClosed());
VPackBuilder updated;
updated.openObject();
updated.add("alive", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"alive", "count"})));
updated.add("size", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("size", VPackValue(addFigures<size_t>(value, builder.slice(),
{"alive", "size"})));
updated.close();
updated.add("dead", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"dead", "count"})));
updated.add("size", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("size", VPackValue(addFigures<size_t>(value, builder.slice(),
{"dead", "size"})));
updated.add("deletion", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("deletion", VPackValue(addFigures<size_t>(value, builder.slice(),
{"dead", "deletion"})));
updated.close();
updated.add("indexes", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"indexes", "count"})));
updated.add("size", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("size", VPackValue(addFigures<size_t>(value, builder.slice(),
{"indexes", "size"})));
updated.close();
updated.add("datafiles", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"datafiles", "count"})));
updated.add("fileSize",
VPackValue(addFigures<size_t>(value, builder->slice(),
VPackValue(addFigures<size_t>(value, builder.slice(),
{"datafiles", "fileSize"})));
updated.close();
updated.add("journals", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"journals", "count"})));
updated.add("fileSize",
VPackValue(addFigures<size_t>(value, builder->slice(),
VPackValue(addFigures<size_t>(value, builder.slice(),
{"journals", "fileSize"})));
updated.close();
updated.add("compactors", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder->slice(),
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"compactors", "count"})));
updated.add("fileSize",
VPackValue(addFigures<size_t>(value, builder->slice(),
VPackValue(addFigures<size_t>(value, builder.slice(),
{"compactors", "fileSize"})));
updated.close();
updated.add("documentReferences",
VPackValue(addFigures<size_t>(value, builder->slice(), {"documentReferences"})));
VPackValue(addFigures<size_t>(value, builder.slice(), {"documentReferences"})));
updated.close();
TRI_ASSERT(updated.slice().isObject());
TRI_ASSERT(updated.isClosed());
builder.reset(new VPackBuilder(
VPackCollection::merge(builder->slice(), updated.slice(), true, false)));
TRI_ASSERT(builder->slice().isObject());
TRI_ASSERT(builder->isClosed());
builder = VPackCollection::merge(builder.slice(), updated.slice(), true, false);
TRI_ASSERT(builder.slice().isObject());
TRI_ASSERT(builder.isClosed());
}
/// @brief begin a transaction on some leader shards
@ -245,6 +244,93 @@ void addTransactionHeaderForShard(transaction::Methods const& trx, ShardMap cons
"couldnt find shard in shardMap");
}
}
/// @brief Collect the results from all shards (fastpath variant)
/// All result bodies are stored in resultMap
template <typename T>
static void collectResponsesFromAllShards(
std::map<ShardID, std::vector<T>> const& shardMap,
std::vector<futures::Try<arangodb::network::Response>>& responses,
std::unordered_map<int, size_t>& errorCounter,
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>>& resultMap,
fuerte::StatusCode& code) {
// If none of the shards responds we return a SERVER_ERROR;
code = fuerte::StatusInternalError;
for (Try<arangodb::network::Response> const& tryRes : responses) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
auto tmpBuilder = std::make_shared<VPackBuilder>();
// If there was no answer whatsoever, we cannot rely on the shardId
// being present in the result struct:
auto weSend = shardMap.find(sId);
TRI_ASSERT(weSend != shardMap.end()); // We send sth there earlier.
size_t count = weSend->second.size();
for (size_t i = 0; i < count; ++i) {
tmpBuilder->openObject();
tmpBuilder->add(StaticStrings::Error, VPackValue(true));
tmpBuilder->add(StaticStrings::ErrorNum, VPackValue(commError));
tmpBuilder->close();
}
resultMap.emplace(sId, std::move(tmpBuilder));
} else {
std::vector<VPackSlice> const& slices = res.response->slices();
auto tmpBuilder = std::make_shared<VPackBuilder>();
if (!slices.empty()) {
tmpBuilder->add(slices[0]);
}
resultMap.emplace(sId, std::move(tmpBuilder));
network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true);
code = res.response->statusCode();
}
}
}
/// @brief iterate over shard responses and compile a result
/// This will take care of checking the fuerte responses. If the response has
/// a body, then the callback will be called on the body, with access to the
/// result-so-far. In particular, a VPackBuilder is initialized to empty before
/// handling any response. It will be passed to the pre callback (default noop)
/// for initialization, then it will be passed to each instantiation of the
/// handler callback, reduce-style. Finally, it will be passed to the post
/// callback and then returned via the OperationResult.
OperationResult handleResponsesFromAllShards(
std::vector<futures::Try<arangodb::network::Response>>& responses,
std::function<void(Result&, VPackBuilder&, ShardID&, VPackSlice)> handler,
std::function<void(Result&, VPackBuilder&)> pre = [](Result&, VPackBuilder&) -> void {},
std::function<void(Result&, VPackBuilder&)> post = [](Result&, VPackBuilder&) -> void {}) {
// If none of the shards responds we return a SERVER_ERROR;
Result result;
VPackBuilder builder;
pre(result, builder);
if (result.fail()) {
return OperationResult(result, builder.steal());
}
for (Try<arangodb::network::Response> const& tryRes : responses) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
result.reset(commError);
break;
} else {
std::vector<VPackSlice> const& slices = res.response->slices();
if (!slices.empty()) {
VPackSlice answer = slices[0];
handler(result, builder, sId, answer);
if (result.fail()) {
break;
}
}
}
}
post(result, builder);
return OperationResult(result, builder.steal());
}
} // namespace
namespace arangodb {
@ -821,206 +907,212 @@ bool smartJoinAttributeChanged(LogicalCollection const& collection, VPackSlice c
/// @brief returns revision for a sharded collection
////////////////////////////////////////////////////////////////////////////////
int revisionOnCoordinator(ClusterFeature& feature, std::string const& dbname,
std::string const& collname, TRI_voc_rid_t& rid) {
futures::Future<OperationResult> revisionOnCoordinator(ClusterFeature& feature,
std::string const& dbname,
std::string const& collname) {
// Set a few variables needed for our work:
ClusterInfo& ci = feature.clusterInfo();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return TRI_ERROR_SHUTTING_DOWN;
return futures::makeFuture(OperationResult(TRI_ERROR_SHUTTING_DOWN));
}
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, collname);
if (collinfo == nullptr) {
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
rid = 0;
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
std::shared_ptr<ShardMap> shards = collinfo->shardIds();
CoordTransactionID coordTransactionID = TRI_NewTickServer();
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
std::unordered_map<std::string, std::string> headers;
auto& network = feature.server().getFeature<NetworkFeature>();
for (auto const& p : *shards) {
cc->asyncRequest(coordTransactionID, "shard:" + p.first,
arangodb::rest::RequestType::GET,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" + StringUtils::urlEncode(p.first) +
"/revision",
std::shared_ptr<std::string const>(), headers, nullptr, 300.0);
// handler expects valid velocypack body (empty object minimum)
network::Headers headers;
auto future =
network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" +
StringUtils::urlEncode(p.first) + "/revision",
VPackBuffer<uint8_t>(), network::Timeout(300.0), headers);
futures.emplace_back(std::move(future));
}
// Now listen to the results:
int count;
int nrok = 0;
for (count = (int)shards->size(); count > 0; count--) {
auto res = cc->wait(coordTransactionID, 0, "", 0.0);
if (res.status == CL_COMM_RECEIVED) {
if (res.answer_code == arangodb::rest::ResponseCode::OK) {
VPackSlice answer = res.answer->payload();
auto cb = [](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleResponsesFromAllShards(
results, [](Result& result, VPackBuilder& builder, ShardID&, VPackSlice answer) -> void {
if (answer.isObject()) {
VPackSlice r = answer.get("revision");
if (r.isString()) {
VPackValueLength len;
char const* p = r.getString(len);
TRI_voc_rid_t cmp = TRI_StringToRid(p, len, false);
if (answer.isObject()) {
VPackSlice r = answer.get("revision");
if (r.isString()) {
VPackValueLength len;
char const* p = r.getString(len);
TRI_voc_rid_t cmp = TRI_StringToRid(p, len, false);
if (cmp != UINT64_MAX && cmp > rid) {
// get the maximum value
rid = cmp;
TRI_voc_rid_t rid = builder.slice().isNumber()
? builder.slice().getNumber<TRI_voc_rid_t>()
: 0;
if (cmp != UINT64_MAX && cmp > rid) {
// get the maximum value
builder.clear();
builder.add(VPackValue(cmp));
}
}
} else {
// didn't get the expected response
result.reset(TRI_ERROR_INTERNAL);
}
nrok++;
}
}
}
}
if (nrok != (int)shards->size()) {
return TRI_ERROR_INTERNAL;
}
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
// the DBserver could have reported an error.
});
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
}
int warmupOnCoordinator(ClusterFeature& feature, std::string const& dbname,
std::string const& cid) {
futures::Future<Result> warmupOnCoordinator(ClusterFeature& feature,
std::string const& dbname,
std::string const& cid) {
// Set a few variables needed for our work:
ClusterInfo& ci = feature.clusterInfo();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return TRI_ERROR_SHUTTING_DOWN;
return futures::makeFuture(Result(TRI_ERROR_SHUTTING_DOWN));
}
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, cid);
if (collinfo == nullptr) {
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
return futures::makeFuture(Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
std::shared_ptr<ShardMap> shards = collinfo->shardIds();
CoordTransactionID coordTransactionID = TRI_NewTickServer();
std::unordered_map<std::string, std::string> headers;
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
auto& network = feature.server().getFeature<NetworkFeature>();
for (auto const& p : *shards) {
cc->asyncRequest(coordTransactionID, "shard:" + p.first,
arangodb::rest::RequestType::GET,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" + StringUtils::urlEncode(p.first) +
"/loadIndexesIntoMemory",
std::shared_ptr<std::string const>(), headers, nullptr, 300.0);
// handler expects valid velocypack body (empty object minimum)
VPackBuffer<uint8_t> buffer;
buffer.append(VPackSlice::emptyObjectSlice().begin(), 1);
network::Headers headers;
auto future =
network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" + StringUtils::urlEncode(p.first) +
"/loadIndexesIntoMemory",
std::move(buffer), network::Timeout(300.0), headers);
futures.emplace_back(std::move(future));
}
// Now listen to the results:
// Well actually we don't care...
int count;
for (count = (int)shards->size(); count > 0; count--) {
auto res = cc->wait(coordTransactionID, 0, "", 0.0);
}
return TRI_ERROR_NO_ERROR;
auto cb = [](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleResponsesFromAllShards(results,
[](Result&, VPackBuilder&, ShardID&, VPackSlice) -> void {
// we don't care about response bodies, just that the requests succeeded
});
};
return futures::collectAll(std::move(futures))
.thenValue(std::move(cb))
.thenValue([](OperationResult&& opRes) -> Result { return opRes.result; });
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns figures for a sharded collection
////////////////////////////////////////////////////////////////////////////////
int figuresOnCoordinator(ClusterFeature& feature, std::string const& dbname,
std::string const& collname,
std::shared_ptr<arangodb::velocypack::Builder>& result) {
futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature& feature,
std::string const& dbname,
std::string const& collname) {
// Set a few variables needed for our work:
ClusterInfo& ci = feature.clusterInfo();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return TRI_ERROR_SHUTTING_DOWN;
return futures::makeFuture(OperationResult(TRI_ERROR_SHUTTING_DOWN));
}
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, collname);
if (collinfo == nullptr) {
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
std::shared_ptr<ShardMap> shards = collinfo->shardIds();
CoordTransactionID coordTransactionID = TRI_NewTickServer();
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
std::unordered_map<std::string, std::string> headers;
auto& network = feature.server().getFeature<NetworkFeature>();
for (auto const& p : *shards) {
cc->asyncRequest(coordTransactionID, "shard:" + p.first,
arangodb::rest::RequestType::GET,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" + StringUtils::urlEncode(p.first) +
"/figures",
std::shared_ptr<std::string const>(), headers, nullptr, 300.0);
// handler expects valid velocypack body (empty object minimum)
network::Headers headers;
auto future =
network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" +
StringUtils::urlEncode(p.first) + "/figures",
VPackBuffer<uint8_t>(), network::Timeout(300.0), headers);
futures.emplace_back(std::move(future));
}
// Now listen to the results:
int count;
int nrok = 0;
for (count = (int)shards->size(); count > 0; count--) {
auto res = cc->wait(coordTransactionID, 0, "", 0.0);
if (res.status == CL_COMM_RECEIVED) {
if (res.answer_code == arangodb::rest::ResponseCode::OK) {
VPackSlice answer = res.answer->payload();
if (answer.isObject()) {
VPackSlice figures = answer.get("figures");
if (figures.isObject()) {
// add to the total
recursiveAdd(figures, result);
}
nrok++;
auto cb = [](std::vector<Try<network::Response>>&& results) mutable -> OperationResult {
auto handler = [](Result& result, VPackBuilder& builder, ShardID&,
VPackSlice answer) mutable -> void {
if (answer.isObject()) {
VPackSlice figures = answer.get("figures");
if (figures.isObject()) {
// add to the total
recursiveAdd(figures, builder);
}
} else {
// didn't get the expected response
result.reset(TRI_ERROR_INTERNAL);
}
}
}
if (nrok != (int)shards->size()) {
return TRI_ERROR_INTERNAL;
}
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
// the DBserver could have reported an error.
};
auto pre = [](Result&, VPackBuilder& builder) -> void {
// initialize to empty object
builder.openObject();
builder.close();
};
return handleResponsesFromAllShards(results, handler, pre);
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
}
////////////////////////////////////////////////////////////////////////////////
/// @brief counts number of documents in a coordinator, by shard
////////////////////////////////////////////////////////////////////////////////
int countOnCoordinator(transaction::Methods& trx, std::string const& cname,
std::vector<std::pair<std::string, uint64_t>>& result) {
futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& cname) {
std::vector<std::pair<std::string, uint64_t>> result;
// Set a few variables needed for our work:
ClusterFeature& feature = trx.vocbase().server().getFeature<ClusterFeature>();
ClusterInfo& ci = feature.clusterInfo();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return TRI_ERROR_SHUTTING_DOWN;
return futures::makeFuture(OperationResult(TRI_ERROR_SHUTTING_DOWN));
}
result.clear();
std::string const& dbname = trx.vocbase().name();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, cname);
if (collinfo == nullptr) {
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
std::shared_ptr<ShardMap> shardIds = collinfo->shardIds();
@ -1028,47 +1120,55 @@ int countOnCoordinator(transaction::Methods& trx, std::string const& cname,
if (isManaged) {
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
if (res.fail()) {
return res.errorNumber();
return futures::makeFuture(OperationResult(res));
}
}
std::vector<ClusterCommRequest> requests;
auto body = std::make_shared<std::string>();
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
auto& network = trx.vocbase().server().getFeature<NetworkFeature>();
for (std::pair<ShardID, std::vector<ServerID>> const& p : *shardIds) {
auto headers = std::make_unique<std::unordered_map<std::string, std::string>>();
ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], *headers);
requests.emplace_back("shard:" + p.first, arangodb::rest::RequestType::GET,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" +
StringUtils::urlEncode(p.first) + "/count",
body, std::move(headers));
network::Headers headers;
ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], headers);
auto future =
network::sendRequestRetry(network, "shard:" + p.first, fuerte::RestVerb::Get,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" +
StringUtils::urlEncode(p.first) + "/count",
VPackBuffer<uint8_t>(),
network::Timeout(CL_DEFAULT_TIMEOUT), headers, true);
futures.emplace_back(std::move(future));
}
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::QUERIES,
/*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged);
for (auto& req : requests) {
auto& res = req.result;
if (res.status == CL_COMM_RECEIVED) {
if (res.answer_code == arangodb::rest::ResponseCode::OK) {
VPackSlice answer = res.answer->payload();
if (answer.isObject()) {
// add to the total
result.emplace_back(res.shardID,
arangodb::basics::VelocyPackHelper::getNumericValue<uint64_t>(
answer, "count", 0));
} else {
return TRI_ERROR_INTERNAL;
}
auto cb = [](std::vector<Try<network::Response>>&& results) mutable -> OperationResult {
auto handler = [](Result& result, VPackBuilder& builder, ShardID& shardId,
VPackSlice answer) mutable -> void {
if (answer.isObject()) {
// add to the total
VPackArrayBuilder array(&builder);
array->add(VPackValue(shardId));
array->add(VPackValue(arangodb::basics::VelocyPackHelper::getNumericValue<uint64_t>(
answer, "count", 0)));
} else {
return static_cast<int>(res.answer_code);
// didn't get the expected response
result.reset(TRI_ERROR_INTERNAL);
}
} else {
return handleGeneralCommErrors(&req.result);
}
}
return TRI_ERROR_NO_ERROR;
};
auto pre = [](Result&, VPackBuilder& builder) -> void {
builder.openArray();
};
auto post = [](Result& result, VPackBuilder& builder) -> void {
if (builder.isOpenArray()) {
builder.close();
} else {
result.reset(TRI_ERROR_INTERNAL, "result was corrupted");
builder.clear();
}
};
return handleResponsesFromAllShards(results, handler, pre, post);
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
}
////////////////////////////////////////////////////////////////////////////////
@ -1171,78 +1271,6 @@ int selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string const
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief Collect the results from all shards (fastpath variant)
/// All result bodies are stored in resultMap
////////////////////////////////////////////////////////////////////////////////
template <typename T>
static void collectResponsesFromAllShards(
std::map<ShardID, std::vector<T>> const& shardMap,
std::vector<futures::Try<arangodb::network::Response>>& responses,
std::unordered_map<int, size_t>& errorCounter,
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>>& resultMap,
fuerte::StatusCode& code) {
// If none of the shards responds we return a SERVER_ERROR;
code = fuerte::StatusInternalError;
for (Try<arangodb::network::Response> const& tryRes : responses) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
auto tmpBuilder = std::make_shared<VPackBuilder>();
// If there was no answer whatsoever, we cannot rely on the shardId
// being present in the result struct:
auto weSend = shardMap.find(sId);
TRI_ASSERT(weSend != shardMap.end()); // We send sth there earlier.
size_t count = weSend->second.size();
for (size_t i = 0; i < count; ++i) {
tmpBuilder->openObject();
tmpBuilder->add(StaticStrings::Error, VPackValue(true));
tmpBuilder->add(StaticStrings::ErrorNum, VPackValue(commError));
tmpBuilder->close();
}
resultMap.emplace(sId, std::move(tmpBuilder));
} else {
std::vector<VPackSlice> const& slices = res.response->slices();
auto tmpBuilder = std::make_shared<VPackBuilder>();
if (!slices.empty()) {
tmpBuilder->add(slices[0]);
}
resultMap.emplace(sId, std::move(tmpBuilder));
network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true);
code = res.response->statusCode();
}
}
}
static OperationResult checkResponsesFromAllShards(
std::vector<futures::Try<arangodb::network::Response>>& responses) {
// If none of the shards responds we return a SERVER_ERROR;
Result result;
for (Try<arangodb::network::Response> const& tryRes : responses) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
result.reset(commError);
break;
} else {
std::vector<VPackSlice> const& slices = res.response->slices();
if (!slices.empty()) {
VPackSlice answer = slices[0];
if (VelocyPackHelper::readBooleanValue(answer, StaticStrings::Error, false)) {
result = network::resultFromBody(answer, TRI_ERROR_NO_ERROR);
}
}
}
}
return OperationResult(result);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates one or many documents in a coordinator
///
@ -1601,7 +1629,12 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(transaction::Me
}
auto cb = [](std::vector<Try<network::Response>>&& results) -> OperationResult {
return checkResponsesFromAllShards(results);
return handleResponsesFromAllShards(
results, [](Result& result, VPackBuilder&, ShardID&, VPackSlice answer) -> void {
if (VelocyPackHelper::readBooleanValue(answer, StaticStrings::Error, false)) {
result = network::resultFromBody(answer, TRI_ERROR_NO_ERROR);
}
});
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
}

View File

@ -75,29 +75,32 @@ bool smartJoinAttributeChanged(LogicalCollection const& collection, VPackSlice c
/// @brief returns revision for a sharded collection
////////////////////////////////////////////////////////////////////////////////
int revisionOnCoordinator(ClusterFeature&, std::string const& dbname,
std::string const& collname, TRI_voc_rid_t&);
futures::Future<OperationResult> revisionOnCoordinator(ClusterFeature&,
std::string const& dbname,
std::string const& collname);
////////////////////////////////////////////////////////////////////////////////
/// @brief Warmup index caches on Shards
////////////////////////////////////////////////////////////////////////////////
int warmupOnCoordinator(ClusterFeature&, std::string const& dbname, std::string const& cid);
futures::Future<Result> warmupOnCoordinator(ClusterFeature&,
std::string const& dbname,
std::string const& cid);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns figures for a sharded collection
////////////////////////////////////////////////////////////////////////////////
int figuresOnCoordinator(ClusterFeature&, std::string const& dbname,
std::string const& collname,
std::shared_ptr<arangodb::velocypack::Builder>&);
futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature&,
std::string const& dbname,
std::string const& collname);
////////////////////////////////////////////////////////////////////////////////
/// @brief counts number of documents in a coordinator, by shard
////////////////////////////////////////////////////////////////////////////////
int countOnCoordinator(transaction::Methods& trx, std::string const& collname,
std::vector<std::pair<std::string, uint64_t>>& result);
futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& collname);
////////////////////////////////////////////////////////////////////////////////
/// @brief gets the selectivity estimates from DBservers

View File

@ -30,6 +30,7 @@
#include "Cluster/ClusterMethods.h"
#include "ClusterEngine/ClusterEngine.h"
#include "ClusterEngine/ClusterIndex.h"
#include "Futures/Utilities.h"
#include "Indexes/Index.h"
#include "Indexes/IndexIterator.h"
#include "MMFiles/MMFilesCollection.h"
@ -250,20 +251,16 @@ void ClusterCollection::getPropertiesVPack(velocypack::Builder& result) const {
}
/// @brief return the figures for a collection
std::shared_ptr<VPackBuilder> ClusterCollection::figures() {
auto builder = std::make_shared<VPackBuilder>();
builder->openObject();
builder->close();
futures::Future<std::shared_ptr<VPackBuilder>> ClusterCollection::figures() {
auto& feature = _logicalCollection.vocbase().server().getFeature<ClusterFeature>();
auto res = figuresOnCoordinator(feature, _logicalCollection.vocbase().name(),
std::to_string(_logicalCollection.id()), builder);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
return builder;
return figuresOnCoordinator(feature, _logicalCollection.vocbase().name(),
std::to_string(_logicalCollection.id()))
.thenValue([](OperationResult&& opRes) -> std::shared_ptr<VPackBuilder> {
if (opRes.fail()) {
THROW_ARANGO_EXCEPTION(opRes.result);
}
return std::make_shared<VPackBuilder>(opRes.buffer);
});
}
void ClusterCollection::figuresSpecific(std::shared_ptr<arangodb::velocypack::Builder>& builder) {

View File

@ -81,7 +81,7 @@ class ClusterCollection final : public PhysicalCollection {
void getPropertiesVPack(velocypack::Builder&) const override;
/// @brief return the figures for a collection
std::shared_ptr<velocypack::Builder> figures() override;
futures::Future<std::shared_ptr<velocypack::Builder>> figures() override;
/// @brief closes an open collection
int close() override;

View File

@ -183,10 +183,9 @@ int fuerteToArangoErrorCode(fuerte::Error err) {
// returns TRI_ERROR_NO_ERROR.
// If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED
// and .answer can safely be inspected.
// LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, res.error != fuerte::Error::NoError) << fuerte::to_string(res.error);
// LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, err != fuerte::Error::NoError) << fuerte::to_string(err);
switch (err) {
case fuerte::Error::NoError:
return TRI_ERROR_NO_ERROR;

View File

@ -27,6 +27,7 @@
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "Futures/Utilities.h"
#include "Logger/LogMacros.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/PhysicalCollection.h"
@ -55,8 +56,7 @@ RestCollectionHandler::RestCollectionHandler(application_features::ApplicationSe
RestStatus RestCollectionHandler::execute() {
switch (_request->requestType()) {
case rest::RequestType::GET:
handleCommandGet();
break;
return handleCommandGet();
case rest::RequestType::POST:
handleCommandPost();
break;
@ -74,20 +74,21 @@ RestStatus RestCollectionHandler::execute() {
void RestCollectionHandler::shutdownExecute(bool isFinalized) noexcept {
if (isFinalized) {
// reset the transaction so it releases all locks as early as possible
// reset the transactions so they release all locks as early as possible
_activeTrx.reset();
_ctxt.reset();
}
}
void RestCollectionHandler::handleCommandGet() {
RestStatus RestCollectionHandler::handleCommandGet() {
RestStatus status = RestStatus::DONE;
std::vector<std::string> const& suffixes = _request->decodedSuffixes();
VPackBuilder builder;
// /_api/collection
if (suffixes.empty()) {
bool excludeSystem = _request->parsedValue("excludeSystem", false);
builder.openArray();
_builder.openArray();
methods::Collections::enumerate(&_vocbase, [&](std::shared_ptr<LogicalCollection> const& coll) -> void {
TRI_ASSERT(coll);
bool canUse = ExecContext::current().canUseCollection(coll->name(), auth::Level::RO);
@ -96,41 +97,42 @@ void RestCollectionHandler::handleCommandGet() {
// We do not need a transaction here
methods::Collections::Context ctxt(_vocbase, *coll);
collectionRepresentation(builder, ctxt,
collectionRepresentation(ctxt,
/*showProperties*/ false,
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ false);
}
});
builder.close();
generateOk(rest::ResponseCode::OK, builder.slice());
_builder.close();
generateOk(rest::ResponseCode::OK, _builder.slice());
return;
return status;
}
std::string const& name = suffixes[0];
// /_api/collection/<name>
if (suffixes.size() == 1) {
try {
collectionRepresentation(builder, name, /*showProperties*/ false,
collectionRepresentation(name, /*showProperties*/ false,
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ false);
generateOk(rest::ResponseCode::OK, builder);
generateOk(rest::ResponseCode::OK, _builder);
} catch (basics::Exception const& ex) { // do not log not found exceptions
generateError(GeneralResponse::responseCode(ex.code()), ex.code(), ex.what());
}
return;
return status;
}
if (suffixes.size() > 2) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"expect GET /_api/collection/<collection-name>/<method>");
return;
return status;
}
std::string const& sub = suffixes[1];
bool skipGenerate = false;
_builder.clear();
auto found = methods::Collections::lookup( // find collection
_vocbase, // vocbase to search
name, // collection name to find
@ -148,7 +150,7 @@ void RestCollectionHandler::handleCommandGet() {
auto result = coll->checksum(withRevisions, withData);
if (result.ok()) {
VPackObjectBuilder obj(&builder, true);
VPackObjectBuilder obj(&_builder, true);
obj->add("checksum", result.slice().get("checksum"));
obj->add("revision", result.slice().get("revision"));
@ -156,7 +158,7 @@ void RestCollectionHandler::handleCommandGet() {
// We do not need a transaction here
methods::Collections::Context ctxt(_vocbase, *coll);
collectionRepresentation(builder, *coll,
collectionRepresentation(*coll,
/*showProperties*/ false,
/*showFigures*/ false,
/*showCount*/ false,
@ -167,43 +169,62 @@ void RestCollectionHandler::handleCommandGet() {
}
} else if (sub == "figures") {
// /_api/collection/<identifier>/figures
collectionRepresentation(builder, *coll,
/*showProperties*/ true,
/*showFigures*/ true,
/*showCount*/ true,
/*detailedCount*/ false);
_ctxt = std::make_unique<methods::Collections::Context>(_vocbase, *coll);
skipGenerate = true;
status = waitForFuture(
collectionRepresentationAsync(*_ctxt,
/*showProperties*/ true,
/*showFigures*/ true,
/*showCount*/ true,
/*detailedCount*/ false)
.thenValue([this](futures::Unit&&) { standardResponse(); }));
} else if (sub == "count") {
// /_api/collection/<identifier>/count
initializeTransaction(*coll);
_ctxt = std::make_unique<methods::Collections::Context>(_vocbase, *coll,
_activeTrx.get());
skipGenerate = true;
bool details = _request->parsedValue("details", false);
collectionRepresentation(builder, *coll,
/*showProperties*/ true,
/*showFigures*/ false,
/*showCount*/ true,
/*detailedCount*/ details);
status = waitForFuture(
collectionRepresentationAsync(*_ctxt,
/*showProperties*/ true,
/*showFigures*/ false,
/*showCount*/ true,
/*detailedCount*/ details)
.thenValue([this](futures::Unit&&) { standardResponse(); }));
} else if (sub == "properties") {
// /_api/collection/<identifier>/properties
collectionRepresentation(builder, *coll,
collectionRepresentation(*coll,
/*showProperties*/ true,
/*showFigures*/ false,
/*showCount*/ false,
/*detailedCount*/ true);
} else if (sub == "revision") {
methods::Collections::Context ctxt(_vocbase, *coll);
// /_api/collection/<identifier>/revision
TRI_voc_rid_t revisionId;
auto res = methods::Collections::revisionId(server(), ctxt, revisionId);
skipGenerate = true;
_ctxt = std::make_unique<methods::Collections::Context>(_vocbase, *coll);
status = waitForFuture(methods::Collections::revisionId(*_ctxt).thenValue(
[this, coll](OperationResult&& res) {
if (res.fail()) {
generateTransactionError(res);
return;
}
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
TRI_voc_rid_t rid = res.slice().isNumber()
? res.slice().getNumber<TRI_voc_rid_t>()
: 0;
{
VPackObjectBuilder obj(&_builder, true);
obj->add("revision", VPackValue(StringUtils::itoa(rid)));
VPackObjectBuilder obj(&builder, true);
obj->add("revision", VPackValue(StringUtils::itoa(revisionId)));
collectionRepresentation(builder, ctxt, /*showProperties*/ true,
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ true);
// no need to use async variant
collectionRepresentation(*_ctxt, /*showProperties*/ true,
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ true);
}
standardResponse();
}));
} else if (sub == "shards") {
// /_api/collection/<identifier>/shards
if (!ServerState::instance()->isRunningInCluster()) {
@ -212,9 +233,9 @@ void RestCollectionHandler::handleCommandGet() {
return;
}
VPackObjectBuilder obj(&builder, true); // need to open object
VPackObjectBuilder obj(&_builder, true); // need to open object
collectionRepresentation(builder, *coll,
collectionRepresentation(*coll,
/*showProperties*/ true,
/*showFigures*/ false,
/*showCount*/ false,
@ -225,7 +246,7 @@ void RestCollectionHandler::handleCommandGet() {
if (_request->parsedValue("details", false)) {
// with details
VPackObjectBuilder arr(&builder, "shards", true);
VPackObjectBuilder arr(&_builder, "shards", true);
for (ShardID const& shard : *shards) {
std::vector<ServerID> servers;
ci.getShardServers(shard, servers);
@ -234,7 +255,7 @@ void RestCollectionHandler::handleCommandGet() {
continue;
}
VPackArrayBuilder arr(&builder, shard);
VPackArrayBuilder arr(&_builder, shard);
for (auto const& server : servers) {
arr->add(VPackValue(server));
@ -242,7 +263,7 @@ void RestCollectionHandler::handleCommandGet() {
}
} else {
// no details
VPackArrayBuilder arr(&builder, "shards", true);
VPackArrayBuilder arr(&_builder, "shards", true);
for (ShardID const& shard : *shards) {
arr->add(VPackValue(shard));
@ -260,14 +281,15 @@ void RestCollectionHandler::handleCommandGet() {
});
if (skipGenerate) {
return;
return status;
}
if (found.ok()) {
generateOk(rest::ResponseCode::OK, builder);
_response->setHeaderNC(StaticStrings::Location, _request->requestPath());
standardResponse();
} else {
generateError(found);
}
return status;
}
// create a collection
@ -337,7 +359,7 @@ void RestCollectionHandler::handleCommandPost() {
// now we can create the collection
std::string const& name = nameSlice.copyString();
VPackBuilder builder;
_builder.clear();
Result res = methods::Collections::create(
_vocbase, // collection vocbase
name, // colection name
@ -348,7 +370,7 @@ void RestCollectionHandler::handleCommandPost() {
false, // new Database?, here always false
[&](std::shared_ptr<LogicalCollection> const& coll) -> void {
TRI_ASSERT(coll);
collectionRepresentation(builder, coll->name(),
collectionRepresentation(coll->name(),
/*showProperties*/ true,
/*showFigures*/ false,
/*showCount*/ false,
@ -356,7 +378,7 @@ void RestCollectionHandler::handleCommandPost() {
});
if (res.ok()) {
generateOk(rest::ResponseCode::OK, builder);
generateOk(rest::ResponseCode::OK, _builder);
} else {
generateError(res);
}
@ -387,7 +409,7 @@ RestStatus RestCollectionHandler::handleCommandPut() {
}
Result res;
VPackBuilder builder;
_builder.clear();
RestStatus status = RestStatus::DONE;
bool generateResponse = true;
auto found = methods::Collections::lookup( // find collection
@ -401,7 +423,7 @@ RestStatus RestCollectionHandler::handleCommandPut() {
if (res.ok()) {
bool cc = VelocyPackHelper::getBooleanValue(body, "count", true);
collectionRepresentation(builder, name, /*showProperties*/ false,
collectionRepresentation(name, /*showProperties*/ false,
/*showFigures*/ false, /*showCount*/ cc,
/*detailedCount*/ false);
}
@ -416,7 +438,7 @@ RestStatus RestCollectionHandler::handleCommandPut() {
res = methods::Collections::unload(&_vocbase, coll.get());
if (res.ok()) {
collectionRepresentation(builder, name, /*showProperties*/ false,
collectionRepresentation(name, /*showProperties*/ false,
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ true);
}
@ -424,7 +446,7 @@ RestStatus RestCollectionHandler::handleCommandPut() {
res = coll->compact();
if (res.ok()) {
collectionRepresentation(builder, name, /*showProperties*/ false,
collectionRepresentation(name, /*showProperties*/ false,
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ true);
}
@ -452,9 +474,9 @@ RestStatus RestCollectionHandler::handleCommandPut() {
res = coll->getResponsibleShard(body, false, shardId);
if (res.ok()) {
builder.openObject();
builder.add("shardId", VPackValue(shardId));
builder.close();
_builder.openObject();
_builder.add("shardId", VPackValue(shardId));
_builder.close();
}
}
} else if (sub == "truncate") {
@ -501,14 +523,13 @@ RestStatus RestCollectionHandler::handleCommandPut() {
coll->setStatus(TRI_vocbase_col_status_e::TRI_VOC_COL_STATUS_LOADED);
}
VPackBuilder builder;
collectionRepresentation(builder, *coll,
// no need to use async method, no
collectionRepresentation(*coll,
/*showProperties*/ false,
/*showFigures*/ false,
/*showCount*/ false,
/*detailedCount*/ true);
generateOk(rest::ResponseCode::OK, builder);
_response->setHeaderNC(StaticStrings::Location, _request->requestPath());
standardResponse();
}));
}
}
@ -556,7 +577,7 @@ RestStatus RestCollectionHandler::handleCommandPut() {
);
if (res.ok()) {
collectionRepresentation(builder, name, /*showProperties*/ true,
collectionRepresentation(name, /*showProperties*/ true,
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ true);
}
@ -574,18 +595,28 @@ RestStatus RestCollectionHandler::handleCommandPut() {
res = methods::Collections::rename(*coll, newName, false);
if (res.ok()) {
collectionRepresentation(builder, newName, /*showProperties*/ false,
collectionRepresentation(newName, /*showProperties*/ false,
/*showFigures*/ false, /*showCount*/ false,
/*detailedCount*/ true);
}
} else if (sub == "loadIndexesIntoMemory") {
res = methods::Collections::warmup(_vocbase, *coll);
generateResponse = false;
status = waitForFuture(
methods::Collections::warmup(_vocbase, *coll).thenValue([this, coll](Result&& res) {
if (res.fail()) {
generateTransactionError(coll->name(), res, "");
return;
}
VPackObjectBuilder obj(&builder, true);
{
VPackObjectBuilder obj(&_builder, true);
obj->add("result", VPackValue(res.ok()));
}
obj->add("result", VPackValue(res.ok()));
standardResponse();
}));
} else {
res = handleExtraCommandPut(*coll, sub, builder);
res = handleExtraCommandPut(*coll, sub, _builder);
if (res.is(TRI_ERROR_NOT_IMPLEMENTED)) {
res.reset(
TRI_ERROR_HTTP_NOT_FOUND,
@ -599,9 +630,7 @@ RestStatus RestCollectionHandler::handleCommandPut() {
if (found.fail()) {
generateError(found);
} else if (res.ok()) {
// TODO react to status?
generateOk(rest::ResponseCode::OK, builder);
_response->setHeaderNC(StaticStrings::Location, _request->requestPath());
standardResponse();
} else {
generateError(res);
}
@ -622,7 +651,7 @@ void RestCollectionHandler::handleCommandDelete() {
std::string const& name = suffixes[0];
bool allowDropSystem = _request->parsedValue("isSystem", false);
VPackBuilder builder;
_builder.clear();
Result res;
Result found = methods::Collections::lookup(
_vocbase, // vocbase to search
@ -631,7 +660,7 @@ void RestCollectionHandler::handleCommandDelete() {
TRI_ASSERT(coll);
auto cid = std::to_string(coll->id());
VPackObjectBuilder obj(&builder, true);
VPackObjectBuilder obj(&_builder, true);
obj->add("id", VPackValue(cid));
res = methods::Collections::drop(*coll, allowDropSystem, -1.0);
@ -643,7 +672,7 @@ void RestCollectionHandler::handleCommandDelete() {
} else if (res.fail()) {
generateError(res);
} else {
generateOk(rest::ResponseCode::OK, builder);
generateOk(rest::ResponseCode::OK, _builder);
}
}
@ -652,17 +681,15 @@ void RestCollectionHandler::handleCommandDelete() {
/// truncate
/// and create will not immediately show the expected results on a collection
/// object.
void RestCollectionHandler::collectionRepresentation(VPackBuilder& builder,
std::string const& name,
void RestCollectionHandler::collectionRepresentation(std::string const& name,
bool showProperties, bool showFigures,
bool showCount, bool detailedCount) {
Result r = methods::Collections::lookup(
_vocbase, // vocbase to search
name, // collection to find
[&](std::shared_ptr<LogicalCollection> const& coll) -> void { // callback if found
[=](std::shared_ptr<LogicalCollection> const& coll) -> void { // callback if found
TRI_ASSERT(coll);
collectionRepresentation(builder, *coll, showProperties, showFigures,
showCount, detailedCount);
collectionRepresentation(*coll, showProperties, showFigures, showCount, detailedCount);
});
if (r.fail()) {
@ -670,95 +697,124 @@ void RestCollectionHandler::collectionRepresentation(VPackBuilder& builder,
}
}
void RestCollectionHandler::collectionRepresentation(
arangodb::velocypack::Builder& builder, LogicalCollection& coll,
bool showProperties, bool showFigures, bool showCount, bool detailedCount) {
void RestCollectionHandler::collectionRepresentation(LogicalCollection& coll,
bool showProperties, bool showFigures,
bool showCount, bool detailedCount) {
if (showProperties || showCount) {
// Here we need a transaction
std::unique_ptr<transaction::Methods> trx;
try {
trx = createTransaction(coll.name(), AccessMode::Type::READ);
} catch (basics::Exception const& ex) {
if (ex.code() == TRI_ERROR_TRANSACTION_NOT_FOUND) {
// this will happen if the tid of a managed transaction is passed in,
// but the transaction hasn't yet started on the DB server. in
// this case, we create an ad-hoc transaction on the underlying
// collection
trx = std::make_unique<SingleCollectionTransaction>(transaction::StandaloneContext::Create(_vocbase), coll.name(), AccessMode::Type::READ);
} else {
throw;
}
}
initializeTransaction(coll);
methods::Collections::Context ctxt(_vocbase, coll, _activeTrx.get());
TRI_ASSERT(trx != nullptr);
Result res = trx->begin();
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
methods::Collections::Context ctxt(_vocbase, coll, trx.get());
collectionRepresentation(builder, ctxt, showProperties, showFigures,
showCount, detailedCount);
collectionRepresentation(ctxt, showProperties, showFigures, showCount, detailedCount);
} else {
// We do not need a transaction here
methods::Collections::Context ctxt(_vocbase, coll);
collectionRepresentation(builder, ctxt, showProperties, showFigures,
showCount, detailedCount);
collectionRepresentation(ctxt, showProperties, showFigures, showCount, detailedCount);
}
}
void RestCollectionHandler::collectionRepresentation(VPackBuilder& builder,
methods::Collections::Context& ctxt,
void RestCollectionHandler::collectionRepresentation(methods::Collections::Context& ctxt,
bool showProperties, bool showFigures,
bool showCount, bool detailedCount) {
bool wasOpen = builder.isOpenObject();
collectionRepresentationAsync(ctxt, showProperties, showFigures, showCount, detailedCount)
.get();
}
futures::Future<futures::Unit> RestCollectionHandler::collectionRepresentationAsync(
methods::Collections::Context& ctxt, bool showProperties, bool showFigures,
bool showCount, bool detailedCount) {
bool wasOpen = _builder.isOpenObject();
if (!wasOpen) {
builder.openObject();
_builder.openObject();
}
auto coll = ctxt.coll();
TRI_ASSERT(coll != nullptr);
// `methods::Collections::properties` will filter these out
builder.add(StaticStrings::DataSourceId, VPackValue(std::to_string(coll->id())));
builder.add(StaticStrings::DataSourceName, VPackValue(coll->name()));
builder.add("status", VPackValue(coll->status()));
builder.add(StaticStrings::DataSourceType, VPackValue(coll->type()));
_builder.add(StaticStrings::DataSourceId, VPackValue(std::to_string(coll->id())));
_builder.add(StaticStrings::DataSourceName, VPackValue(coll->name()));
_builder.add("status", VPackValue(coll->status()));
_builder.add(StaticStrings::DataSourceType, VPackValue(coll->type()));
if (!showProperties) {
builder.add(StaticStrings::DataSourceSystem, VPackValue(coll->system()));
builder.add(StaticStrings::DataSourceGuid, VPackValue(coll->guid()));
_builder.add(StaticStrings::DataSourceSystem, VPackValue(coll->system()));
_builder.add(StaticStrings::DataSourceGuid, VPackValue(coll->guid()));
} else {
Result res = methods::Collections::properties(ctxt, builder);
Result res = methods::Collections::properties(ctxt, _builder);
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
}
futures::Future<std::shared_ptr<VPackBuilder>> figures =
futures::makeFuture(std::shared_ptr<VPackBuilder>(nullptr));
if (showFigures) {
auto figures = coll->figures();
builder.add("figures", figures->slice());
figures = coll->figures();
}
if (showCount) {
auto trx = ctxt.trx(AccessMode::Type::READ, true, true);
TRI_ASSERT(trx != nullptr);
OperationResult opRes =
trx->count(coll->name(), detailedCount ? transaction::CountType::Detailed
return std::move(figures)
.thenValue([=, &ctxt](std::shared_ptr<VPackBuilder>&& figures) -> futures::Future<OperationResult> {
if (figures) {
_builder.add("figures", figures->slice());
}
if (showCount) {
auto trx = ctxt.trx(AccessMode::Type::READ, true, true);
TRI_ASSERT(trx != nullptr);
return trx->countAsync(coll->name(),
detailedCount ? transaction::CountType::Detailed
: transaction::CountType::Normal);
}
return futures::makeFuture(OperationResult());
})
.thenValue([=, &ctxt](OperationResult&& opRes) -> void {
if (opRes.fail()) {
if (showCount) {
auto trx = ctxt.trx(AccessMode::Type::READ, true, true);
TRI_ASSERT(trx != nullptr);
trx->finish(opRes.result);
}
THROW_ARANGO_EXCEPTION(opRes.result);
}
if (opRes.fail()) {
trx->finish(opRes.result);
THROW_ARANGO_EXCEPTION(opRes.result);
if (showCount) {
_builder.add("count", opRes.slice());
}
if (!wasOpen) {
_builder.close();
}
});
}
void RestCollectionHandler::standardResponse() {
generateOk(rest::ResponseCode::OK, _builder);
_response->setHeaderNC(StaticStrings::Location, _request->requestPath());
}
void RestCollectionHandler::initializeTransaction(LogicalCollection& coll) {
try {
_activeTrx = createTransaction(coll.name(), AccessMode::Type::READ);
} catch (basics::Exception const& ex) {
if (ex.code() == TRI_ERROR_TRANSACTION_NOT_FOUND) {
// this will happen if the tid of a managed transaction is passed in,
// but the transaction hasn't yet started on the DB server. in
// this case, we create an ad-hoc transaction on the underlying
// collection
_activeTrx = std::make_unique<SingleCollectionTransaction>(
transaction::StandaloneContext::Create(_vocbase), coll.name(),
AccessMode::Type::READ);
} else {
throw;
}
builder.add("count", opRes.slice());
}
if (!wasOpen) {
builder.close();
TRI_ASSERT(_activeTrx != nullptr);
Result res = _activeTrx->begin();
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
}

View File

@ -45,29 +45,36 @@ class RestCollectionHandler : public arangodb::RestVocbaseBaseHandler {
void shutdownExecute(bool isFinalized) noexcept override final;
protected:
void collectionRepresentation(VPackBuilder& builder, std::string const& name,
bool showProperties, bool showFigures,
bool showCount, bool detailedCount);
void collectionRepresentation(arangodb::velocypack::Builder& builder,
LogicalCollection& coll, bool showProperties,
void collectionRepresentation(std::string const& name, bool showProperties,
bool showFigures, bool showCount, bool detailedCount);
void collectionRepresentation(VPackBuilder& builder, methods::Collections::Context& ctxt,
bool showProperties, bool showFigures,
bool showCount, bool detailedCount);
void collectionRepresentation(LogicalCollection& coll, bool showProperties,
bool showFigures, bool showCount, bool detailedCount);
void collectionRepresentation(methods::Collections::Context& ctxt, bool showProperties,
bool showFigures, bool showCount, bool detailedCount);
futures::Future<futures::Unit> collectionRepresentationAsync(
methods::Collections::Context& ctxt, bool showProperties,
bool showFigures, bool showCount, bool detailedCount);
virtual Result handleExtraCommandPut(LogicalCollection& coll, std::string const& command,
velocypack::Builder& builder) = 0;
private:
void handleCommandGet();
void standardResponse();
void initializeTransaction(LogicalCollection&);
private:
RestStatus handleCommandGet();
void handleCommandPost();
RestStatus handleCommandPut();
void handleCommandDelete();
private:
VPackBuilder _builder;
std::unique_ptr<transaction::Methods> _activeTrx;
std::unique_ptr<methods::Collections::Context> _ctxt;
};
} // namespace arangodb

View File

@ -29,6 +29,7 @@
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/encoding.h"
#include "Futures/Utilities.h"
#include "Indexes/Index.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Methods.h"
@ -545,7 +546,7 @@ void PhysicalCollection::getIndexesVPack(VPackBuilder& result, unsigned flags,
}
/// @brief return the figures for a collection
std::shared_ptr<arangodb::velocypack::Builder> PhysicalCollection::figures() {
futures::Future<std::shared_ptr<arangodb::velocypack::Builder>> PhysicalCollection::figures() {
auto builder = std::make_shared<VPackBuilder>();
builder->openObject();
@ -576,7 +577,7 @@ std::shared_ptr<arangodb::velocypack::Builder> PhysicalCollection::figures() {
// add engine-specific figures
figuresSpecific(builder);
builder->close();
return builder;
return futures::makeFuture(builder);
}

View File

@ -23,15 +23,18 @@
#ifndef ARANGOD_VOCBASE_PHYSICAL_COLLECTION_H
#define ARANGOD_VOCBASE_PHYSICAL_COLLECTION_H 1
#include <set>
#include <velocypack/Builder.h>
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
#include "Futures/Future.h"
#include "Indexes/Index.h"
#include "Indexes/IndexIterator.h"
#include "VocBase/voc-types.h"
#include <velocypack/Builder.h>
namespace arangodb {
namespace transaction {
@ -127,7 +130,7 @@ class PhysicalCollection {
std::function<bool(arangodb::Index const*)> const& filter) const;
/// @brief return the figures for a collection
virtual std::shared_ptr<velocypack::Builder> figures();
virtual futures::Future<std::shared_ptr<velocypack::Builder>> figures();
/// @brief create or restore an index
/// @param restore utilize specified ID, assume index has to be created

View File

@ -2556,8 +2556,8 @@ Future<OperationResult> transaction::Methods::truncateLocal(std::string const& c
}
/// @brief count the number of documents in a collection
OperationResult transaction::Methods::count(std::string const& collectionName,
transaction::CountType type) {
futures::Future<OperationResult> transaction::Methods::countAsync(std::string const& collectionName,
transaction::CountType type) {
TRI_ASSERT(_state->status() == transaction::Status::RUNNING);
if (_state->isCoordinator()) {
@ -2570,25 +2570,25 @@ OperationResult transaction::Methods::count(std::string const& collectionName,
type = CountType::Normal;
}
return countLocal(collectionName, type);
return futures::makeFuture(countLocal(collectionName, type));
}
#ifndef USE_ENTERPRISE
/// @brief count the number of documents in a collection
OperationResult transaction::Methods::countCoordinator(std::string const& collectionName,
transaction::CountType type) {
futures::Future<OperationResult> transaction::Methods::countCoordinator(
std::string const& collectionName, transaction::CountType type) {
auto& feature = vocbase().server().getFeature<ClusterFeature>();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return OperationResult(TRI_ERROR_SHUTTING_DOWN);
return futures::makeFuture(OperationResult(TRI_ERROR_SHUTTING_DOWN));
}
ClusterInfo& ci = feature.clusterInfo();
// First determine the collection ID from the name:
auto collinfo = ci.getCollectionNT(vocbase().name(), collectionName);
if (collinfo == nullptr) {
return OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
return countCoordinatorHelper(collinfo, collectionName, type);
@ -2596,7 +2596,7 @@ OperationResult transaction::Methods::countCoordinator(std::string const& collec
#endif
OperationResult transaction::Methods::countCoordinatorHelper(
futures::Future<OperationResult> transaction::Methods::countCoordinatorHelper(
std::shared_ptr<LogicalCollection> const& collinfo,
std::string const& collectionName, transaction::CountType type) {
TRI_ASSERT(collinfo != nullptr);
@ -2612,17 +2612,29 @@ OperationResult transaction::Methods::countCoordinatorHelper(
if (documents == CountCache::NotPopulated) {
// no cache hit, or detailed results requested
std::vector<std::pair<std::string, uint64_t>> count;
auto res = arangodb::countOnCoordinator(*this, collectionName, count);
return arangodb::countOnCoordinator(*this, collectionName)
.thenValue([&cache, type](OperationResult&& res) -> OperationResult {
if (res.fail()) {
return std::move(res);
}
if (res != TRI_ERROR_NO_ERROR) {
return OperationResult(res);
}
// reassemble counts from vpack
std::vector<std::pair<std::string, uint64_t>> counts;
TRI_ASSERT(res.slice().isArray());
for (VPackSlice count : VPackArrayIterator(res.slice())) {
TRI_ASSERT(count.isArray());
TRI_ASSERT(count[0].isString());
TRI_ASSERT(count[1].isNumber());
std::string key = count[0].copyString();
uint64_t value = count[1].getNumericValue<uint64_t>();
counts.emplace_back(std::move(key), value);
}
int64_t total = 0;
OperationResult opRes = buildCountResult(count, type, total);
cache.store(total);
return opRes;
int64_t total = 0;
OperationResult opRes = buildCountResult(counts, type, total);
cache.store(total);
return opRes;
});
}
// cache hit!

View File

@ -371,8 +371,14 @@ class Methods {
Future<OperationResult> truncateAsync(std::string const& collectionName,
OperationOptions const& options);
/// deprecated, use async variant
virtual OperationResult count(std::string const& collectionName, CountType type) {
return countAsync(collectionName, type).get();
}
/// @brief count the number of documents in a collection
virtual OperationResult count(std::string const& collectionName, CountType type);
virtual futures::Future<OperationResult> countAsync(std::string const& collectionName,
CountType type);
/// @brief Gets the best fitting index for an AQL condition.
/// note: the caller must have read-locked the underlying collection when
@ -528,10 +534,12 @@ class Methods {
TransactionCollection* trxCollection(
std::string const& name, AccessMode::Type type = AccessMode::Type::READ) const;
OperationResult countCoordinator(std::string const& collectionName, CountType type);
futures::Future<OperationResult> countCoordinator(std::string const& collectionName,
CountType type);
OperationResult countCoordinatorHelper(std::shared_ptr<LogicalCollection> const& collinfo,
std::string const& collectionName, CountType type);
futures::Future<OperationResult> countCoordinatorHelper(
std::shared_ptr<LogicalCollection> const& collinfo,
std::string const& collectionName, CountType type);
OperationResult countLocal(std::string const& collectionName, CountType type);

View File

@ -963,7 +963,7 @@ static void JS_FiguresVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args
TRI_V8_THROW_EXCEPTION(res);
}
auto builder = collection->figures();
auto builder = collection->figures().get();
trx.finish(TRI_ERROR_NO_ERROR);
@ -1841,16 +1841,16 @@ static void JS_RevisionVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& arg
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
TRI_voc_rid_t revisionId;
methods::Collections::Context ctxt(collection->vocbase(), *collection);
auto res = methods::Collections::revisionId(collection->vocbase().server(), ctxt, revisionId);
auto res = methods::Collections::revisionId(ctxt).get();
if (res.fail()) {
TRI_V8_THROW_EXCEPTION(res);
TRI_V8_THROW_EXCEPTION(res.result);
}
std::string ridString = TRI_RidToString(revisionId);
TRI_voc_rid_t rid =
res.slice().isNumber() ? res.slice().getNumber<TRI_voc_rid_t>() : 0;
std::string ridString = TRI_RidToString(rid);
TRI_V8_RETURN(TRI_V8_STD_STRING(isolate, ridString));
TRI_V8_TRY_CATCH_END
}
@ -2516,7 +2516,8 @@ static void JS_WarmupVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args)
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
auto res = arangodb::methods::Collections::warmup(collection->vocbase(), *collection);
auto res =
arangodb::methods::Collections::warmup(collection->vocbase(), *collection).get();
if (res.fail()) {
TRI_V8_THROW_EXCEPTION(res);

View File

@ -868,7 +868,7 @@ arangodb::Result LogicalCollection::properties(velocypack::Slice const& slice,
}
/// @brief return the figures for a collection
std::shared_ptr<arangodb::velocypack::Builder> LogicalCollection::figures() const {
futures::Future<std::shared_ptr<arangodb::velocypack::Builder>> LogicalCollection::figures() const {
return getPhysical()->figures();
}

View File

@ -28,6 +28,7 @@
#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "Basics/ReadWriteLock.h"
#include "Futures/Future.h"
#include "Indexes/IndexIterator.h"
#include "Transaction/CountCache.h"
#include "VocBase/LogicalDataSource.h"
@ -264,7 +265,7 @@ class LogicalCollection : public LogicalDataSource {
virtual arangodb::Result properties(velocypack::Slice const& slice, bool partialUpdate) override;
/// @brief return the figures for a collection
virtual std::shared_ptr<velocypack::Builder> figures() const;
virtual futures::Future<std::shared_ptr<velocypack::Builder>> figures() const;
/// @brief opens an existing collection
void open(bool ignoreErrors);

View File

@ -34,6 +34,7 @@
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/ServerState.h"
#include "Futures/Utilities.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
@ -791,10 +792,11 @@ static Result DropVocbaseColCoordinator(arangodb::LogicalCollection* collection,
return res;
}
Result Collections::warmup(TRI_vocbase_t& vocbase, LogicalCollection const& coll) {
futures::Future<Result> Collections::warmup(TRI_vocbase_t& vocbase,
LogicalCollection const& coll) {
ExecContext const& exec = ExecContext::current(); // disallow expensive ops
if (!exec.canUseCollection(coll.name(), auth::Level::RO)) {
return Result(TRI_ERROR_FORBIDDEN);
return futures::makeFuture(Result(TRI_ERROR_FORBIDDEN));
}
if (ServerState::instance()->isCoordinator()) {
@ -808,7 +810,7 @@ Result Collections::warmup(TRI_vocbase_t& vocbase, LogicalCollection const& coll
Result res = trx.begin();
if (res.fail()) {
return res;
return futures::makeFuture(res);
}
auto poster = [](std::function<void()> fn) -> bool {
@ -827,24 +829,26 @@ Result Collections::warmup(TRI_vocbase_t& vocbase, LogicalCollection const& coll
if (queue->status() == TRI_ERROR_NO_ERROR) {
res = trx.commit();
} else {
return queue->status();
return futures::makeFuture(Result(queue->status()));
}
return res;
return futures::makeFuture(res);
}
Result Collections::revisionId(application_features::ApplicationServer& server,
Context& ctxt, TRI_voc_rid_t& rid) {
futures::Future<OperationResult> Collections::revisionId(Context& ctxt) {
if (ServerState::instance()->isCoordinator()) {
auto& databaseName = ctxt.coll()->vocbase().name();
auto cid = std::to_string(ctxt.coll()->id());
auto& feature = server.getFeature<ClusterFeature>();
return revisionOnCoordinator(feature, databaseName, cid, rid);
auto& feature = ctxt.coll()->vocbase().server().getFeature<ClusterFeature>();
return revisionOnCoordinator(feature, databaseName, cid);
}
rid = ctxt.coll()->revision(ctxt.trx(AccessMode::Type::READ, true, true));
TRI_voc_rid_t rid = ctxt.coll()->revision(ctxt.trx(AccessMode::Type::READ, true, true));
return TRI_ERROR_NO_ERROR;
VPackBuilder builder;
builder.add(VPackValue(rid));
return futures::makeFuture(OperationResult(Result(), builder.steal()));
}
/// @brief Helper implementation similar to ArangoCollection.all() in v8

View File

@ -24,6 +24,8 @@
#define ARANGOD_VOC_BASE_API_COLLECTIONS_H 1
#include "Basics/Result.h"
#include "Futures/Future.h"
#include "Utils/OperationResult.h"
#include "VocBase/AccessMode.h"
#include "VocBase/voc-types.h"
#include "VocBase/vocbase.h"
@ -121,10 +123,10 @@ struct Collections {
double timeout // single-server drop timeout
);
static Result warmup(TRI_vocbase_t& vocbase, LogicalCollection const& coll);
static futures::Future<Result> warmup(TRI_vocbase_t& vocbase,
LogicalCollection const& coll);
static Result revisionId(application_features::ApplicationServer&,
Context& ctxt, TRI_voc_rid_t& rid);
static futures::Future<OperationResult> revisionId(Context& ctxt);
/// @brief Helper implementation similar to ArangoCollection.all() in v8
static arangodb::Result all(TRI_vocbase_t& vocbase, std::string const& cname,