1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/ArangoDB into bug-fix/speedup-tests

This commit is contained in:
Wilfried Goesgens 2019-09-27 17:46:41 +02:00
commit 1bdd4895b6
34 changed files with 613 additions and 735 deletions

View File

@ -182,7 +182,7 @@ class Request final : public Message {
///////////////////////////////////////////////
// add payload
///////////////////////////////////////////////
void addVPack(velocypack::Slice const& slice);
void addVPack(velocypack::Slice const slice);
void addVPack(velocypack::Buffer<uint8_t> const& buffer);
void addVPack(velocypack::Buffer<uint8_t>&& buffer);
void addBinary(uint8_t const* data, std::size_t length);

View File

@ -56,7 +56,7 @@ std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
StringMap const& parameter,
velocypack::Slice const& payload);
velocypack::Slice const payload);
std::unique_ptr<Request> createRequest(
RestVerb verb, std::string const& path,

View File

@ -180,7 +180,7 @@ constexpr std::chrono::milliseconds Request::defaultTimeout;
ContentType Request::acceptType() const { return header.acceptType(); }
//// add payload add VelocyPackData
void Request::addVPack(VPackSlice const& slice) {
void Request::addVPack(VPackSlice const slice) {
#ifdef FUERTE_CHECKED_MODE
// FUERTE_LOG_ERROR << "Checking data that is added to the message: " <<
// std::endl;

View File

@ -48,7 +48,7 @@ std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
StringMap const& parameters,
VPackSlice const& payload) {
VPackSlice const payload) {
auto request = createRequest(verb, ContentType::VPack);
request->header.path = path;
request->header.parameters = parameters;

View File

@ -71,11 +71,3 @@ SharedAqlItemBlockPtr itemBlock::concatenate(AqlItemBlockManager& manager,
return res;
}
void itemBlock::forRowInBlock(SharedAqlItemBlockPtr const& block,
std::function<void(InputAqlItemRow&&)> const& callback) {
TRI_ASSERT(block != nullptr);
for (std::size_t index = 0; index < block->size(); ++index) {
callback(InputAqlItemRow{block, index});
}
}

View File

@ -40,10 +40,6 @@ namespace itemBlock {
/// set to nullptr, just to be sure.
SharedAqlItemBlockPtr concatenate(AqlItemBlockManager&,
std::vector<SharedAqlItemBlockPtr>& blocks);
void forRowInBlock(SharedAqlItemBlockPtr const& block,
std::function<void(InputAqlItemRow&&)> const& callback);
} // namespace itemBlock
} // namespace aql

View File

@ -44,9 +44,9 @@ std::shared_ptr<AqlTransaction> AqlTransaction::create(
std::unordered_set<std::string> inaccessibleCollections) {
#ifdef USE_ENTERPRISE
if (options.skipInaccessibleCollections) {
return std::make_shared<transaction::IgnoreNoAccessAqlTransaction>(transactionContext, collections,
options, isMainTransaction,
std::move(inaccessibleCollections));
return std::make_shared<transaction::IgnoreNoAccessAqlTransaction>(
transactionContext, collections, options, isMainTransaction,
std::move(inaccessibleCollections));
}
#endif
return std::make_shared<AqlTransaction>(transactionContext, collections, options, isMainTransaction);

View File

@ -32,7 +32,6 @@
#include "Aql/ExecutionStats.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/Query.h"
#include "Aql/WakeupQueryCallback.h"
#include "Basics/Exceptions.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringBuffer.h"

View File

@ -92,8 +92,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi
// It might in fact be empty, if we only have edge collections in a graph.
// Or if we guarantee to never read vertex data.
for (auto const& col : vertices) {
auto shards = getAllLocalShards(shardMapping, server,
col->shardIds(restrictToShards));
auto shards = getAllLocalShards(shardMapping, server, col->shardIds(restrictToShards));
#ifdef USE_ENTERPRISE
for (auto const& s : shards) {
if (query.trx()->isInaccessibleCollectionId(col->getPlanId())) {

View File

@ -276,13 +276,6 @@ ExecutionNode const* ExecutionBlock::getPlanNode() const { return _exeNode; }
transaction::Methods* ExecutionBlock::transaction() const { return _trx; }
bool ExecutionBlock::handleAsyncResult(ClusterCommResult* result) {
// This indicates that a node uses async functionality
// but does not react to the response.
TRI_ASSERT(false);
return true;
}
void ExecutionBlock::addDependency(ExecutionBlock* ep) {
TRI_ASSERT(ep != nullptr);
// We can never have the same dependency twice

View File

@ -121,11 +121,6 @@ class ExecutionBlock {
transaction::Methods* transaction() const;
// @brief Will be called on the querywakeup callback with the
// result collected over the network. Needs to be implemented
// on all nodes that use this mechanism.
virtual bool handleAsyncResult(ClusterCommResult* result);
/// @brief add a dependency
void addDependency(ExecutionBlock* ep);

View File

@ -195,7 +195,10 @@ bool Insert::doModifications(ModificationExecutorInfos& info, ModificationStats&
RegisterId const inReg = info._input1RegisterId;
TRI_ASSERT(_block != nullptr);
itemBlock::forRowInBlock(_block, [this, inReg, &info](InputAqlItemRow&& row) {
for (std::size_t index = 0; index < _block->size(); ++index) {
InputAqlItemRow row{_block, index};
auto const& inVal = row.getValue(inReg);
if (!info._consultAqlWriteFilter ||
!info._aqlCollection->getCollection()->skipForAqlWrite(inVal.slice(),
@ -207,7 +210,7 @@ bool Insert::doModifications(ModificationExecutorInfos& info, ModificationStats&
// not relevant for ourselves... just pass it on to the next block
_operations.push_back(ModOperationType::IGNORE_RETURN);
}
});
}
TRI_ASSERT(_operations.size() == _block->size());
@ -322,8 +325,8 @@ bool Remove::doModifications(ModificationExecutorInfos& info, ModificationStats&
RegisterId const inReg = info._input1RegisterId;
TRI_ASSERT(_block != nullptr);
itemBlock::forRowInBlock(_block, [this, &stats, &errorCode, &key, &rev, trx,
inReg, &info](InputAqlItemRow&& row) {
for (std::size_t index = 0; index < _block->size(); ++index) {
InputAqlItemRow row{_block, index};
auto const& inVal = row.getValue(inReg);
if (!info._consultAqlWriteFilter ||
@ -363,7 +366,7 @@ bool Remove::doModifications(ModificationExecutorInfos& info, ModificationStats&
_operations.push_back(ModOperationType::IGNORE_RETURN);
this->_last_not_skip = _operations.size();
}
});
}
TRI_ASSERT(_operations.size() == _block->size());
@ -462,9 +465,9 @@ bool Upsert::doModifications(ModificationExecutorInfos& info, ModificationStats&
RegisterId const insertReg = info._input2RegisterId;
RegisterId const updateReg = info._input3RegisterId;
itemBlock::forRowInBlock(_block, [this, &stats, &errorCode, &errorMessage,
&key, trx, inDocReg, insertReg, updateReg,
&info](InputAqlItemRow&& row) {
for (std::size_t index = 0; index < _block->size(); ++index) {
InputAqlItemRow row{_block, index};
errorMessage.clear();
errorCode = TRI_ERROR_NO_ERROR;
auto const& inVal = row.getValue(inDocReg);
@ -524,7 +527,7 @@ bool Upsert::doModifications(ModificationExecutorInfos& info, ModificationStats&
_operations.push_back(ModOperationType::IGNORE_SKIP);
handleStats(stats, info, errorCode, info._ignoreErrors, &errorMessage);
}
});
}
TRI_ASSERT(_operations.size() == _block->size());
@ -659,9 +662,9 @@ bool UpdateReplace<ModType>::doModifications(ModificationExecutorInfos& info,
RegisterId const keyReg = info._input2RegisterId;
bool const hasKeyVariable = keyReg != RegisterPlan::MaxRegisterId;
itemBlock::forRowInBlock(_block, [this, &options, &stats, &errorCode,
&errorMessage, &key, &rev, trx, inDocReg, keyReg,
hasKeyVariable, &info](InputAqlItemRow&& row) {
for (std::size_t index = 0; index < _block->size(); ++index) {
InputAqlItemRow row{_block, index};
auto const& inVal = row.getValue(inDocReg);
errorCode = TRI_ERROR_NO_ERROR;
errorMessage.clear();
@ -718,7 +721,7 @@ bool UpdateReplace<ModType>::doModifications(ModificationExecutorInfos& info,
_operations.push_back(ModOperationType::IGNORE_SKIP);
handleStats(stats, info, errorCode, info._ignoreErrors, &errorMessage);
}
});
}
TRI_ASSERT(_operations.size() == _block->size());

View File

@ -482,8 +482,8 @@ ExecutionPlan* Query::preparePlan() {
#endif
auto trx = AqlTransaction::create(std::move(ctx), _collections.collections(),
_queryOptions.transactionOptions,
_part == PART_MAIN, std::move(inaccessibleCollections));
_queryOptions.transactionOptions, _part == PART_MAIN,
std::move(inaccessibleCollections));
// create the transaction object, but do not start it yet
_trx = trx;
_trx->addHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL); // only used on toplevel

View File

@ -255,7 +255,7 @@ class Query {
/// @brief return the transaction, if prepared
TEST_VIRTUAL inline transaction::Methods* trx() const { return _trx.get(); }
/// @brief get the plan for the query
ExecutionPlan* plan() const { return _plan.get(); }

View File

@ -27,14 +27,20 @@
#include "Aql/ExecutorInfos.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/Query.h"
#include "Aql/WakeupQueryCallback.h"
#include "Basics/MutexLocker.h"
#include "Basics/RecursiveLocker.h"
#include "Basics/StringBuffer.h"
#include "Basics/VelocyPackHelper.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ServerState.h"
#include "Logger/LogMacros.h"
#include "Network/ConnectionPool.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "Rest/CommonDefines.h"
#include "VocBase/vocbase.h"
#include <fuerte/connection.h>
#include <fuerte/requests.h>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
@ -44,8 +50,10 @@ using namespace arangodb::aql;
using arangodb::basics::VelocyPackHelper;
namespace {
/// @brief timeout
double const ExecutionBlockImpl<RemoteExecutor>::defaultTimeOut = 3600.0;
constexpr std::chrono::seconds kDefaultTimeOutSecs(3600);
} // namespace
ExecutionBlockImpl<RemoteExecutor>::ExecutionBlockImpl(
ExecutionEngine* engine, RemoteNode const* node, ExecutorInfos&& infos,
@ -57,9 +65,8 @@ ExecutionBlockImpl<RemoteExecutor>::ExecutionBlockImpl(
_ownName(ownName),
_queryId(queryId),
_isResponsibleForInitializeCursor(node->isResponsibleForInitializeCursor()),
_lastResponse(nullptr),
_lastError(TRI_ERROR_NO_ERROR),
_lastTicketId(0),
_lastTicket(0),
_hasTriggeredShutdown(false) {
TRI_ASSERT(!queryId.empty());
TRI_ASSERT((arangodb::ServerState::instance()->isCoordinator() && ownName.empty()) ||
@ -101,13 +108,13 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
TRI_ASSERT(_lastError.ok());
// We do not have an error but a result, all is good
// We have an open result still.
std::shared_ptr<VPackBuilder> responseBodyBuilder = stealResultBody();
auto response = std::move(_lastResponse);
// Result is the response which will be a serialized AqlItemBlock
// both must be reset before return or throw
TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr);
VPackSlice responseBody = responseBodyBuilder->slice();
VPackSlice responseBody = response->slice();
ExecutionState state = ExecutionState::HASMORE;
if (VelocyPackHelper::getBooleanValue(responseBody, "done", true)) {
@ -123,16 +130,20 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
}
// We need to send a request here
VPackBuilder builder;
builder.openObject();
builder.add("atMost", VPackValue(atMost));
builder.close();
VPackBuffer<uint8_t> buffer;
{
VPackBuilder builder(buffer);
builder.openObject();
builder.add("atMost", VPackValue(atMost));
builder.close();
traceGetSomeRequest(builder.slice(), atMost);
}
auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/getSome/",
std::move(buffer));
auto bodyString = std::make_shared<std::string const>(builder.slice().toJson());
traceGetSomeRequest(bodyString, atMost);
auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/getSome/", bodyString);
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
@ -160,12 +171,12 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWi
// We have an open result still.
// Result is the response which will be a serialized AqlItemBlock
std::shared_ptr<VPackBuilder> responseBodyBuilder = stealResultBody();
auto response = std::move(_lastResponse);
// both must be reset before return or throw
TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr);
VPackSlice slice = responseBodyBuilder->slice();
VPackSlice slice = response->slice();
if (!slice.hasKey(StaticStrings::Error) || slice.get(StaticStrings::Error).getBoolean()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION);
@ -190,16 +201,17 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWi
// For every call we simply forward via HTTP
VPackBuilder builder;
builder.openObject();
builder.add("atMost", VPackValue(atMost));
builder.close();
VPackBuffer<uint8_t> buffer;
{
VPackBuilder builder(buffer);
builder.openObject(/*unindexed*/ true);
builder.add("atMost", VPackValue(atMost));
builder.close();
traceSkipSomeRequest(builder.slice(), atMost);
}
auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/skipSome/",
std::move(buffer));
auto bodyString = std::make_shared<std::string const>(builder.slice().toJson());
traceSkipSomeRequest(bodyString, atMost);
auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/skipSome/", bodyString);
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
@ -224,10 +236,10 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
if (_lastResponse != nullptr || _lastError.fail()) {
// We have an open result still.
std::shared_ptr<VPackBuilder> responseBodyBuilder = stealResultBody();
auto response = std::move(_lastResponse);
// Result is the response which is an object containing the ErrorCode
VPackSlice slice = responseBodyBuilder->slice();
VPackSlice slice = response->slice();
if (slice.hasKey("code")) {
return {ExecutionState::DONE, slice.get("code").getNumericValue<int>()};
}
@ -238,8 +250,9 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true;
VPackBuilder builder(&options);
builder.openObject();
VPackBuffer<uint8_t> buffer;
VPackBuilder builder(buffer, &options);
builder.openObject(/*unindexed*/ true);
// Backwards Compatibility 3.3
// NOTE: Removing this breaks tests in current devel - is this really for
@ -252,16 +265,14 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
// Now only the one output row is send.
builder.add("pos", VPackValue(0));
builder.add(VPackValue("items"));
builder.openObject();
builder.openObject(/*unindexed*/ true);
input.toVelocyPack(_engine->getQuery()->trx(), builder);
builder.close();
builder.close();
auto bodyString = std::make_shared<std::string const>(builder.slice().toJson());
auto res = sendAsyncRequest(rest::RequestType::PUT,
"/_api/aql/initializeCursor/", bodyString);
auto res = sendAsyncRequest(fuerte::RestVerb::Put,
"/_api/aql/initializeCursor/", std::move(buffer));
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
@ -277,20 +288,8 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
}
if (!_hasTriggeredShutdown) {
// Make sure to cover against the race that the request
// in flight is not overtaking in the drop phase here.
// After this lock is released even a response
// will be discarded in the handle response code
MUTEX_LOCKER(locker, _communicationMutex);
if (_lastTicketId != 0) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr only happens on controlled shutdown
return {ExecutionState::DONE, TRI_ERROR_SHUTTING_DOWN};
}
cc->drop(0, _lastTicketId, "");
}
_lastTicketId = 0;
std::lock_guard<std::mutex> guard(_communicationMutex);
_lastTicket = 0;
_lastError.reset(TRI_ERROR_NO_ERROR);
_lastResponse.reset();
_hasTriggeredShutdown = true;
@ -318,12 +317,12 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
if (_lastResponse != nullptr) {
TRI_ASSERT(_lastError.ok());
std::shared_ptr<VPackBuilder> responseBodyBuilder = stealResultBody();
auto response = std::move(_lastResponse);
// both must be reset before return or throw
TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr);
VPackSlice slice = responseBodyBuilder->slice();
VPackSlice slice = response->slice();
if (slice.isObject()) {
if (slice.hasKey("stats")) {
ExecutionStats newStats(slice.get("stats"));
@ -357,187 +356,127 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
_didSendShutdownRequest = true;
#endif
// For every call we simply forward via HTTP
VPackBuilder bodyBuilder;
bodyBuilder.openObject();
bodyBuilder.add("code", VPackValue(errorCode));
bodyBuilder.close();
VPackBuffer<uint8_t> buffer;
VPackBuilder builder(buffer);
builder.openObject(/*unindexed*/ true);
builder.add("code", VPackValue(errorCode));
builder.close();
auto bodyString = std::make_shared<std::string const>(bodyBuilder.slice().toJson());
auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/shutdown/", bodyString);
auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/shutdown/",
std::move(buffer));
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
}
Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(
arangodb::rest::RequestType type, std::string const& urlPart,
std::shared_ptr<std::string const> body) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
namespace {
Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
fuerte::Response* response) {
TRI_ASSERT(err != fuerte::Error::NoError || response->statusCode() >= 400);
std::string msg;
if (spec.shardId.empty()) {
msg.append("Error message received from cluster node '")
.append(spec.serverId)
.append("': ");
} else {
msg.append("Error message received from shard '")
.append(spec.shardId)
.append("' on cluster node '")
.append(spec.serverId)
.append("': ");
}
int res = TRI_ERROR_INTERNAL;
if (err != fuerte::Error::NoError) {
res = network::fuerteToArangoErrorCode(err);
msg.append(TRI_errno_string(res));
} else {
VPackSlice slice = response->slice();
if (slice.isObject()) {
VPackSlice err = slice.get(StaticStrings::Error);
if (err.isBool() && err.getBool()) {
res = VelocyPackHelper::readNumericValue(slice, StaticStrings::ErrorNum, res);
VPackStringRef ref =
VelocyPackHelper::getStringRef(slice, StaticStrings::ErrorMessage,
"(no valid error in response)");
msg.append(ref.data(), ref.size());
}
}
}
return Result(res, std::move(msg));
}
} // namespace
Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb type,
std::string const& urlPart,
VPackBuffer<uint8_t> body) {
NetworkFeature const& nf = _engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
if (!pool) {
// nullptr only happens on controlled shutdown
return {TRI_ERROR_SHUTTING_DOWN};
}
// Later, we probably want to set these sensibly:
CoordTransactionID const coordTransactionId = TRI_NewTickServer();
std::unordered_map<std::string, std::string> headers;
if (!_ownName.empty()) {
headers.emplace("Shard-Id", _ownName);
}
std::string url = std::string("/_db/") +
arangodb::basics::StringUtils::urlEncode(
_engine->getQuery()->trx()->vocbase().name()) +
_engine->getQuery()->vocbase().name()) +
urlPart + _queryId;
arangodb::network::EndpointSpec spec;
int res = network::resolveDestination(nf, _server, spec);
if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?!
return Result(res);
}
TRI_ASSERT(!spec.endpoint.empty());
auto req = fuerte::createRequest(type, url, {}, std::move(body));
// Later, we probably want to set these sensibly:
req->timeout(kDefaultTimeOutSecs);
if (!_ownName.empty()) {
req->header.addMeta("Shard-Id", _ownName);
}
network::ConnectionPool::Ref ref = pool->leaseConnection(spec.endpoint);
std::lock_guard<std::mutex> guard(_communicationMutex);
unsigned ticket = ++_lastTicket;
std::shared_ptr<fuerte::Connection> conn = ref.connection();
conn->sendRequest(std::move(req),
[=, ref(std::move(ref))](fuerte::Error err,
std::unique_ptr<fuerte::Request>,
std::unique_ptr<fuerte::Response> res) {
_query.sharedState()->execute([&] { // notifies outside
std::lock_guard<std::mutex> guard(_communicationMutex);
if (_lastTicket == ticket) {
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
_lastError = handleErrorResponse(spec, err, res.get());
} else {
_lastResponse = std::move(res);
}
}
});
});
++_engine->_stats.requests;
std::shared_ptr<ClusterCommCallback> callback =
std::make_shared<WakeupQueryCallback>(this, _engine->getQuery());
// Make sure to cover against the race that this
// Request is fullfilled before the register has taken place
// @note the only reason for not using recursive mutext always is due to the
// concern that there might be recursive calls in production
#ifdef ARANGODB_USE_GOOGLE_TESTS
RECURSIVE_MUTEX_LOCKER(_communicationMutex, _communicationMutexOwner);
#else
MUTEX_LOCKER(locker, _communicationMutex);
#endif
// We can only track one request at a time.
// So assert there is no other request in flight!
TRI_ASSERT(_lastTicketId == 0);
_lastTicketId =
cc->asyncRequest(coordTransactionId, _server, type, url, std::move(body),
headers, callback, defaultTimeOut, true);
return {TRI_ERROR_NO_ERROR};
}
bool ExecutionBlockImpl<RemoteExecutor>::handleAsyncResult(ClusterCommResult* result) {
// So we cannot have the response being produced while sending the request.
// Make sure to cover against the race that this
// Request is fullfilled before the register has taken place
// @note the only reason for not using recursive mutext always is due to the
// concern that there might be recursive calls in production
#ifdef ARANGODB_USE_GOOGLE_TESTS
RECURSIVE_MUTEX_LOCKER(_communicationMutex, _communicationMutexOwner);
#else
MUTEX_LOCKER(locker, _communicationMutex);
#endif
if (_lastTicketId == result->operationID) {
// TODO Handle exceptions thrown while we are in this code
// Query will not be woken up again.
_lastError = handleCommErrors(result);
if (_lastError.ok()) {
_lastResponse = result->result;
}
_lastTicketId = 0;
}
return true;
}
arangodb::Result ExecutionBlockImpl<RemoteExecutor>::handleCommErrors(ClusterCommResult* res) const {
if (res->status == CL_COMM_TIMEOUT || res->status == CL_COMM_BACKEND_UNAVAILABLE) {
return {res->getErrorCode(), res->stringifyErrorMessage()};
}
if (res->status == CL_COMM_ERROR) {
std::string errorMessage;
auto const& shardID = res->shardID;
if (shardID.empty()) {
errorMessage = std::string("Error message received from cluster node '") +
std::string(res->serverID) + std::string("': ");
} else {
errorMessage = std::string("Error message received from shard '") +
std::string(shardID) + std::string("' on cluster node '") +
std::string(res->serverID) + std::string("': ");
}
int errorNum = TRI_ERROR_INTERNAL;
if (res->result != nullptr) {
errorNum = TRI_ERROR_NO_ERROR;
arangodb::basics::StringBuffer const& responseBodyBuf(res->result->getBody());
std::shared_ptr<VPackBuilder> builder =
VPackParser::fromJson(responseBodyBuf.c_str(), responseBodyBuf.length());
VPackSlice slice = builder->slice();
if (!slice.hasKey(StaticStrings::Error) ||
slice.get(StaticStrings::Error).getBoolean()) {
errorNum = TRI_ERROR_INTERNAL;
}
if (slice.isObject()) {
VPackSlice v = slice.get(StaticStrings::ErrorNum);
if (v.isNumber()) {
if (v.getNumericValue<int>() != TRI_ERROR_NO_ERROR) {
/* if we've got an error num, error has to be true. */
TRI_ASSERT(errorNum == TRI_ERROR_INTERNAL);
errorNum = v.getNumericValue<int>();
}
}
v = slice.get(StaticStrings::ErrorMessage);
if (v.isString()) {
errorMessage += v.copyString();
} else {
errorMessage += std::string("(no valid error in response)");
}
}
}
// In this case a proper HTTP error was reported by the DBserver,
if (errorNum > 0 && !errorMessage.empty()) {
return {errorNum, errorMessage};
}
// default error
return {TRI_ERROR_CLUSTER_AQL_COMMUNICATION};
}
TRI_ASSERT(res->status == CL_COMM_SENT);
return {TRI_ERROR_NO_ERROR};
}
/**
* @brief Steal the last returned body. Will throw an error if
* there has been an error of any kind, e.g. communication
* or error created by remote server.
* Will reset the lastResponse, so after this call we are
* ready to send a new request.
*
* @return A shared_ptr containing the remote response.
*/
std::shared_ptr<VPackBuilder> ExecutionBlockImpl<RemoteExecutor>::stealResultBody() {
// NOTE: This cannot participate in the race in communication.
// This will not be called after the MUTEX to send was released.
// It can only be called by the next getSome call.
// This getSome however is locked by the QueryRegistery several layers above
if (!_lastError.ok()) {
THROW_ARANGO_EXCEPTION(_lastError);
}
// We have an open result still.
// Result is the response which is an object containing the ErrorCode
std::shared_ptr<VPackBuilder> responseBodyBuilder = _lastResponse->getBodyVelocyPack();
_lastResponse.reset();
return responseBodyBuilder;
}
void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest(
std::shared_ptr<const std::string> const& body, size_t const atMost) {
traceRequest("getSome", body, atMost);
VPackSlice slice, size_t const atMost) {
traceRequest("getSome", slice, atMost);
}
void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest(
std::shared_ptr<const std::string> const& body, size_t const atMost) {
traceRequest("skipSome", body, atMost);
VPackSlice slice, size_t const atMost) {
traceRequest("skipSome", slice, atMost);
}
void ExecutionBlockImpl<RemoteExecutor>::traceRequest(
const char* rpc, std::shared_ptr<const std::string> const& sharedPtr, size_t atMost) {
const char* rpc, VPackSlice slice, size_t atMost) {
if (_profile >= PROFILE_LEVEL_TRACE_1) {
auto const queryId = this->_engine->getQuery()->id();
auto const remoteQueryId = _queryId;

View File

@ -24,12 +24,13 @@
#define ARANGOD_AQL_REMOTE_EXECUTOR_H
#include "Aql/ClusterNodes.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/ExecutorInfos.h"
#include "Basics/Mutex.h"
#include "Cluster/ClusterComm.h"
#include "Rest/CommonDefines.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "Aql/ExecutionBlockImpl.h"
#include <mutex>
#include <fuerte/message.h>
#include <fuerte/types.h>
namespace arangodb {
namespace aql {
@ -61,9 +62,6 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
std::pair<ExecutionState, Result> shutdown(int errorCode) override;
/// @brief handleAsyncResult
bool handleAsyncResult(ClusterCommResult* result) override;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
// only for asserts:
public:
@ -81,27 +79,12 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
Query const& getQuery() const { return _query; }
/**
* @brief Handle communication errors in Async case.
*
* @param result The network response we got from cluster comm.
*
* @return A wrapped Result Object, that is either ok() or contains
* the error information to be thrown in get/skip some.
*/
arangodb::Result handleCommErrors(ClusterCommResult* result) const;
/// @brief internal method to send a request. Will register a callback to be
/// reactivated
arangodb::Result sendAsyncRequest(rest::RequestType type, std::string const& urlPart,
std::shared_ptr<std::string const> body);
std::shared_ptr<velocypack::Builder> stealResultBody();
arangodb::Result sendAsyncRequest(fuerte::RestVerb type, std::string const& urlPart,
velocypack::Buffer<uint8_t> body);
private:
/// @brief timeout
static double const defaultTimeOut;
ExecutorInfos _infos;
Query const& _query;
@ -122,29 +105,24 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
/// @brief the last unprocessed result. Make sure to reset it
/// after it is processed.
std::shared_ptr<httpclient::SimpleHttpResult> _lastResponse;
std::unique_ptr<fuerte::Response> _lastResponse;
/// @brief the last remote response Result object, may contain an error.
arangodb::Result _lastError;
/// @brief Mutex to cover against the race, that a getSome request
/// is responded before the ticket id is registered.
arangodb::Mutex _communicationMutex;
#ifdef ARANGODB_USE_GOOGLE_TESTS
std::atomic<std::thread::id> _communicationMutexOwner; // current thread owning '_communicationMutex' lock (workaround for non-recusrive MutexLocker)
#endif
OperationID _lastTicketId;
std::mutex _communicationMutex;
unsigned _lastTicket; /// used to check for canceled requests
bool _hasTriggeredShutdown;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
bool _didSendShutdownRequest = false;
#endif
void traceGetSomeRequest(std::shared_ptr<const std::string> const& sharedPtr, size_t atMost);
void traceSkipSomeRequest(std::shared_ptr<const std::string> const& body, size_t atMost);
void traceRequest(const char* rpc, std::shared_ptr<const std::string> const& sharedPtr, size_t atMost);
void traceGetSomeRequest(velocypack::Slice slice, size_t atMost);
void traceSkipSomeRequest(velocypack::Slice slice, size_t atMost);
void traceRequest(const char* rpc, velocypack::Slice slice, size_t atMost);
};
} // namespace aql

View File

@ -63,7 +63,7 @@ class SharedQueryState {
return false;
}
bool res = std::forward<F>(cb)();
std::forward<F>(cb)();
if (_hasHandler) {
if (ADB_UNLIKELY(!executeContinueCallback())) {
return false; // likely shutting down
@ -73,8 +73,7 @@ class SharedQueryState {
// simon: bad experience on macOS guard.unloack();
_condition.notify_one();
}
return res;
return true;
}
/// this has to stay for a backwards-compatible AQL HTTP API (hasMore).

View File

@ -30,11 +30,11 @@
using namespace arangodb;
using namespace arangodb::aql;
template<bool isModificationSubquery>
template <bool isModificationSubquery>
constexpr bool SubqueryExecutor<isModificationSubquery>::Properties::preservesOrder;
template<bool isModificationSubquery>
template <bool isModificationSubquery>
constexpr bool SubqueryExecutor<isModificationSubquery>::Properties::allowsBlockPassthrough;
template<bool isModificationSubquery>
template <bool isModificationSubquery>
constexpr bool SubqueryExecutor<isModificationSubquery>::Properties::inputSizeRestrictsOutputSize;
SubqueryExecutorInfos::SubqueryExecutorInfos(

View File

@ -1,43 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018-2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "WakeupQueryCallback.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/Query.h"
using namespace arangodb;
using namespace arangodb::aql;
WakeupQueryCallback::WakeupQueryCallback(ExecutionBlock* initiator, Query* query)
: _initiator(initiator), _query(query), _sharedState(query->sharedState()) {}
WakeupQueryCallback::~WakeupQueryCallback() {}
bool WakeupQueryCallback::operator()(ClusterCommResult* result) {
return _sharedState->execute([result, this]() {
TRI_ASSERT(_initiator != nullptr);
TRI_ASSERT(_query != nullptr);
// TODO Validate that _initiator and _query have not been deleted (ttl)
// TODO Handle exceptions
return _initiator->handleAsyncResult(result);
});
}

View File

@ -1,51 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018-2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_WAKEUP_QUERY_CALLBACK_H
#define ARANGOD_AQL_WAKEUP_QUERY_CALLBACK_H 1
#include "Basics/Common.h"
#include "Cluster/ClusterComm.h"
namespace arangodb {
namespace aql {
class ExecutionBlock;
class Query;
class SharedQueryState;
struct WakeupQueryCallback : public ClusterCommCallback {
WakeupQueryCallback(ExecutionBlock* initiator, Query* query);
~WakeupQueryCallback();
bool operator()(ClusterCommResult*) override;
private:
ExecutionBlock* _initiator;
Query* _query;
std::shared_ptr<SharedQueryState> _sharedState;
};
} // namespace aql
} // namespace arangodb
#endif

View File

@ -342,7 +342,6 @@ set(LIB_ARANGO_AQL_SOURCES
Aql/V8Executor.cpp
Aql/Variable.cpp
Aql/VariableGenerator.cpp
Aql/WakeupQueryCallback.cpp
Aql/grammar.cpp
Aql/tokens.cpp
)
@ -504,6 +503,14 @@ set(LIB_ARANGO_V8SERVER_SOURCES
V8Server/v8-vocindex.cpp
)
set(LIB_ARANGO_NETWORK_SOURCES
Network/ClusterUtils.cpp
Network/ConnectionPool.cpp
Network/Methods.cpp
Network/NetworkFeature.cpp
Network/Utils.cpp
)
set(LIB_ARANGOSERVER_SOURCES
Actions/ActionFeature.cpp
Actions/RestActionHandler.cpp
@ -578,10 +585,6 @@ set(LIB_ARANGOSERVER_SOURCES
GeneralServer/SslServerFeature.cpp
GeneralServer/Task.cpp
GeneralServer/VstCommTask.cpp
Network/ConnectionPool.cpp
Network/Methods.cpp
Network/NetworkFeature.cpp
Network/Utils.cpp
RestHandler/RestAdminDatabaseHandler.cpp
RestHandler/RestAdminExecuteHandler.cpp
RestHandler/RestAdminLogHandler.cpp
@ -755,6 +758,10 @@ add_library(arangoserver STATIC
${ProductVersionFiles}
)
add_library(arango_network STATIC
${LIB_ARANGO_NETWORK_SOURCES}
)
add_library(arango_mmfiles STATIC
${MMFILES_SOURCES}
)
@ -803,6 +810,7 @@ target_link_libraries(arango_aql arango_geo)
target_link_libraries(arango_aql arango_graph)
target_link_libraries(arango_aql arango_indexes)
target_link_libraries(arango_aql arango_iresearch)
target_link_libraries(arango_aql arango_network)
target_link_libraries(arango_cache arango)
target_link_libraries(arango_cache boost_system)
@ -811,6 +819,7 @@ target_link_libraries(arango_cluster_engine arango_indexes)
target_link_libraries(arango_cluster_engine boost_boost)
target_link_libraries(arango_cluster_methods arango)
target_link_libraries(arango_cluster_methods arango_network)
target_link_libraries(arango_common_rest_handler arango_cluster_methods)
target_link_libraries(arango_common_rest_handler arango_utils)
@ -833,6 +842,10 @@ target_link_libraries(arango_mmfiles boost_boost)
target_link_libraries(arango_mmfiles boost_system)
target_link_libraries(arango_mmfiles llhttp)
target_link_libraries(arango_network boost_boost)
target_link_libraries(arango_network fuerte)
target_link_libraries(arango_network llhttp)
target_link_libraries(arango_pregel arango_agency)
target_link_libraries(arango_pregel boost_boost)
target_link_libraries(arango_pregel boost_system)
@ -875,6 +888,7 @@ target_link_libraries(arangoserver arango_geo)
target_link_libraries(arangoserver arango_graph)
target_link_libraries(arangoserver arango_indexes)
target_link_libraries(arangoserver arango_iresearch)
target_link_libraries(arangoserver arango_network)
target_link_libraries(arangoserver arango_pregel)
target_link_libraries(arangoserver arango_replication)
target_link_libraries(arangoserver arango_storage_engine)
@ -882,8 +896,6 @@ target_link_libraries(arangoserver arango_utils)
target_link_libraries(arangoserver arango_v8server)
target_link_libraries(arangoserver arango_vocbase)
target_link_libraries(arangoserver boost_boost)
target_link_libraries(arangoserver fuerte)
target_link_libraries(arangoserver llhttp)
target_link_libraries(arangoserver
${LINENOISE_LIBS} # Is this ever anything but empty?

View File

@ -42,6 +42,7 @@
#include "Cluster/ClusterTrxMethods.h"
#include "Futures/Utilities.h"
#include "Graph/Traverser.h"
#include "Network/ClusterUtils.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
@ -245,51 +246,6 @@ void addTransactionHeaderForShard(transaction::Methods const& trx, ShardMap cons
}
}
/// @brief Collect the results from all shards (fastpath variant)
/// All result bodies are stored in resultMap
template <typename T>
static void collectResponsesFromAllShards(
std::map<ShardID, std::vector<T>> const& shardMap,
std::vector<futures::Try<arangodb::network::Response>>& responses,
std::unordered_map<int, size_t>& errorCounter,
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>>& resultMap,
fuerte::StatusCode& code) {
// If none of the shards responds we return a SERVER_ERROR;
code = fuerte::StatusInternalError;
for (Try<arangodb::network::Response> const& tryRes : responses) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
auto tmpBuilder = std::make_shared<VPackBuilder>();
// If there was no answer whatsoever, we cannot rely on the shardId
// being present in the result struct:
auto weSend = shardMap.find(sId);
TRI_ASSERT(weSend != shardMap.end()); // We send sth there earlier.
size_t count = weSend->second.size();
for (size_t i = 0; i < count; ++i) {
tmpBuilder->openObject();
tmpBuilder->add(StaticStrings::Error, VPackValue(true));
tmpBuilder->add(StaticStrings::ErrorNum, VPackValue(commError));
tmpBuilder->close();
}
resultMap.emplace(sId, std::move(tmpBuilder));
} else {
std::vector<VPackSlice> const& slices = res.response->slices();
auto tmpBuilder = std::make_shared<VPackBuilder>();
if (!slices.empty()) {
tmpBuilder->add(slices[0]);
}
resultMap.emplace(sId, std::move(tmpBuilder));
network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true);
code = res.response->statusCode();
}
}
}
/// @brief iterate over shard responses and compile a result
/// This will take care of checking the fuerte responses. If the response has
/// a body, then the callback will be called on the body, with access to the
@ -331,47 +287,7 @@ OperationResult handleResponsesFromAllShards(
post(result, builder);
return OperationResult(result, builder.steal());
}
} // namespace
namespace arangodb {
////////////////////////////////////////////////////////////////////////////////
/// @brief merge the baby-object results.
/// The shard map contains the ordering of elements, the vector in this
/// Map is expected to be sorted from front to back.
/// The second map contains the answers for each shard.
/// The builder in the third parameter will be cleared and will contain
/// the resulting array. It is guaranteed that the resulting array
/// indexes
/// are equal to the original request ordering before it was destructured
/// for babies.
////////////////////////////////////////////////////////////////////////////////
static void mergeResults(std::vector<std::pair<ShardID, VPackValueLength>> const& reverseMapping,
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> const& resultMap,
VPackBuilder& resultBody) {
resultBody.clear();
resultBody.openArray();
for (auto const& pair : reverseMapping) {
VPackSlice arr = resultMap.find(pair.first)->second->slice();
if (arr.isObject() && arr.hasKey(StaticStrings::Error) &&
arr.get(StaticStrings::Error).isBoolean() &&
arr.get(StaticStrings::Error).getBoolean()) {
// an error occurred, now rethrow the error
int res = arr.get(StaticStrings::ErrorNum).getNumericValue<int>();
VPackSlice msg = arr.get(StaticStrings::ErrorMessage);
if (msg.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString());
} else {
THROW_ARANGO_EXCEPTION(res);
}
}
resultBody.add(arr.at(pair.second));
}
resultBody.close();
}
namespace {
// velocypack representation of object
// {"error":true,"errorMessage":"document not found","errorNum":1202}
static const char* notFoundSlice =
@ -437,12 +353,88 @@ void mergeResultsAllShards(std::vector<VPackSlice> const& results, VPackBuilder&
}
}
/// @brief handle CRUD api shard responses, slow path
template <typename F, typename CT>
OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx,
std::vector<Try<network::Response>> const& results) {
std::map<ShardID, VPackSlice> resultMap;
std::map<ShardID, int> shardError;
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code = fuerte::StatusInternalError;
// If none of the shards responded we return a SERVER_ERROR;
for (Try<arangodb::network::Response> const& tryRes : results) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
shardError.emplace(sId, commError);
} else {
resultMap.emplace(sId, res.response->slice());
network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true);
code = res.response->statusCode();
}
}
// merge the baby-object results. reverseMapping contains
// the ordering of elements, the vector in this
// map is expected to be sorted from front to back.
// resultMap contains the answers for each shard.
// It is guaranteed that the resulting array indexes are
// equal to the original request ordering before it was destructured
VPackBuilder resultBody;
resultBody.openArray();
for (auto const& pair : opCtx.reverseMapping) {
ShardID const& sId = pair.first;
auto const& it = resultMap.find(sId);
if (it == resultMap.end()) { // no answer from this shard
auto const& it2 = shardError.find(sId);
TRI_ASSERT(it2 != shardError.end());
auto weSend = opCtx.shardMap.find(sId);
TRI_ASSERT(weSend != opCtx.shardMap.end()); // We send sth there earlier.
size_t count = weSend->second.size();
for (size_t i = 0; i < count; ++i) {
resultBody.openObject(/*unindexed*/ true);
resultBody.add(StaticStrings::Error, VPackValue(true));
resultBody.add(StaticStrings::ErrorNum, VPackValue(it2->second));
resultBody.close();
}
} else {
VPackSlice arr = it->second;
// we expect an array of baby-documents, but the response might
// also be an error, if the DBServer threw a hissy fit
if (arr.isObject() && arr.hasKey(StaticStrings::Error) &&
arr.get(StaticStrings::Error).isBoolean() &&
arr.get(StaticStrings::Error).getBoolean()) {
// an error occurred, now rethrow the error
int res = arr.get(StaticStrings::ErrorNum).getNumericValue<int>();
VPackSlice msg = arr.get(StaticStrings::ErrorMessage);
if (msg.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString());
} else {
THROW_ARANGO_EXCEPTION(res);
}
}
resultBody.add(arr.at(pair.second));
}
}
resultBody.close();
return std::forward<F>(func)(code, resultBody.steal(),
std::move(opCtx.options), std::move(errorCounter));
}
/// @brief handle CRUD api shard responses, slow path
template <typename F>
OperationResult handleCRUDShardResponsesSlow(F&& func, size_t expectedLen, OperationOptions options,
std::vector<Try<network::Response>> const& responses) {
std::shared_ptr<VPackBuffer<uint8_t>> buffer;
if (expectedLen == 0) { // Only one can answer, we react a bit differently
std::shared_ptr<VPackBuffer<uint8_t>> buffer;
int nrok = 0;
int commError = TRI_ERROR_NO_ERROR;
@ -767,6 +759,8 @@ static std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>
return result;
}
namespace arangodb {
/// @brief convert ClusterComm error into arango error code
int handleGeneralCommErrors(arangodb::ClusterCommResult const* res) {
// This function creates an error code from a ClusterCommResult,
@ -1270,7 +1264,7 @@ int selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string const
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates one or many documents in a coordinator
///
@ -1381,24 +1375,15 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
}
return network::clusterResultInsert(res.response->statusCode(),
res.response->copyPayload(), options, {});
res.response->stealPayload(), options, {});
};
return std::move(futures[0]).thenValue(cb);
}
return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code;
collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code);
TRI_ASSERT(resultMap.size() == results.size());
VPackBuilder resultBody;
mergeResults(opCtx.reverseMapping, resultMap, resultBody);
return network::clusterResultInsert(code, resultBody.steal(), std::move(opCtx.options), errorCounter);
});
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>> results) -> OperationResult {
return handleCRUDShardResponsesFast(network::clusterResultInsert, opCtx, results);
});
});
}
@ -1500,23 +1485,11 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
};
return std::move(futures[0]).thenValue(cb);
}
return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code;
collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code);
TRI_ASSERT(resultMap.size() == results.size());
// the cluster operation was OK, however,
// the DBserver could have reported an error.
VPackBuilder resultBody;
mergeResults(opCtx.reverseMapping, resultMap, resultBody);
return network::clusterResultDelete(code, resultBody.steal(),
opCtx.options, errorCounter);
});
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleCRUDShardResponsesFast(network::clusterResultDelete, opCtx, results);
});
});
}
@ -1766,20 +1739,11 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
res.response->stealPayload(), options, {});
});
}
return futures::collectAll(std::move(futures)).thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code;
collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code);
TRI_ASSERT(resultMap.size() == results.size());
VPackBuilder resultBody;
mergeResults(opCtx.reverseMapping, resultMap, resultBody);
return network::clusterResultDocument(fuerte::StatusOK, resultBody.steal(),
std::move(opCtx.options), errorCounter);
});
return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>> results) {
return handleCRUDShardResponsesFast(network::clusterResultDocument, opCtx, results);
});
});
}
@ -2427,23 +2391,11 @@ Future<OperationResult> modifyDocumentOnCoordinator(
};
return std::move(futures[0]).thenValue(cb);
}
return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code;
collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code);
TRI_ASSERT(resultMap.size() == results.size());
// the cluster operation was OK, however,
// the DBserver could have reported an error.
VPackBuilder resultBody;
mergeResults(opCtx.reverseMapping, resultMap, resultBody);
return network::clusterResultModify(code, resultBody.steal(),
opCtx.options, errorCounter);
});
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleCRUDShardResponsesFast(network::clusterResultModify, opCtx, results);
});
});
}
@ -3514,6 +3466,7 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const
// We keep the currently registered timestamps in Current/ServersRegistered,
// such that we can wait until all have reregistered and are up:
ci.loadCurrentDBServers();
auto const preServersKnown = ci.rebootIds();
@ -3523,7 +3476,14 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const
if (!result.ok()) { // This is disaster!
return result;
}
// no need to keep connections to shut-down servers
auto const& nf = feature.server().getFeature<NetworkFeature>();
auto* pool = nf.pool();
if (pool) {
pool->drainConnections();
}
auto startTime = std::chrono::steady_clock::now();
while (true) { // will be left by a timeout
std::this_thread::sleep_for(std::chrono::seconds(1));
@ -3557,7 +3517,7 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const
break;
}
}
{
VPackObjectBuilder o(&report);
report.add("previous", VPackValue(previous));

View File

@ -0,0 +1,131 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "ClusterUtils.h"
#include "Network/ConnectionPool.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "Logger/LogMacros.h"
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace network {
/// @brief Create Cluster Communication result for insert
OperationResult clusterResultInsert(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusAccepted:
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
case fuerte::StatusCreated: {
options.waitForSync = true; // wait for sync is abused herea
// operationResult should get a return code.
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusPreconditionFailed:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_CONFLICT);
case fuerte::StatusBadRequest:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
case fuerte::StatusConflict:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED);
default:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
/// @brief Create Cluster Communication result for document
OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusOK:
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
case fuerte::StatusPreconditionFailed:
return OperationResult(Result(TRI_ERROR_ARANGO_CONFLICT), std::move(body),
std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
/// @brief Create Cluster Communication result for modify
OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusAccepted:
case fuerte::StatusCreated: {
options.waitForSync = (code == fuerte::StatusCreated);
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusConflict:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED),
body, std::move(options), errorCounter);
case fuerte::StatusPreconditionFailed:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
body, std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default: {
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
}
/// @brief Create Cluster Communication result for delete
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusOK:
case fuerte::StatusAccepted:
case fuerte::StatusCreated: {
OperationOptions options;
options.waitForSync = (code != fuerte::StatusAccepted);
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusPreconditionFailed:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
body, std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default: {
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
}
} // namespace network
} // namespace arangodb

View File

@ -0,0 +1,58 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#pragma once
#ifndef ARANGOD_NETWORK_CLUSTER_UTILS_H
#define ARANGOD_NETWORK_CLUSTER_UTILS_H 1
#include "Utils/OperationOptions.h"
#include "Utils/OperationResult.h"
#include <fuerte/types.h>
#include <velocypack/Buffer.h>
#include <velocypack/Slice.h>
namespace arangodb {
namespace network {
/// @brief Create Cluster Communication result for insert
OperationResult clusterResultInsert(fuerte::StatusCode responsecode,
std::shared_ptr<velocypack::Buffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
} // namespace network
} // namespace arangodb
#endif

View File

@ -46,7 +46,7 @@ ConnectionPool::~ConnectionPool() { shutdown(); }
/// @brief request a connection for a specific endpoint
/// note: it is the callers responsibility to ensure the endpoint
/// is always the same, we do not do any post-processing
ConnectionPool::Ref ConnectionPool::leaseConnection(EndpointSpec const& str) {
ConnectionPool::Ref ConnectionPool::leaseConnection(std::string const& str) {
fuerte::ConnectionBuilder builder;
builder.endpoint(str);
builder.protocolType(_config.protocol); // always overwrite protocol
@ -69,8 +69,8 @@ ConnectionPool::Ref ConnectionPool::leaseConnection(EndpointSpec const& str) {
return selectConnection(*(it->second), builder);
}
/// @brief shutdown all connections
void ConnectionPool::shutdown() {
/// @brief drain all connections
void ConnectionPool::drainConnections() {
WRITE_LOCKER(guard, _lock);
for (auto& pair : _connections) {
ConnectionList& list = *(pair.second);
@ -79,6 +79,13 @@ void ConnectionPool::shutdown() {
c->fuerte->cancel();
}
}
}
/// @brief shutdown all connections
void ConnectionPool::shutdown() {
drainConnections();
WRITE_LOCKER(guard, _lock);
_connections.clear();
}
@ -161,15 +168,15 @@ void ConnectionPool::pruneConnections() {
}
/// @brief cancel connections to this endpoint
void ConnectionPool::cancelConnections(EndpointSpec const& str) {
void ConnectionPool::cancelConnections(std::string const& endpoint) {
fuerte::ConnectionBuilder builder;
builder.endpoint(str);
builder.endpoint(endpoint);
builder.protocolType(_config.protocol); // always overwrite protocol
std::string endpoint = builder.normalizedEndpoint();
std::string normalized = builder.normalizedEndpoint();
WRITE_LOCKER(guard, _lock);
auto const& it = _connections.find(endpoint);
auto const& it = _connections.find(normalized);
if (it != _connections.end()) {
// {
// ConnectionList& list = *(it->second);

View File

@ -84,11 +84,14 @@ class ConnectionPool final {
/// @brief request a connection for a specific endpoint
/// note: it is the callers responsibility to ensure the endpoint
/// is always the same, we do not do any post-processing
Ref leaseConnection(EndpointSpec const&);
Ref leaseConnection(std::string const& endpoint);
/// @brief event loop service to create a connection seperately
/// user is responsible for correctly shutting it down
fuerte::EventLoopService& eventLoopService() { return _loop; }
/// @brief shutdown all connections
void drainConnections();
/// @brief shutdown all connections
void shutdown();
@ -97,7 +100,7 @@ class ConnectionPool final {
void pruneConnections();
/// @brief cancel connections to this endpoint
void cancelConnections(EndpointSpec const&);
void cancelConnections(std::string const& endpoint);
/// @brief return the number of open connections
size_t numOpenConnections() const;

View File

@ -106,14 +106,13 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination,
Response{destination, Error::Canceled, nullptr});
}
arangodb::network::EndpointSpec endpoint;
int res = resolveDestination(feature, destination, endpoint);
arangodb::network::EndpointSpec spec;
int res = resolveDestination(feature, destination, spec);
if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?!
return futures::makeFuture(
Response{destination, Error::Canceled, nullptr});
}
TRI_ASSERT(!endpoint.empty());
TRI_ASSERT(!spec.endpoint.empty());
auto req = prepareRequest(type, path, std::move(payload), timeout, std::move(headers));
@ -126,7 +125,7 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination,
: destination(dest), ref(std::move(r)), promise() {}
};
static_assert(sizeof(std::shared_ptr<Pack>) <= 2*sizeof(void*), "does not fit in sfo");
auto p = std::make_shared<Pack>(destination, pool->leaseConnection(endpoint));
auto p = std::make_shared<Pack>(destination, pool->leaseConnection(spec.endpoint));
auto conn = p->ref.connection();
auto f = p->promise.getFuture();
@ -189,8 +188,9 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
return; // we are done
}
arangodb::network::EndpointSpec endpoint;
int res = resolveDestination(_feature, _destination, endpoint);
// actual server endpoint is always re-evaluated
arangodb::network::EndpointSpec spec;
int res = resolveDestination(_feature, _destination, spec);
if (res != TRI_ERROR_NO_ERROR) { // ClusterInfo did not work
callResponse(Error::Canceled, nullptr);
return;
@ -206,7 +206,7 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
auto localTO = std::chrono::duration_cast<std::chrono::milliseconds>(_endTime - now);
TRI_ASSERT(localTO.count() > 0);
auto ref = pool->leaseConnection(endpoint);
auto ref = pool->leaseConnection(spec.endpoint);
auto req = prepareRequest(_type, _path, _payload, localTO, _headers);
auto self = RequestsState::shared_from_this();
auto cb = [self, ref](fuerte::Error err,

View File

@ -148,16 +148,18 @@ void NetworkFeature::beginShutdown() {
_workItem.reset();
}
_poolPtr.store(nullptr, std::memory_order_release);
if (_pool) {
_pool->shutdown();
_pool.reset();
if (_pool) { // first cancel all connections
_pool->drainConnections();
}
}
void NetworkFeature::stop() {
// we might have posted another workItem during shutdown.
std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset();
{
// we might have posted another workItem during shutdown.
std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset();
}
_pool->drainConnections();
}
arangodb::network::ConnectionPool* NetworkFeature::pool() const {

View File

@ -38,42 +38,47 @@
namespace arangodb {
namespace network {
int resolveDestination(NetworkFeature& feature, DestinationId const& dest,
std::string& endpoint) {
using namespace arangodb;
if (dest.find("tcp://") == 0 || dest.find("ssl://") == 0) {
endpoint = dest;
return TRI_ERROR_NO_ERROR; // all good
}
int resolveDestination(NetworkFeature const& feature, DestinationId const& dest,
network::EndpointSpec& spec) {
// Now look up the actual endpoint:
if (!feature.server().hasFeature<ClusterFeature>()) {
return TRI_ERROR_SHUTTING_DOWN;
}
auto& ci = feature.server().getFeature<ClusterFeature>().clusterInfo();
return resolveDestination(ci, dest, spec);
}
int resolveDestination(ClusterInfo& ci, DestinationId const& dest,
network::EndpointSpec& spec) {
using namespace arangodb;
if (dest.find("tcp://") == 0 || dest.find("ssl://") == 0) {
spec.endpoint = dest;
return TRI_ERROR_NO_ERROR; // all good
}
// This sets result.shardId, result.serverId and result.endpoint,
// depending on what dest is. Note that if a shardID is given, the
// responsible server is looked up, if a serverID is given, the endpoint
// is looked up, both can fail and immediately lead to a CL_COMM_ERROR
// state.
ServerID serverID;
if (dest.compare(0, 6, "shard:", 6) == 0) {
ShardID shardID = dest.substr(6);
spec.shardId = dest.substr(6);
{
std::shared_ptr<std::vector<ServerID>> resp = ci.getResponsibleServer(shardID);
std::shared_ptr<std::vector<ServerID>> resp = ci.getResponsibleServer(spec.shardId);
if (!resp->empty()) {
serverID = (*resp)[0];
spec.serverId = (*resp)[0];
} else {
LOG_TOPIC("60ee8", ERR, Logger::CLUSTER)
<< "cannot find responsible server for shard '" << shardID << "'";
<< "cannot find responsible server for shard '" << spec.shardId << "'";
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
}
}
LOG_TOPIC("64670", DEBUG, Logger::CLUSTER) << "Responsible server: " << serverID;
LOG_TOPIC("64670", DEBUG, Logger::CLUSTER) << "Responsible server: " << spec.serverId;
} else if (dest.compare(0, 7, "server:", 7) == 0) {
serverID = dest.substr(7);
spec.serverId = dest.substr(7);
} else {
std::string errorMessage = "did not understand destination '" + dest + "'";
LOG_TOPIC("77a84", ERR, Logger::COMMUNICATION)
@ -81,15 +86,15 @@ int resolveDestination(NetworkFeature& feature, DestinationId const& dest,
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
}
endpoint = ci.getServerEndpoint(serverID);
if (endpoint.empty()) {
if (serverID.find(',') != std::string::npos) {
spec.endpoint = ci.getServerEndpoint(spec.serverId);
if (spec.endpoint.empty()) {
if (spec.serverId.find(',') != std::string::npos) {
TRI_ASSERT(false);
}
std::string errorMessage =
"did not find endpoint of server '" + serverID + "'";
"did not find endpoint of server '" + spec.serverId + "'";
LOG_TOPIC("f29ef", ERR, Logger::COMMUNICATION)
<< "did not find endpoint of server '" << serverID << "'";
<< "did not find endpoint of server '" << spec.serverId << "'";
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
}
return TRI_ERROR_NO_ERROR;
@ -106,7 +111,7 @@ int errorCodeFromBody(arangodb::velocypack::Slice body) {
}
return TRI_ERROR_ILLEGAL_NUMBER;
}
Result resultFromBody(std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>> const& body,
int defaultError) {
// read the error number from the response and use it if present
@ -115,10 +120,9 @@ Result resultFromBody(std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>> con
}
return Result(defaultError);
}
Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& body,
int defaultError) {
// read the error number from the response and use it if present
if (body) {
return resultFromBody(body->slice(), defaultError);
@ -126,9 +130,8 @@ Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& body
return Result(defaultError);
}
Result resultFromBody(arangodb::velocypack::Slice slice,
int defaultError) {
Result resultFromBody(arangodb::velocypack::Slice slice, int defaultError) {
// read the error number from the response and use it if present
if (slice.isObject()) {
VPackSlice num = slice.get(StaticStrings::ErrorNum);
@ -159,7 +162,7 @@ void errorCodesFromHeaders(network::Headers headers,
if (!codesSlice.isObject()) {
return;
}
for (auto const& code : VPackObjectIterator(codesSlice)) {
VPackValueLength codeLength;
char const* codeString = code.key.getString(codeLength);
@ -170,22 +173,17 @@ void errorCodesFromHeaders(network::Headers headers,
}
}
}
int fuerteToArangoErrorCode(network::Response const& res) {
return fuerteToArangoErrorCode(res.error);
}
int fuerteToArangoErrorCode(fuerte::Error err) {
// This function creates an error code from a ClusterCommResult,
namespace {
int toArangoErrorCodeInternal(fuerte::Error err) {
// This function creates an error code from a fuerte::Error,
// but only if it is a communication error. If the communication
// was successful and there was an HTTP error code, this function
// returns TRI_ERROR_NO_ERROR.
// If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED
// and .answer can safely be inspected.
// LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, err != fuerte::Error::NoError) << fuerte::to_string(err);
switch (err) {
case fuerte::Error::NoError:
return TRI_ERROR_NO_ERROR;
@ -200,111 +198,34 @@ int fuerteToArangoErrorCode(fuerte::Error err) {
case fuerte::Error::Timeout: // No reply, we give up:
return TRI_ERROR_CLUSTER_TIMEOUT;
case fuerte::Error::Canceled:
return TRI_ERROR_REQUEST_CANCELED;
case fuerte::Error::QueueCapacityExceeded: // there is no result
case fuerte::Error::ReadError:
case fuerte::Error::WriteError:
case fuerte::Error::Canceled:
case fuerte::Error::ProtocolError:
return TRI_ERROR_CLUSTER_CONNECTION_LOST;
case fuerte::Error::VstUnauthorized:
return TRI_ERROR_FORBIDDEN;
}
return TRI_ERROR_INTERNAL;
}
} // namespace
/// @brief Create Cluster Communication result for insert
OperationResult clusterResultInsert(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusAccepted:
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
case fuerte::StatusCreated: {
options.waitForSync = true; // wait for sync is abused herea
// operationResult should get a return code.
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusPreconditionFailed:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_CONFLICT);
case fuerte::StatusBadRequest:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
case fuerte::StatusConflict:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED);
default:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
int fuerteToArangoErrorCode(network::Response const& res) {
LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, res.error != fuerte::Error::NoError)
<< "cluster error: '" << fuerte::to_string(res.error)
<< "' from destination '" << res.destination << "'";
return toArangoErrorCodeInternal(res.error);
}
/// @brief Create Cluster Communication result for document
OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusOK:
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
case fuerte::StatusPreconditionFailed:
return OperationResult(Result(TRI_ERROR_ARANGO_CONFLICT), std::move(body),
std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
/// @brief Create Cluster Communication result for modify
OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusAccepted:
case fuerte::StatusCreated: {
options.waitForSync = (code == fuerte::StatusCreated);
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusConflict:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED),
body, std::move(options), errorCounter);
case fuerte::StatusPreconditionFailed:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
body, std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default: {
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
}
/// @brief Create Cluster Communication result for delete
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusOK:
case fuerte::StatusAccepted:
case fuerte::StatusCreated: {
OperationOptions options;
options.waitForSync = (code != fuerte::StatusAccepted);
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusPreconditionFailed:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
body, std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default: {
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
int fuerteToArangoErrorCode(fuerte::Error err) {
LOG_TOPIC_IF("abcdf", ERR, Logger::CLUSTER, err != fuerte::Error::NoError)
<< "cluster error: '" << fuerte::to_string(err) << "'";
return toArangoErrorCodeInternal(err);
}
} // namespace network
} // namespace arangodb

View File

@ -20,41 +20,39 @@
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#pragma once
#ifndef ARANGOD_NETWORK_UTILS_H
#define ARANGOD_NETWORK_UTILS_H 1
#include "Basics/Result.h"
#include "Network/types.h"
#include "Utils/OperationOptions.h"
#include "Utils/OperationResult.h"
#include <fuerte/types.h>
#include <velocypack/Buffer.h>
#include <velocypack/Slice.h>
#include <chrono>
namespace arangodb {
namespace velocypack {
class Builder;
}
class NetworkFeature;
class ClusterInfo;
namespace network {
/// @brief resolve 'shard:' or 'server:' url to actual endpoint
int resolveDestination(NetworkFeature&, DestinationId const& dest, std::string&);
int resolveDestination(NetworkFeature const&, DestinationId const& dest, network::EndpointSpec&);
int resolveDestination(ClusterInfo&, DestinationId const& dest, network::EndpointSpec&);
Result resultFromBody(std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>> const& b,
int defaultError);
Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& b,
int defaultError);
Result resultFromBody(arangodb::velocypack::Slice b,
int defaultError);
Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& b, int defaultError);
Result resultFromBody(arangodb::velocypack::Slice b, int defaultError);
/// @brief extract the error from a cluster response
template<typename T>
OperationResult opResultFromBody(T const& body,
int defaultErrorCode) {
template <typename T>
OperationResult opResultFromBody(T const& body, int defaultErrorCode) {
return OperationResult(arangodb::network::resultFromBody(body, defaultErrorCode));
}
@ -70,24 +68,6 @@ void errorCodesFromHeaders(network::Headers headers,
int fuerteToArangoErrorCode(network::Response const& res);
int fuerteToArangoErrorCode(fuerte::Error err);
/// @brief Create Cluster Communication result for insert
OperationResult clusterResultInsert(fuerte::StatusCode responsecode,
std::shared_ptr<velocypack::Buffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
} // namespace network
} // namespace arangodb

View File

@ -35,8 +35,12 @@ typedef std::string DestinationId;
using Headers = std::map<std::string, std::string>;
using Timeout = std::chrono::duration<double>;
/// @brief unified endpoint
typedef std::string EndpointSpec;
struct EndpointSpec {
std::string shardId;
std::string serverId;
std::string endpoint;
};
} // namespace network
} // namespace arangodb

View File

@ -459,7 +459,7 @@ class Methods {
ENTERPRISE_VIRT bool isInaccessibleCollection(std::string const& /*cid*/) const {
return false;
}
static int validateSmartJoinAttribute(LogicalCollection const& collinfo,
arangodb::velocypack::Slice value);

View File

@ -143,7 +143,7 @@ arangodb::Result Databases::grantCurrentUser(CreateDatabaseInfo const& info, int
// If the current user is empty (which happens if a Maintenance job
// called us, or when authentication is off), granting rights
// will fail. We hence ignore it here, but issue a warning below
if (!exec.isSuperuser()) {
if (!exec.isAdminUser()) {
auto const endTime = std::chrono::steady_clock::now() + std::chrono::seconds(timeout);
while (true) {
res = um->updateUser(exec.user(), [&](auth::User& entry) {

View File

@ -28,8 +28,9 @@ function testSuite() {
const name = "TestAuthAnalyzer";
if(!users.exists('bob'))
if (!users.exists('bob')) {
users.save(user, ''); // password must be empty otherwise switchUser will not work
}
// analyzers can only be changed from the `_system` database
// analyzer API does not support database selection via the usual `_db/<dbname>/_api/<api>`