1
0
Fork 0

Use fuerte in RemoteExecutor (#10077)

This commit is contained in:
Simon 2019-09-27 16:20:38 +02:00 committed by Jan
parent a088678866
commit cb7bf0314b
32 changed files with 610 additions and 733 deletions

View File

@ -182,7 +182,7 @@ class Request final : public Message {
/////////////////////////////////////////////// ///////////////////////////////////////////////
// add payload // add payload
/////////////////////////////////////////////// ///////////////////////////////////////////////
void addVPack(velocypack::Slice const& slice); void addVPack(velocypack::Slice const slice);
void addVPack(velocypack::Buffer<uint8_t> const& buffer); void addVPack(velocypack::Buffer<uint8_t> const& buffer);
void addVPack(velocypack::Buffer<uint8_t>&& buffer); void addVPack(velocypack::Buffer<uint8_t>&& buffer);
void addBinary(uint8_t const* data, std::size_t length); void addBinary(uint8_t const* data, std::size_t length);

View File

@ -56,7 +56,7 @@ std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path, std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
StringMap const& parameter, StringMap const& parameter,
velocypack::Slice const& payload); velocypack::Slice const payload);
std::unique_ptr<Request> createRequest( std::unique_ptr<Request> createRequest(
RestVerb verb, std::string const& path, RestVerb verb, std::string const& path,

View File

@ -180,7 +180,7 @@ constexpr std::chrono::milliseconds Request::defaultTimeout;
ContentType Request::acceptType() const { return header.acceptType(); } ContentType Request::acceptType() const { return header.acceptType(); }
//// add payload add VelocyPackData //// add payload add VelocyPackData
void Request::addVPack(VPackSlice const& slice) { void Request::addVPack(VPackSlice const slice) {
#ifdef FUERTE_CHECKED_MODE #ifdef FUERTE_CHECKED_MODE
// FUERTE_LOG_ERROR << "Checking data that is added to the message: " << // FUERTE_LOG_ERROR << "Checking data that is added to the message: " <<
// std::endl; // std::endl;

View File

@ -48,7 +48,7 @@ std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path, std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
StringMap const& parameters, StringMap const& parameters,
VPackSlice const& payload) { VPackSlice const payload) {
auto request = createRequest(verb, ContentType::VPack); auto request = createRequest(verb, ContentType::VPack);
request->header.path = path; request->header.path = path;
request->header.parameters = parameters; request->header.parameters = parameters;

View File

@ -71,11 +71,3 @@ SharedAqlItemBlockPtr itemBlock::concatenate(AqlItemBlockManager& manager,
return res; return res;
} }
void itemBlock::forRowInBlock(SharedAqlItemBlockPtr const& block,
std::function<void(InputAqlItemRow&&)> const& callback) {
TRI_ASSERT(block != nullptr);
for (std::size_t index = 0; index < block->size(); ++index) {
callback(InputAqlItemRow{block, index});
}
}

View File

@ -40,10 +40,6 @@ namespace itemBlock {
/// set to nullptr, just to be sure. /// set to nullptr, just to be sure.
SharedAqlItemBlockPtr concatenate(AqlItemBlockManager&, SharedAqlItemBlockPtr concatenate(AqlItemBlockManager&,
std::vector<SharedAqlItemBlockPtr>& blocks); std::vector<SharedAqlItemBlockPtr>& blocks);
void forRowInBlock(SharedAqlItemBlockPtr const& block,
std::function<void(InputAqlItemRow&&)> const& callback);
} // namespace itemBlock } // namespace itemBlock
} // namespace aql } // namespace aql

View File

@ -44,8 +44,8 @@ std::shared_ptr<AqlTransaction> AqlTransaction::create(
std::unordered_set<std::string> inaccessibleCollections) { std::unordered_set<std::string> inaccessibleCollections) {
#ifdef USE_ENTERPRISE #ifdef USE_ENTERPRISE
if (options.skipInaccessibleCollections) { if (options.skipInaccessibleCollections) {
return std::make_shared<transaction::IgnoreNoAccessAqlTransaction>(transactionContext, collections, return std::make_shared<transaction::IgnoreNoAccessAqlTransaction>(
options, isMainTransaction, transactionContext, collections, options, isMainTransaction,
std::move(inaccessibleCollections)); std::move(inaccessibleCollections));
} }
#endif #endif

View File

@ -32,7 +32,6 @@
#include "Aql/ExecutionStats.h" #include "Aql/ExecutionStats.h"
#include "Aql/InputAqlItemRow.h" #include "Aql/InputAqlItemRow.h"
#include "Aql/Query.h" #include "Aql/Query.h"
#include "Aql/WakeupQueryCallback.h"
#include "Basics/Exceptions.h" #include "Basics/Exceptions.h"
#include "Basics/StaticStrings.h" #include "Basics/StaticStrings.h"
#include "Basics/StringBuffer.h" #include "Basics/StringBuffer.h"

View File

@ -92,8 +92,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi
// It might in fact be empty, if we only have edge collections in a graph. // It might in fact be empty, if we only have edge collections in a graph.
// Or if we guarantee to never read vertex data. // Or if we guarantee to never read vertex data.
for (auto const& col : vertices) { for (auto const& col : vertices) {
auto shards = getAllLocalShards(shardMapping, server, auto shards = getAllLocalShards(shardMapping, server, col->shardIds(restrictToShards));
col->shardIds(restrictToShards));
#ifdef USE_ENTERPRISE #ifdef USE_ENTERPRISE
for (auto const& s : shards) { for (auto const& s : shards) {
if (query.trx()->isInaccessibleCollectionId(col->getPlanId())) { if (query.trx()->isInaccessibleCollectionId(col->getPlanId())) {

View File

@ -276,13 +276,6 @@ ExecutionNode const* ExecutionBlock::getPlanNode() const { return _exeNode; }
transaction::Methods* ExecutionBlock::transaction() const { return _trx; } transaction::Methods* ExecutionBlock::transaction() const { return _trx; }
bool ExecutionBlock::handleAsyncResult(ClusterCommResult* result) {
// This indicates that a node uses async functionality
// but does not react to the response.
TRI_ASSERT(false);
return true;
}
void ExecutionBlock::addDependency(ExecutionBlock* ep) { void ExecutionBlock::addDependency(ExecutionBlock* ep) {
TRI_ASSERT(ep != nullptr); TRI_ASSERT(ep != nullptr);
// We can never have the same dependency twice // We can never have the same dependency twice

View File

@ -121,11 +121,6 @@ class ExecutionBlock {
transaction::Methods* transaction() const; transaction::Methods* transaction() const;
// @brief Will be called on the querywakeup callback with the
// result collected over the network. Needs to be implemented
// on all nodes that use this mechanism.
virtual bool handleAsyncResult(ClusterCommResult* result);
/// @brief add a dependency /// @brief add a dependency
void addDependency(ExecutionBlock* ep); void addDependency(ExecutionBlock* ep);

View File

@ -195,7 +195,10 @@ bool Insert::doModifications(ModificationExecutorInfos& info, ModificationStats&
RegisterId const inReg = info._input1RegisterId; RegisterId const inReg = info._input1RegisterId;
TRI_ASSERT(_block != nullptr); TRI_ASSERT(_block != nullptr);
itemBlock::forRowInBlock(_block, [this, inReg, &info](InputAqlItemRow&& row) {
for (std::size_t index = 0; index < _block->size(); ++index) {
InputAqlItemRow row{_block, index};
auto const& inVal = row.getValue(inReg); auto const& inVal = row.getValue(inReg);
if (!info._consultAqlWriteFilter || if (!info._consultAqlWriteFilter ||
!info._aqlCollection->getCollection()->skipForAqlWrite(inVal.slice(), !info._aqlCollection->getCollection()->skipForAqlWrite(inVal.slice(),
@ -207,7 +210,7 @@ bool Insert::doModifications(ModificationExecutorInfos& info, ModificationStats&
// not relevant for ourselves... just pass it on to the next block // not relevant for ourselves... just pass it on to the next block
_operations.push_back(ModOperationType::IGNORE_RETURN); _operations.push_back(ModOperationType::IGNORE_RETURN);
} }
}); }
TRI_ASSERT(_operations.size() == _block->size()); TRI_ASSERT(_operations.size() == _block->size());
@ -322,8 +325,8 @@ bool Remove::doModifications(ModificationExecutorInfos& info, ModificationStats&
RegisterId const inReg = info._input1RegisterId; RegisterId const inReg = info._input1RegisterId;
TRI_ASSERT(_block != nullptr); TRI_ASSERT(_block != nullptr);
itemBlock::forRowInBlock(_block, [this, &stats, &errorCode, &key, &rev, trx, for (std::size_t index = 0; index < _block->size(); ++index) {
inReg, &info](InputAqlItemRow&& row) { InputAqlItemRow row{_block, index};
auto const& inVal = row.getValue(inReg); auto const& inVal = row.getValue(inReg);
if (!info._consultAqlWriteFilter || if (!info._consultAqlWriteFilter ||
@ -363,7 +366,7 @@ bool Remove::doModifications(ModificationExecutorInfos& info, ModificationStats&
_operations.push_back(ModOperationType::IGNORE_RETURN); _operations.push_back(ModOperationType::IGNORE_RETURN);
this->_last_not_skip = _operations.size(); this->_last_not_skip = _operations.size();
} }
}); }
TRI_ASSERT(_operations.size() == _block->size()); TRI_ASSERT(_operations.size() == _block->size());
@ -462,9 +465,9 @@ bool Upsert::doModifications(ModificationExecutorInfos& info, ModificationStats&
RegisterId const insertReg = info._input2RegisterId; RegisterId const insertReg = info._input2RegisterId;
RegisterId const updateReg = info._input3RegisterId; RegisterId const updateReg = info._input3RegisterId;
itemBlock::forRowInBlock(_block, [this, &stats, &errorCode, &errorMessage, for (std::size_t index = 0; index < _block->size(); ++index) {
&key, trx, inDocReg, insertReg, updateReg, InputAqlItemRow row{_block, index};
&info](InputAqlItemRow&& row) {
errorMessage.clear(); errorMessage.clear();
errorCode = TRI_ERROR_NO_ERROR; errorCode = TRI_ERROR_NO_ERROR;
auto const& inVal = row.getValue(inDocReg); auto const& inVal = row.getValue(inDocReg);
@ -524,7 +527,7 @@ bool Upsert::doModifications(ModificationExecutorInfos& info, ModificationStats&
_operations.push_back(ModOperationType::IGNORE_SKIP); _operations.push_back(ModOperationType::IGNORE_SKIP);
handleStats(stats, info, errorCode, info._ignoreErrors, &errorMessage); handleStats(stats, info, errorCode, info._ignoreErrors, &errorMessage);
} }
}); }
TRI_ASSERT(_operations.size() == _block->size()); TRI_ASSERT(_operations.size() == _block->size());
@ -659,9 +662,9 @@ bool UpdateReplace<ModType>::doModifications(ModificationExecutorInfos& info,
RegisterId const keyReg = info._input2RegisterId; RegisterId const keyReg = info._input2RegisterId;
bool const hasKeyVariable = keyReg != RegisterPlan::MaxRegisterId; bool const hasKeyVariable = keyReg != RegisterPlan::MaxRegisterId;
itemBlock::forRowInBlock(_block, [this, &options, &stats, &errorCode, for (std::size_t index = 0; index < _block->size(); ++index) {
&errorMessage, &key, &rev, trx, inDocReg, keyReg, InputAqlItemRow row{_block, index};
hasKeyVariable, &info](InputAqlItemRow&& row) {
auto const& inVal = row.getValue(inDocReg); auto const& inVal = row.getValue(inDocReg);
errorCode = TRI_ERROR_NO_ERROR; errorCode = TRI_ERROR_NO_ERROR;
errorMessage.clear(); errorMessage.clear();
@ -718,7 +721,7 @@ bool UpdateReplace<ModType>::doModifications(ModificationExecutorInfos& info,
_operations.push_back(ModOperationType::IGNORE_SKIP); _operations.push_back(ModOperationType::IGNORE_SKIP);
handleStats(stats, info, errorCode, info._ignoreErrors, &errorMessage); handleStats(stats, info, errorCode, info._ignoreErrors, &errorMessage);
} }
}); }
TRI_ASSERT(_operations.size() == _block->size()); TRI_ASSERT(_operations.size() == _block->size());

View File

@ -482,8 +482,8 @@ ExecutionPlan* Query::preparePlan() {
#endif #endif
auto trx = AqlTransaction::create(std::move(ctx), _collections.collections(), auto trx = AqlTransaction::create(std::move(ctx), _collections.collections(),
_queryOptions.transactionOptions, _queryOptions.transactionOptions, _part == PART_MAIN,
_part == PART_MAIN, std::move(inaccessibleCollections)); std::move(inaccessibleCollections));
// create the transaction object, but do not start it yet // create the transaction object, but do not start it yet
_trx = trx; _trx = trx;
_trx->addHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL); // only used on toplevel _trx->addHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL); // only used on toplevel

View File

@ -27,14 +27,20 @@
#include "Aql/ExecutorInfos.h" #include "Aql/ExecutorInfos.h"
#include "Aql/InputAqlItemRow.h" #include "Aql/InputAqlItemRow.h"
#include "Aql/Query.h" #include "Aql/Query.h"
#include "Aql/WakeupQueryCallback.h"
#include "Basics/MutexLocker.h" #include "Basics/MutexLocker.h"
#include "Basics/RecursiveLocker.h" #include "Basics/RecursiveLocker.h"
#include "Basics/StringBuffer.h" #include "Basics/StringBuffer.h"
#include "Basics/VelocyPackHelper.h" #include "Basics/VelocyPackHelper.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ServerState.h" #include "Cluster/ServerState.h"
#include "Logger/LogMacros.h"
#include "Network/ConnectionPool.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "Rest/CommonDefines.h" #include "Rest/CommonDefines.h"
#include "VocBase/vocbase.h"
#include <fuerte/connection.h>
#include <fuerte/requests.h>
#include <velocypack/Iterator.h> #include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h> #include <velocypack/velocypack-aliases.h>
@ -44,8 +50,10 @@ using namespace arangodb::aql;
using arangodb::basics::VelocyPackHelper; using arangodb::basics::VelocyPackHelper;
namespace {
/// @brief timeout /// @brief timeout
double const ExecutionBlockImpl<RemoteExecutor>::defaultTimeOut = 3600.0; constexpr std::chrono::seconds kDefaultTimeOutSecs(3600);
} // namespace
ExecutionBlockImpl<RemoteExecutor>::ExecutionBlockImpl( ExecutionBlockImpl<RemoteExecutor>::ExecutionBlockImpl(
ExecutionEngine* engine, RemoteNode const* node, ExecutorInfos&& infos, ExecutionEngine* engine, RemoteNode const* node, ExecutorInfos&& infos,
@ -57,9 +65,8 @@ ExecutionBlockImpl<RemoteExecutor>::ExecutionBlockImpl(
_ownName(ownName), _ownName(ownName),
_queryId(queryId), _queryId(queryId),
_isResponsibleForInitializeCursor(node->isResponsibleForInitializeCursor()), _isResponsibleForInitializeCursor(node->isResponsibleForInitializeCursor()),
_lastResponse(nullptr),
_lastError(TRI_ERROR_NO_ERROR), _lastError(TRI_ERROR_NO_ERROR),
_lastTicketId(0), _lastTicket(0),
_hasTriggeredShutdown(false) { _hasTriggeredShutdown(false) {
TRI_ASSERT(!queryId.empty()); TRI_ASSERT(!queryId.empty());
TRI_ASSERT((arangodb::ServerState::instance()->isCoordinator() && ownName.empty()) || TRI_ASSERT((arangodb::ServerState::instance()->isCoordinator() && ownName.empty()) ||
@ -101,13 +108,13 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
TRI_ASSERT(_lastError.ok()); TRI_ASSERT(_lastError.ok());
// We do not have an error but a result, all is good // We do not have an error but a result, all is good
// We have an open result still. // We have an open result still.
std::shared_ptr<VPackBuilder> responseBodyBuilder = stealResultBody(); auto response = std::move(_lastResponse);
// Result is the response which will be a serialized AqlItemBlock // Result is the response which will be a serialized AqlItemBlock
// both must be reset before return or throw // both must be reset before return or throw
TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr);
VPackSlice responseBody = responseBodyBuilder->slice(); VPackSlice responseBody = response->slice();
ExecutionState state = ExecutionState::HASMORE; ExecutionState state = ExecutionState::HASMORE;
if (VelocyPackHelper::getBooleanValue(responseBody, "done", true)) { if (VelocyPackHelper::getBooleanValue(responseBody, "done", true)) {
@ -123,16 +130,20 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
} }
// We need to send a request here // We need to send a request here
VPackBuilder builder; VPackBuffer<uint8_t> buffer;
{
VPackBuilder builder(buffer);
builder.openObject(); builder.openObject();
builder.add("atMost", VPackValue(atMost)); builder.add("atMost", VPackValue(atMost));
builder.close(); builder.close();
traceGetSomeRequest(builder.slice(), atMost);
}
auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/getSome/",
std::move(buffer));
auto bodyString = std::make_shared<std::string const>(builder.slice().toJson());
traceGetSomeRequest(bodyString, atMost);
auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/getSome/", bodyString);
if (!res.ok()) { if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res); THROW_ARANGO_EXCEPTION(res);
} }
@ -160,12 +171,12 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWi
// We have an open result still. // We have an open result still.
// Result is the response which will be a serialized AqlItemBlock // Result is the response which will be a serialized AqlItemBlock
std::shared_ptr<VPackBuilder> responseBodyBuilder = stealResultBody(); auto response = std::move(_lastResponse);
// both must be reset before return or throw // both must be reset before return or throw
TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr);
VPackSlice slice = responseBodyBuilder->slice(); VPackSlice slice = response->slice();
if (!slice.hasKey(StaticStrings::Error) || slice.get(StaticStrings::Error).getBoolean()) { if (!slice.hasKey(StaticStrings::Error) || slice.get(StaticStrings::Error).getBoolean()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION); THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION);
@ -190,16 +201,17 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWi
// For every call we simply forward via HTTP // For every call we simply forward via HTTP
VPackBuilder builder; VPackBuffer<uint8_t> buffer;
builder.openObject(); {
VPackBuilder builder(buffer);
builder.openObject(/*unindexed*/ true);
builder.add("atMost", VPackValue(atMost)); builder.add("atMost", VPackValue(atMost));
builder.close(); builder.close();
traceSkipSomeRequest(builder.slice(), atMost);
}
auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/skipSome/",
std::move(buffer));
auto bodyString = std::make_shared<std::string const>(builder.slice().toJson());
traceSkipSomeRequest(bodyString, atMost);
auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/skipSome/", bodyString);
if (!res.ok()) { if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res); THROW_ARANGO_EXCEPTION(res);
} }
@ -224,10 +236,10 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
if (_lastResponse != nullptr || _lastError.fail()) { if (_lastResponse != nullptr || _lastError.fail()) {
// We have an open result still. // We have an open result still.
std::shared_ptr<VPackBuilder> responseBodyBuilder = stealResultBody(); auto response = std::move(_lastResponse);
// Result is the response which is an object containing the ErrorCode // Result is the response which is an object containing the ErrorCode
VPackSlice slice = responseBodyBuilder->slice(); VPackSlice slice = response->slice();
if (slice.hasKey("code")) { if (slice.hasKey("code")) {
return {ExecutionState::DONE, slice.get("code").getNumericValue<int>()}; return {ExecutionState::DONE, slice.get("code").getNumericValue<int>()};
} }
@ -238,8 +250,9 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
options.buildUnindexedArrays = true; options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true; options.buildUnindexedObjects = true;
VPackBuilder builder(&options); VPackBuffer<uint8_t> buffer;
builder.openObject(); VPackBuilder builder(buffer, &options);
builder.openObject(/*unindexed*/ true);
// Backwards Compatibility 3.3 // Backwards Compatibility 3.3
// NOTE: Removing this breaks tests in current devel - is this really for // NOTE: Removing this breaks tests in current devel - is this really for
@ -252,16 +265,14 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
// Now only the one output row is send. // Now only the one output row is send.
builder.add("pos", VPackValue(0)); builder.add("pos", VPackValue(0));
builder.add(VPackValue("items")); builder.add(VPackValue("items"));
builder.openObject(); builder.openObject(/*unindexed*/ true);
input.toVelocyPack(_engine->getQuery()->trx(), builder); input.toVelocyPack(_engine->getQuery()->trx(), builder);
builder.close(); builder.close();
builder.close(); builder.close();
auto bodyString = std::make_shared<std::string const>(builder.slice().toJson()); auto res = sendAsyncRequest(fuerte::RestVerb::Put,
"/_api/aql/initializeCursor/", std::move(buffer));
auto res = sendAsyncRequest(rest::RequestType::PUT,
"/_api/aql/initializeCursor/", bodyString);
if (!res.ok()) { if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res); THROW_ARANGO_EXCEPTION(res);
} }
@ -277,20 +288,8 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
} }
if (!_hasTriggeredShutdown) { if (!_hasTriggeredShutdown) {
// Make sure to cover against the race that the request std::lock_guard<std::mutex> guard(_communicationMutex);
// in flight is not overtaking in the drop phase here. _lastTicket = 0;
// After this lock is released even a response
// will be discarded in the handle response code
MUTEX_LOCKER(locker, _communicationMutex);
if (_lastTicketId != 0) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr only happens on controlled shutdown
return {ExecutionState::DONE, TRI_ERROR_SHUTTING_DOWN};
}
cc->drop(0, _lastTicketId, "");
}
_lastTicketId = 0;
_lastError.reset(TRI_ERROR_NO_ERROR); _lastError.reset(TRI_ERROR_NO_ERROR);
_lastResponse.reset(); _lastResponse.reset();
_hasTriggeredShutdown = true; _hasTriggeredShutdown = true;
@ -318,12 +317,12 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
if (_lastResponse != nullptr) { if (_lastResponse != nullptr) {
TRI_ASSERT(_lastError.ok()); TRI_ASSERT(_lastError.ok());
std::shared_ptr<VPackBuilder> responseBodyBuilder = stealResultBody(); auto response = std::move(_lastResponse);
// both must be reset before return or throw // both must be reset before return or throw
TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr);
VPackSlice slice = responseBodyBuilder->slice(); VPackSlice slice = response->slice();
if (slice.isObject()) { if (slice.isObject()) {
if (slice.hasKey("stats")) { if (slice.hasKey("stats")) {
ExecutionStats newStats(slice.get("stats")); ExecutionStats newStats(slice.get("stats"));
@ -357,187 +356,127 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
_didSendShutdownRequest = true; _didSendShutdownRequest = true;
#endif #endif
// For every call we simply forward via HTTP // For every call we simply forward via HTTP
VPackBuilder bodyBuilder; VPackBuffer<uint8_t> buffer;
bodyBuilder.openObject(); VPackBuilder builder(buffer);
bodyBuilder.add("code", VPackValue(errorCode)); builder.openObject(/*unindexed*/ true);
bodyBuilder.close(); builder.add("code", VPackValue(errorCode));
builder.close();
auto bodyString = std::make_shared<std::string const>(bodyBuilder.slice().toJson()); auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/shutdown/",
std::move(buffer));
auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/shutdown/", bodyString);
if (!res.ok()) { if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res); THROW_ARANGO_EXCEPTION(res);
} }
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
} }
Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest( namespace {
arangodb::rest::RequestType type, std::string const& urlPart, Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
std::shared_ptr<std::string const> body) { fuerte::Response* response) {
auto cc = ClusterComm::instance(); TRI_ASSERT(err != fuerte::Error::NoError || response->statusCode() >= 400);
if (cc == nullptr) {
std::string msg;
if (spec.shardId.empty()) {
msg.append("Error message received from cluster node '")
.append(spec.serverId)
.append("': ");
} else {
msg.append("Error message received from shard '")
.append(spec.shardId)
.append("' on cluster node '")
.append(spec.serverId)
.append("': ");
}
int res = TRI_ERROR_INTERNAL;
if (err != fuerte::Error::NoError) {
res = network::fuerteToArangoErrorCode(err);
msg.append(TRI_errno_string(res));
} else {
VPackSlice slice = response->slice();
if (slice.isObject()) {
VPackSlice err = slice.get(StaticStrings::Error);
if (err.isBool() && err.getBool()) {
res = VelocyPackHelper::readNumericValue(slice, StaticStrings::ErrorNum, res);
VPackStringRef ref =
VelocyPackHelper::getStringRef(slice, StaticStrings::ErrorMessage,
"(no valid error in response)");
msg.append(ref.data(), ref.size());
}
}
}
return Result(res, std::move(msg));
}
} // namespace
Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb type,
std::string const& urlPart,
VPackBuffer<uint8_t> body) {
NetworkFeature const& nf = _engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
if (!pool) {
// nullptr only happens on controlled shutdown // nullptr only happens on controlled shutdown
return {TRI_ERROR_SHUTTING_DOWN}; return {TRI_ERROR_SHUTTING_DOWN};
} }
// Later, we probably want to set these sensibly:
CoordTransactionID const coordTransactionId = TRI_NewTickServer();
std::unordered_map<std::string, std::string> headers;
if (!_ownName.empty()) {
headers.emplace("Shard-Id", _ownName);
}
std::string url = std::string("/_db/") + std::string url = std::string("/_db/") +
arangodb::basics::StringUtils::urlEncode( arangodb::basics::StringUtils::urlEncode(
_engine->getQuery()->trx()->vocbase().name()) + _engine->getQuery()->vocbase().name()) +
urlPart + _queryId; urlPart + _queryId;
arangodb::network::EndpointSpec spec;
int res = network::resolveDestination(nf, _server, spec);
if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?!
return Result(res);
}
TRI_ASSERT(!spec.endpoint.empty());
auto req = fuerte::createRequest(type, url, {}, std::move(body));
// Later, we probably want to set these sensibly:
req->timeout(kDefaultTimeOutSecs);
if (!_ownName.empty()) {
req->header.addMeta("Shard-Id", _ownName);
}
network::ConnectionPool::Ref ref = pool->leaseConnection(spec.endpoint);
std::lock_guard<std::mutex> guard(_communicationMutex);
unsigned ticket = ++_lastTicket;
std::shared_ptr<fuerte::Connection> conn = ref.connection();
conn->sendRequest(std::move(req),
[=, ref(std::move(ref))](fuerte::Error err,
std::unique_ptr<fuerte::Request>,
std::unique_ptr<fuerte::Response> res) {
_query.sharedState()->execute([&] { // notifies outside
std::lock_guard<std::mutex> guard(_communicationMutex);
if (_lastTicket == ticket) {
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
_lastError = handleErrorResponse(spec, err, res.get());
} else {
_lastResponse = std::move(res);
}
}
});
});
++_engine->_stats.requests; ++_engine->_stats.requests;
std::shared_ptr<ClusterCommCallback> callback =
std::make_shared<WakeupQueryCallback>(this, _engine->getQuery());
// Make sure to cover against the race that this
// Request is fullfilled before the register has taken place
// @note the only reason for not using recursive mutext always is due to the
// concern that there might be recursive calls in production
#ifdef ARANGODB_USE_GOOGLE_TESTS
RECURSIVE_MUTEX_LOCKER(_communicationMutex, _communicationMutexOwner);
#else
MUTEX_LOCKER(locker, _communicationMutex);
#endif
// We can only track one request at a time.
// So assert there is no other request in flight!
TRI_ASSERT(_lastTicketId == 0);
_lastTicketId =
cc->asyncRequest(coordTransactionId, _server, type, url, std::move(body),
headers, callback, defaultTimeOut, true);
return {TRI_ERROR_NO_ERROR}; return {TRI_ERROR_NO_ERROR};
} }
bool ExecutionBlockImpl<RemoteExecutor>::handleAsyncResult(ClusterCommResult* result) {
// So we cannot have the response being produced while sending the request.
// Make sure to cover against the race that this
// Request is fullfilled before the register has taken place
// @note the only reason for not using recursive mutext always is due to the
// concern that there might be recursive calls in production
#ifdef ARANGODB_USE_GOOGLE_TESTS
RECURSIVE_MUTEX_LOCKER(_communicationMutex, _communicationMutexOwner);
#else
MUTEX_LOCKER(locker, _communicationMutex);
#endif
if (_lastTicketId == result->operationID) {
// TODO Handle exceptions thrown while we are in this code
// Query will not be woken up again.
_lastError = handleCommErrors(result);
if (_lastError.ok()) {
_lastResponse = result->result;
}
_lastTicketId = 0;
}
return true;
}
arangodb::Result ExecutionBlockImpl<RemoteExecutor>::handleCommErrors(ClusterCommResult* res) const {
if (res->status == CL_COMM_TIMEOUT || res->status == CL_COMM_BACKEND_UNAVAILABLE) {
return {res->getErrorCode(), res->stringifyErrorMessage()};
}
if (res->status == CL_COMM_ERROR) {
std::string errorMessage;
auto const& shardID = res->shardID;
if (shardID.empty()) {
errorMessage = std::string("Error message received from cluster node '") +
std::string(res->serverID) + std::string("': ");
} else {
errorMessage = std::string("Error message received from shard '") +
std::string(shardID) + std::string("' on cluster node '") +
std::string(res->serverID) + std::string("': ");
}
int errorNum = TRI_ERROR_INTERNAL;
if (res->result != nullptr) {
errorNum = TRI_ERROR_NO_ERROR;
arangodb::basics::StringBuffer const& responseBodyBuf(res->result->getBody());
std::shared_ptr<VPackBuilder> builder =
VPackParser::fromJson(responseBodyBuf.c_str(), responseBodyBuf.length());
VPackSlice slice = builder->slice();
if (!slice.hasKey(StaticStrings::Error) ||
slice.get(StaticStrings::Error).getBoolean()) {
errorNum = TRI_ERROR_INTERNAL;
}
if (slice.isObject()) {
VPackSlice v = slice.get(StaticStrings::ErrorNum);
if (v.isNumber()) {
if (v.getNumericValue<int>() != TRI_ERROR_NO_ERROR) {
/* if we've got an error num, error has to be true. */
TRI_ASSERT(errorNum == TRI_ERROR_INTERNAL);
errorNum = v.getNumericValue<int>();
}
}
v = slice.get(StaticStrings::ErrorMessage);
if (v.isString()) {
errorMessage += v.copyString();
} else {
errorMessage += std::string("(no valid error in response)");
}
}
}
// In this case a proper HTTP error was reported by the DBserver,
if (errorNum > 0 && !errorMessage.empty()) {
return {errorNum, errorMessage};
}
// default error
return {TRI_ERROR_CLUSTER_AQL_COMMUNICATION};
}
TRI_ASSERT(res->status == CL_COMM_SENT);
return {TRI_ERROR_NO_ERROR};
}
/**
* @brief Steal the last returned body. Will throw an error if
* there has been an error of any kind, e.g. communication
* or error created by remote server.
* Will reset the lastResponse, so after this call we are
* ready to send a new request.
*
* @return A shared_ptr containing the remote response.
*/
std::shared_ptr<VPackBuilder> ExecutionBlockImpl<RemoteExecutor>::stealResultBody() {
// NOTE: This cannot participate in the race in communication.
// This will not be called after the MUTEX to send was released.
// It can only be called by the next getSome call.
// This getSome however is locked by the QueryRegistery several layers above
if (!_lastError.ok()) {
THROW_ARANGO_EXCEPTION(_lastError);
}
// We have an open result still.
// Result is the response which is an object containing the ErrorCode
std::shared_ptr<VPackBuilder> responseBodyBuilder = _lastResponse->getBodyVelocyPack();
_lastResponse.reset();
return responseBodyBuilder;
}
void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest( void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest(
std::shared_ptr<const std::string> const& body, size_t const atMost) { VPackSlice slice, size_t const atMost) {
traceRequest("getSome", body, atMost); traceRequest("getSome", slice, atMost);
} }
void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest( void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest(
std::shared_ptr<const std::string> const& body, size_t const atMost) { VPackSlice slice, size_t const atMost) {
traceRequest("skipSome", body, atMost); traceRequest("skipSome", slice, atMost);
} }
void ExecutionBlockImpl<RemoteExecutor>::traceRequest( void ExecutionBlockImpl<RemoteExecutor>::traceRequest(
const char* rpc, std::shared_ptr<const std::string> const& sharedPtr, size_t atMost) { const char* rpc, VPackSlice slice, size_t atMost) {
if (_profile >= PROFILE_LEVEL_TRACE_1) { if (_profile >= PROFILE_LEVEL_TRACE_1) {
auto const queryId = this->_engine->getQuery()->id(); auto const queryId = this->_engine->getQuery()->id();
auto const remoteQueryId = _queryId; auto const remoteQueryId = _queryId;

View File

@ -24,12 +24,13 @@
#define ARANGOD_AQL_REMOTE_EXECUTOR_H #define ARANGOD_AQL_REMOTE_EXECUTOR_H
#include "Aql/ClusterNodes.h" #include "Aql/ClusterNodes.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/ExecutorInfos.h" #include "Aql/ExecutorInfos.h"
#include "Basics/Mutex.h" #include "Aql/ExecutionBlockImpl.h"
#include "Cluster/ClusterComm.h"
#include "Rest/CommonDefines.h" #include <mutex>
#include "SimpleHttpClient/SimpleHttpResult.h"
#include <fuerte/message.h>
#include <fuerte/types.h>
namespace arangodb { namespace arangodb {
namespace aql { namespace aql {
@ -61,9 +62,6 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
std::pair<ExecutionState, Result> shutdown(int errorCode) override; std::pair<ExecutionState, Result> shutdown(int errorCode) override;
/// @brief handleAsyncResult
bool handleAsyncResult(ClusterCommResult* result) override;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
// only for asserts: // only for asserts:
public: public:
@ -81,27 +79,12 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
Query const& getQuery() const { return _query; } Query const& getQuery() const { return _query; }
/**
* @brief Handle communication errors in Async case.
*
* @param result The network response we got from cluster comm.
*
* @return A wrapped Result Object, that is either ok() or contains
* the error information to be thrown in get/skip some.
*/
arangodb::Result handleCommErrors(ClusterCommResult* result) const;
/// @brief internal method to send a request. Will register a callback to be /// @brief internal method to send a request. Will register a callback to be
/// reactivated /// reactivated
arangodb::Result sendAsyncRequest(rest::RequestType type, std::string const& urlPart, arangodb::Result sendAsyncRequest(fuerte::RestVerb type, std::string const& urlPart,
std::shared_ptr<std::string const> body); velocypack::Buffer<uint8_t> body);
std::shared_ptr<velocypack::Builder> stealResultBody();
private: private:
/// @brief timeout
static double const defaultTimeOut;
ExecutorInfos _infos; ExecutorInfos _infos;
Query const& _query; Query const& _query;
@ -122,19 +105,14 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
/// @brief the last unprocessed result. Make sure to reset it /// @brief the last unprocessed result. Make sure to reset it
/// after it is processed. /// after it is processed.
std::shared_ptr<httpclient::SimpleHttpResult> _lastResponse; std::unique_ptr<fuerte::Response> _lastResponse;
/// @brief the last remote response Result object, may contain an error. /// @brief the last remote response Result object, may contain an error.
arangodb::Result _lastError; arangodb::Result _lastError;
/// @brief Mutex to cover against the race, that a getSome request std::mutex _communicationMutex;
/// is responded before the ticket id is registered.
arangodb::Mutex _communicationMutex;
#ifdef ARANGODB_USE_GOOGLE_TESTS
std::atomic<std::thread::id> _communicationMutexOwner; // current thread owning '_communicationMutex' lock (workaround for non-recusrive MutexLocker)
#endif
OperationID _lastTicketId; unsigned _lastTicket; /// used to check for canceled requests
bool _hasTriggeredShutdown; bool _hasTriggeredShutdown;
@ -142,9 +120,9 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
bool _didSendShutdownRequest = false; bool _didSendShutdownRequest = false;
#endif #endif
void traceGetSomeRequest(std::shared_ptr<const std::string> const& sharedPtr, size_t atMost); void traceGetSomeRequest(velocypack::Slice slice, size_t atMost);
void traceSkipSomeRequest(std::shared_ptr<const std::string> const& body, size_t atMost); void traceSkipSomeRequest(velocypack::Slice slice, size_t atMost);
void traceRequest(const char* rpc, std::shared_ptr<const std::string> const& sharedPtr, size_t atMost); void traceRequest(const char* rpc, velocypack::Slice slice, size_t atMost);
}; };
} // namespace aql } // namespace aql

View File

@ -63,7 +63,7 @@ class SharedQueryState {
return false; return false;
} }
bool res = std::forward<F>(cb)(); std::forward<F>(cb)();
if (_hasHandler) { if (_hasHandler) {
if (ADB_UNLIKELY(!executeContinueCallback())) { if (ADB_UNLIKELY(!executeContinueCallback())) {
return false; // likely shutting down return false; // likely shutting down
@ -73,8 +73,7 @@ class SharedQueryState {
// simon: bad experience on macOS guard.unloack(); // simon: bad experience on macOS guard.unloack();
_condition.notify_one(); _condition.notify_one();
} }
return true;
return res;
} }
/// this has to stay for a backwards-compatible AQL HTTP API (hasMore). /// this has to stay for a backwards-compatible AQL HTTP API (hasMore).

View File

@ -1,43 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018-2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "WakeupQueryCallback.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/Query.h"
using namespace arangodb;
using namespace arangodb::aql;
WakeupQueryCallback::WakeupQueryCallback(ExecutionBlock* initiator, Query* query)
: _initiator(initiator), _query(query), _sharedState(query->sharedState()) {}
WakeupQueryCallback::~WakeupQueryCallback() {}
bool WakeupQueryCallback::operator()(ClusterCommResult* result) {
return _sharedState->execute([result, this]() {
TRI_ASSERT(_initiator != nullptr);
TRI_ASSERT(_query != nullptr);
// TODO Validate that _initiator and _query have not been deleted (ttl)
// TODO Handle exceptions
return _initiator->handleAsyncResult(result);
});
}

View File

@ -1,51 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018-2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_WAKEUP_QUERY_CALLBACK_H
#define ARANGOD_AQL_WAKEUP_QUERY_CALLBACK_H 1
#include "Basics/Common.h"
#include "Cluster/ClusterComm.h"
namespace arangodb {
namespace aql {
class ExecutionBlock;
class Query;
class SharedQueryState;
struct WakeupQueryCallback : public ClusterCommCallback {
WakeupQueryCallback(ExecutionBlock* initiator, Query* query);
~WakeupQueryCallback();
bool operator()(ClusterCommResult*) override;
private:
ExecutionBlock* _initiator;
Query* _query;
std::shared_ptr<SharedQueryState> _sharedState;
};
} // namespace aql
} // namespace arangodb
#endif

View File

@ -342,7 +342,6 @@ set(LIB_ARANGO_AQL_SOURCES
Aql/V8Executor.cpp Aql/V8Executor.cpp
Aql/Variable.cpp Aql/Variable.cpp
Aql/VariableGenerator.cpp Aql/VariableGenerator.cpp
Aql/WakeupQueryCallback.cpp
Aql/grammar.cpp Aql/grammar.cpp
Aql/tokens.cpp Aql/tokens.cpp
) )
@ -504,6 +503,14 @@ set(LIB_ARANGO_V8SERVER_SOURCES
V8Server/v8-vocindex.cpp V8Server/v8-vocindex.cpp
) )
set(LIB_ARANGO_NETWORK_SOURCES
Network/ClusterUtils.cpp
Network/ConnectionPool.cpp
Network/Methods.cpp
Network/NetworkFeature.cpp
Network/Utils.cpp
)
set(LIB_ARANGOSERVER_SOURCES set(LIB_ARANGOSERVER_SOURCES
Actions/ActionFeature.cpp Actions/ActionFeature.cpp
Actions/RestActionHandler.cpp Actions/RestActionHandler.cpp
@ -578,10 +585,6 @@ set(LIB_ARANGOSERVER_SOURCES
GeneralServer/SslServerFeature.cpp GeneralServer/SslServerFeature.cpp
GeneralServer/Task.cpp GeneralServer/Task.cpp
GeneralServer/VstCommTask.cpp GeneralServer/VstCommTask.cpp
Network/ConnectionPool.cpp
Network/Methods.cpp
Network/NetworkFeature.cpp
Network/Utils.cpp
RestHandler/RestAdminDatabaseHandler.cpp RestHandler/RestAdminDatabaseHandler.cpp
RestHandler/RestAdminExecuteHandler.cpp RestHandler/RestAdminExecuteHandler.cpp
RestHandler/RestAdminLogHandler.cpp RestHandler/RestAdminLogHandler.cpp
@ -755,6 +758,10 @@ add_library(arangoserver STATIC
${ProductVersionFiles} ${ProductVersionFiles}
) )
add_library(arango_network STATIC
${LIB_ARANGO_NETWORK_SOURCES}
)
add_library(arango_mmfiles STATIC add_library(arango_mmfiles STATIC
${MMFILES_SOURCES} ${MMFILES_SOURCES}
) )
@ -803,6 +810,7 @@ target_link_libraries(arango_aql arango_geo)
target_link_libraries(arango_aql arango_graph) target_link_libraries(arango_aql arango_graph)
target_link_libraries(arango_aql arango_indexes) target_link_libraries(arango_aql arango_indexes)
target_link_libraries(arango_aql arango_iresearch) target_link_libraries(arango_aql arango_iresearch)
target_link_libraries(arango_aql arango_network)
target_link_libraries(arango_cache arango) target_link_libraries(arango_cache arango)
target_link_libraries(arango_cache boost_system) target_link_libraries(arango_cache boost_system)
@ -811,6 +819,7 @@ target_link_libraries(arango_cluster_engine arango_indexes)
target_link_libraries(arango_cluster_engine boost_boost) target_link_libraries(arango_cluster_engine boost_boost)
target_link_libraries(arango_cluster_methods arango) target_link_libraries(arango_cluster_methods arango)
target_link_libraries(arango_cluster_methods arango_network)
target_link_libraries(arango_common_rest_handler arango_cluster_methods) target_link_libraries(arango_common_rest_handler arango_cluster_methods)
target_link_libraries(arango_common_rest_handler arango_utils) target_link_libraries(arango_common_rest_handler arango_utils)
@ -833,6 +842,10 @@ target_link_libraries(arango_mmfiles boost_boost)
target_link_libraries(arango_mmfiles boost_system) target_link_libraries(arango_mmfiles boost_system)
target_link_libraries(arango_mmfiles llhttp) target_link_libraries(arango_mmfiles llhttp)
target_link_libraries(arango_network boost_boost)
target_link_libraries(arango_network fuerte)
target_link_libraries(arango_network llhttp)
target_link_libraries(arango_pregel arango_agency) target_link_libraries(arango_pregel arango_agency)
target_link_libraries(arango_pregel boost_boost) target_link_libraries(arango_pregel boost_boost)
target_link_libraries(arango_pregel boost_system) target_link_libraries(arango_pregel boost_system)
@ -875,6 +888,7 @@ target_link_libraries(arangoserver arango_geo)
target_link_libraries(arangoserver arango_graph) target_link_libraries(arangoserver arango_graph)
target_link_libraries(arangoserver arango_indexes) target_link_libraries(arangoserver arango_indexes)
target_link_libraries(arangoserver arango_iresearch) target_link_libraries(arangoserver arango_iresearch)
target_link_libraries(arangoserver arango_network)
target_link_libraries(arangoserver arango_pregel) target_link_libraries(arangoserver arango_pregel)
target_link_libraries(arangoserver arango_replication) target_link_libraries(arangoserver arango_replication)
target_link_libraries(arangoserver arango_storage_engine) target_link_libraries(arangoserver arango_storage_engine)
@ -882,8 +896,6 @@ target_link_libraries(arangoserver arango_utils)
target_link_libraries(arangoserver arango_v8server) target_link_libraries(arangoserver arango_v8server)
target_link_libraries(arangoserver arango_vocbase) target_link_libraries(arangoserver arango_vocbase)
target_link_libraries(arangoserver boost_boost) target_link_libraries(arangoserver boost_boost)
target_link_libraries(arangoserver fuerte)
target_link_libraries(arangoserver llhttp)
target_link_libraries(arangoserver target_link_libraries(arangoserver
${LINENOISE_LIBS} # Is this ever anything but empty? ${LINENOISE_LIBS} # Is this ever anything but empty?

View File

@ -42,6 +42,7 @@
#include "Cluster/ClusterTrxMethods.h" #include "Cluster/ClusterTrxMethods.h"
#include "Futures/Utilities.h" #include "Futures/Utilities.h"
#include "Graph/Traverser.h" #include "Graph/Traverser.h"
#include "Network/ClusterUtils.h"
#include "Network/Methods.h" #include "Network/Methods.h"
#include "Network/NetworkFeature.h" #include "Network/NetworkFeature.h"
#include "Network/Utils.h" #include "Network/Utils.h"
@ -245,51 +246,6 @@ void addTransactionHeaderForShard(transaction::Methods const& trx, ShardMap cons
} }
} }
/// @brief Collect the results from all shards (fastpath variant)
/// All result bodies are stored in resultMap
template <typename T>
static void collectResponsesFromAllShards(
std::map<ShardID, std::vector<T>> const& shardMap,
std::vector<futures::Try<arangodb::network::Response>>& responses,
std::unordered_map<int, size_t>& errorCounter,
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>>& resultMap,
fuerte::StatusCode& code) {
// If none of the shards responds we return a SERVER_ERROR;
code = fuerte::StatusInternalError;
for (Try<arangodb::network::Response> const& tryRes : responses) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
auto tmpBuilder = std::make_shared<VPackBuilder>();
// If there was no answer whatsoever, we cannot rely on the shardId
// being present in the result struct:
auto weSend = shardMap.find(sId);
TRI_ASSERT(weSend != shardMap.end()); // We send sth there earlier.
size_t count = weSend->second.size();
for (size_t i = 0; i < count; ++i) {
tmpBuilder->openObject();
tmpBuilder->add(StaticStrings::Error, VPackValue(true));
tmpBuilder->add(StaticStrings::ErrorNum, VPackValue(commError));
tmpBuilder->close();
}
resultMap.emplace(sId, std::move(tmpBuilder));
} else {
std::vector<VPackSlice> const& slices = res.response->slices();
auto tmpBuilder = std::make_shared<VPackBuilder>();
if (!slices.empty()) {
tmpBuilder->add(slices[0]);
}
resultMap.emplace(sId, std::move(tmpBuilder));
network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true);
code = res.response->statusCode();
}
}
}
/// @brief iterate over shard responses and compile a result /// @brief iterate over shard responses and compile a result
/// This will take care of checking the fuerte responses. If the response has /// This will take care of checking the fuerte responses. If the response has
/// a body, then the callback will be called on the body, with access to the /// a body, then the callback will be called on the body, with access to the
@ -331,47 +287,7 @@ OperationResult handleResponsesFromAllShards(
post(result, builder); post(result, builder);
return OperationResult(result, builder.steal()); return OperationResult(result, builder.steal());
} }
} // namespace
namespace arangodb {
////////////////////////////////////////////////////////////////////////////////
/// @brief merge the baby-object results.
/// The shard map contains the ordering of elements, the vector in this
/// Map is expected to be sorted from front to back.
/// The second map contains the answers for each shard.
/// The builder in the third parameter will be cleared and will contain
/// the resulting array. It is guaranteed that the resulting array
/// indexes
/// are equal to the original request ordering before it was destructured
/// for babies.
////////////////////////////////////////////////////////////////////////////////
static void mergeResults(std::vector<std::pair<ShardID, VPackValueLength>> const& reverseMapping,
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> const& resultMap,
VPackBuilder& resultBody) {
resultBody.clear();
resultBody.openArray();
for (auto const& pair : reverseMapping) {
VPackSlice arr = resultMap.find(pair.first)->second->slice();
if (arr.isObject() && arr.hasKey(StaticStrings::Error) &&
arr.get(StaticStrings::Error).isBoolean() &&
arr.get(StaticStrings::Error).getBoolean()) {
// an error occurred, now rethrow the error
int res = arr.get(StaticStrings::ErrorNum).getNumericValue<int>();
VPackSlice msg = arr.get(StaticStrings::ErrorMessage);
if (msg.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString());
} else {
THROW_ARANGO_EXCEPTION(res);
}
}
resultBody.add(arr.at(pair.second));
}
resultBody.close();
}
namespace {
// velocypack representation of object // velocypack representation of object
// {"error":true,"errorMessage":"document not found","errorNum":1202} // {"error":true,"errorMessage":"document not found","errorNum":1202}
static const char* notFoundSlice = static const char* notFoundSlice =
@ -437,12 +353,88 @@ void mergeResultsAllShards(std::vector<VPackSlice> const& results, VPackBuilder&
} }
} }
/// @brief handle CRUD api shard responses, slow path
template <typename F, typename CT>
OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx,
std::vector<Try<network::Response>> const& results) {
std::map<ShardID, VPackSlice> resultMap;
std::map<ShardID, int> shardError;
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code = fuerte::StatusInternalError;
// If none of the shards responded we return a SERVER_ERROR;
for (Try<arangodb::network::Response> const& tryRes : results) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
shardError.emplace(sId, commError);
} else {
resultMap.emplace(sId, res.response->slice());
network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true);
code = res.response->statusCode();
}
}
// merge the baby-object results. reverseMapping contains
// the ordering of elements, the vector in this
// map is expected to be sorted from front to back.
// resultMap contains the answers for each shard.
// It is guaranteed that the resulting array indexes are
// equal to the original request ordering before it was destructured
VPackBuilder resultBody;
resultBody.openArray();
for (auto const& pair : opCtx.reverseMapping) {
ShardID const& sId = pair.first;
auto const& it = resultMap.find(sId);
if (it == resultMap.end()) { // no answer from this shard
auto const& it2 = shardError.find(sId);
TRI_ASSERT(it2 != shardError.end());
auto weSend = opCtx.shardMap.find(sId);
TRI_ASSERT(weSend != opCtx.shardMap.end()); // We send sth there earlier.
size_t count = weSend->second.size();
for (size_t i = 0; i < count; ++i) {
resultBody.openObject(/*unindexed*/ true);
resultBody.add(StaticStrings::Error, VPackValue(true));
resultBody.add(StaticStrings::ErrorNum, VPackValue(it2->second));
resultBody.close();
}
} else {
VPackSlice arr = it->second;
// we expect an array of baby-documents, but the response might
// also be an error, if the DBServer threw a hissy fit
if (arr.isObject() && arr.hasKey(StaticStrings::Error) &&
arr.get(StaticStrings::Error).isBoolean() &&
arr.get(StaticStrings::Error).getBoolean()) {
// an error occurred, now rethrow the error
int res = arr.get(StaticStrings::ErrorNum).getNumericValue<int>();
VPackSlice msg = arr.get(StaticStrings::ErrorMessage);
if (msg.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString());
} else {
THROW_ARANGO_EXCEPTION(res);
}
}
resultBody.add(arr.at(pair.second));
}
}
resultBody.close();
return std::forward<F>(func)(code, resultBody.steal(),
std::move(opCtx.options), std::move(errorCounter));
}
/// @brief handle CRUD api shard responses, slow path /// @brief handle CRUD api shard responses, slow path
template <typename F> template <typename F>
OperationResult handleCRUDShardResponsesSlow(F&& func, size_t expectedLen, OperationOptions options, OperationResult handleCRUDShardResponsesSlow(F&& func, size_t expectedLen, OperationOptions options,
std::vector<Try<network::Response>> const& responses) { std::vector<Try<network::Response>> const& responses) {
std::shared_ptr<VPackBuffer<uint8_t>> buffer;
if (expectedLen == 0) { // Only one can answer, we react a bit differently if (expectedLen == 0) { // Only one can answer, we react a bit differently
std::shared_ptr<VPackBuffer<uint8_t>> buffer;
int nrok = 0; int nrok = 0;
int commError = TRI_ERROR_NO_ERROR; int commError = TRI_ERROR_NO_ERROR;
@ -767,6 +759,8 @@ static std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>
return result; return result;
} }
namespace arangodb {
/// @brief convert ClusterComm error into arango error code /// @brief convert ClusterComm error into arango error code
int handleGeneralCommErrors(arangodb::ClusterCommResult const* res) { int handleGeneralCommErrors(arangodb::ClusterCommResult const* res) {
// This function creates an error code from a ClusterCommResult, // This function creates an error code from a ClusterCommResult,
@ -1381,23 +1375,14 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
} }
return network::clusterResultInsert(res.response->statusCode(), return network::clusterResultInsert(res.response->statusCode(),
res.response->copyPayload(), options, {}); res.response->stealPayload(), options, {});
}; };
return std::move(futures[0]).thenValue(cb); return std::move(futures[0]).thenValue(cb);
} }
return futures::collectAll(std::move(futures)) return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult { .thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>> results) -> OperationResult {
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap; return handleCRUDShardResponsesFast(network::clusterResultInsert, opCtx, results);
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code;
collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code);
TRI_ASSERT(resultMap.size() == results.size());
VPackBuilder resultBody;
mergeResults(opCtx.reverseMapping, resultMap, resultBody);
return network::clusterResultInsert(code, resultBody.steal(), std::move(opCtx.options), errorCounter);
}); });
}); });
} }
@ -1503,19 +1488,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
return futures::collectAll(std::move(futures)) return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult { .thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap; return handleCRUDShardResponsesFast(network::clusterResultDelete, opCtx, results);
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code;
collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code);
TRI_ASSERT(resultMap.size() == results.size());
// the cluster operation was OK, however,
// the DBserver could have reported an error.
VPackBuilder resultBody;
mergeResults(opCtx.reverseMapping, resultMap, resultBody);
return network::clusterResultDelete(code, resultBody.steal(),
opCtx.options, errorCounter);
}); });
}); });
@ -1767,18 +1740,9 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
}); });
} }
return futures::collectAll(std::move(futures)).thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult { return futures::collectAll(std::move(futures))
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap; .thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>> results) {
std::unordered_map<int, size_t> errorCounter; return handleCRUDShardResponsesFast(network::clusterResultDocument, opCtx, results);
fuerte::StatusCode code;
collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code);
TRI_ASSERT(resultMap.size() == results.size());
VPackBuilder resultBody;
mergeResults(opCtx.reverseMapping, resultMap, resultBody);
return network::clusterResultDocument(fuerte::StatusOK, resultBody.steal(),
std::move(opCtx.options), errorCounter);
}); });
}); });
@ -2430,19 +2394,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
return futures::collectAll(std::move(futures)) return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult { .thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap; return handleCRUDShardResponsesFast(network::clusterResultModify, opCtx, results);
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code;
collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code);
TRI_ASSERT(resultMap.size() == results.size());
// the cluster operation was OK, however,
// the DBserver could have reported an error.
VPackBuilder resultBody;
mergeResults(opCtx.reverseMapping, resultMap, resultBody);
return network::clusterResultModify(code, resultBody.steal(),
opCtx.options, errorCounter);
}); });
}); });
} }
@ -3514,6 +3466,7 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const
// We keep the currently registered timestamps in Current/ServersRegistered, // We keep the currently registered timestamps in Current/ServersRegistered,
// such that we can wait until all have reregistered and are up: // such that we can wait until all have reregistered and are up:
ci.loadCurrentDBServers(); ci.loadCurrentDBServers();
auto const preServersKnown = ci.rebootIds(); auto const preServersKnown = ci.rebootIds();
@ -3524,6 +3477,13 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const
return result; return result;
} }
// no need to keep connections to shut-down servers
auto const& nf = feature.server().getFeature<NetworkFeature>();
auto* pool = nf.pool();
if (pool) {
pool->drainConnections();
}
auto startTime = std::chrono::steady_clock::now(); auto startTime = std::chrono::steady_clock::now();
while (true) { // will be left by a timeout while (true) { // will be left by a timeout
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));

View File

@ -0,0 +1,131 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "ClusterUtils.h"
#include "Network/ConnectionPool.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "Logger/LogMacros.h"
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace network {
/// @brief Create Cluster Communication result for insert
OperationResult clusterResultInsert(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusAccepted:
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
case fuerte::StatusCreated: {
options.waitForSync = true; // wait for sync is abused herea
// operationResult should get a return code.
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusPreconditionFailed:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_CONFLICT);
case fuerte::StatusBadRequest:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
case fuerte::StatusConflict:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED);
default:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
/// @brief Create Cluster Communication result for document
OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusOK:
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
case fuerte::StatusPreconditionFailed:
return OperationResult(Result(TRI_ERROR_ARANGO_CONFLICT), std::move(body),
std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
/// @brief Create Cluster Communication result for modify
OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusAccepted:
case fuerte::StatusCreated: {
options.waitForSync = (code == fuerte::StatusCreated);
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusConflict:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED),
body, std::move(options), errorCounter);
case fuerte::StatusPreconditionFailed:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
body, std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default: {
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
}
/// @brief Create Cluster Communication result for delete
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusOK:
case fuerte::StatusAccepted:
case fuerte::StatusCreated: {
OperationOptions options;
options.waitForSync = (code != fuerte::StatusAccepted);
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusPreconditionFailed:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
body, std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default: {
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
}
} // namespace network
} // namespace arangodb

View File

@ -0,0 +1,58 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#pragma once
#ifndef ARANGOD_NETWORK_CLUSTER_UTILS_H
#define ARANGOD_NETWORK_CLUSTER_UTILS_H 1
#include "Utils/OperationOptions.h"
#include "Utils/OperationResult.h"
#include <fuerte/types.h>
#include <velocypack/Buffer.h>
#include <velocypack/Slice.h>
namespace arangodb {
namespace network {
/// @brief Create Cluster Communication result for insert
OperationResult clusterResultInsert(fuerte::StatusCode responsecode,
std::shared_ptr<velocypack::Buffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
} // namespace network
} // namespace arangodb
#endif

View File

@ -46,7 +46,7 @@ ConnectionPool::~ConnectionPool() { shutdown(); }
/// @brief request a connection for a specific endpoint /// @brief request a connection for a specific endpoint
/// note: it is the callers responsibility to ensure the endpoint /// note: it is the callers responsibility to ensure the endpoint
/// is always the same, we do not do any post-processing /// is always the same, we do not do any post-processing
ConnectionPool::Ref ConnectionPool::leaseConnection(EndpointSpec const& str) { ConnectionPool::Ref ConnectionPool::leaseConnection(std::string const& str) {
fuerte::ConnectionBuilder builder; fuerte::ConnectionBuilder builder;
builder.endpoint(str); builder.endpoint(str);
builder.protocolType(_config.protocol); // always overwrite protocol builder.protocolType(_config.protocol); // always overwrite protocol
@ -69,8 +69,8 @@ ConnectionPool::Ref ConnectionPool::leaseConnection(EndpointSpec const& str) {
return selectConnection(*(it->second), builder); return selectConnection(*(it->second), builder);
} }
/// @brief shutdown all connections /// @brief drain all connections
void ConnectionPool::shutdown() { void ConnectionPool::drainConnections() {
WRITE_LOCKER(guard, _lock); WRITE_LOCKER(guard, _lock);
for (auto& pair : _connections) { for (auto& pair : _connections) {
ConnectionList& list = *(pair.second); ConnectionList& list = *(pair.second);
@ -79,6 +79,13 @@ void ConnectionPool::shutdown() {
c->fuerte->cancel(); c->fuerte->cancel();
} }
} }
}
/// @brief shutdown all connections
void ConnectionPool::shutdown() {
drainConnections();
WRITE_LOCKER(guard, _lock);
_connections.clear(); _connections.clear();
} }
@ -161,15 +168,15 @@ void ConnectionPool::pruneConnections() {
} }
/// @brief cancel connections to this endpoint /// @brief cancel connections to this endpoint
void ConnectionPool::cancelConnections(EndpointSpec const& str) { void ConnectionPool::cancelConnections(std::string const& endpoint) {
fuerte::ConnectionBuilder builder; fuerte::ConnectionBuilder builder;
builder.endpoint(str); builder.endpoint(endpoint);
builder.protocolType(_config.protocol); // always overwrite protocol builder.protocolType(_config.protocol); // always overwrite protocol
std::string endpoint = builder.normalizedEndpoint(); std::string normalized = builder.normalizedEndpoint();
WRITE_LOCKER(guard, _lock); WRITE_LOCKER(guard, _lock);
auto const& it = _connections.find(endpoint); auto const& it = _connections.find(normalized);
if (it != _connections.end()) { if (it != _connections.end()) {
// { // {
// ConnectionList& list = *(it->second); // ConnectionList& list = *(it->second);

View File

@ -84,12 +84,15 @@ class ConnectionPool final {
/// @brief request a connection for a specific endpoint /// @brief request a connection for a specific endpoint
/// note: it is the callers responsibility to ensure the endpoint /// note: it is the callers responsibility to ensure the endpoint
/// is always the same, we do not do any post-processing /// is always the same, we do not do any post-processing
Ref leaseConnection(EndpointSpec const&); Ref leaseConnection(std::string const& endpoint);
/// @brief event loop service to create a connection seperately /// @brief event loop service to create a connection seperately
/// user is responsible for correctly shutting it down /// user is responsible for correctly shutting it down
fuerte::EventLoopService& eventLoopService() { return _loop; } fuerte::EventLoopService& eventLoopService() { return _loop; }
/// @brief shutdown all connections
void drainConnections();
/// @brief shutdown all connections /// @brief shutdown all connections
void shutdown(); void shutdown();
@ -97,7 +100,7 @@ class ConnectionPool final {
void pruneConnections(); void pruneConnections();
/// @brief cancel connections to this endpoint /// @brief cancel connections to this endpoint
void cancelConnections(EndpointSpec const&); void cancelConnections(std::string const& endpoint);
/// @brief return the number of open connections /// @brief return the number of open connections
size_t numOpenConnections() const; size_t numOpenConnections() const;

View File

@ -106,14 +106,13 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination,
Response{destination, Error::Canceled, nullptr}); Response{destination, Error::Canceled, nullptr});
} }
arangodb::network::EndpointSpec endpoint; arangodb::network::EndpointSpec spec;
int res = resolveDestination(feature, destination, spec);
int res = resolveDestination(feature, destination, endpoint);
if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?! if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?!
return futures::makeFuture( return futures::makeFuture(
Response{destination, Error::Canceled, nullptr}); Response{destination, Error::Canceled, nullptr});
} }
TRI_ASSERT(!endpoint.empty()); TRI_ASSERT(!spec.endpoint.empty());
auto req = prepareRequest(type, path, std::move(payload), timeout, std::move(headers)); auto req = prepareRequest(type, path, std::move(payload), timeout, std::move(headers));
@ -126,7 +125,7 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination,
: destination(dest), ref(std::move(r)), promise() {} : destination(dest), ref(std::move(r)), promise() {}
}; };
static_assert(sizeof(std::shared_ptr<Pack>) <= 2*sizeof(void*), "does not fit in sfo"); static_assert(sizeof(std::shared_ptr<Pack>) <= 2*sizeof(void*), "does not fit in sfo");
auto p = std::make_shared<Pack>(destination, pool->leaseConnection(endpoint)); auto p = std::make_shared<Pack>(destination, pool->leaseConnection(spec.endpoint));
auto conn = p->ref.connection(); auto conn = p->ref.connection();
auto f = p->promise.getFuture(); auto f = p->promise.getFuture();
@ -189,8 +188,9 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
return; // we are done return; // we are done
} }
arangodb::network::EndpointSpec endpoint; // actual server endpoint is always re-evaluated
int res = resolveDestination(_feature, _destination, endpoint); arangodb::network::EndpointSpec spec;
int res = resolveDestination(_feature, _destination, spec);
if (res != TRI_ERROR_NO_ERROR) { // ClusterInfo did not work if (res != TRI_ERROR_NO_ERROR) { // ClusterInfo did not work
callResponse(Error::Canceled, nullptr); callResponse(Error::Canceled, nullptr);
return; return;
@ -206,7 +206,7 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
auto localTO = std::chrono::duration_cast<std::chrono::milliseconds>(_endTime - now); auto localTO = std::chrono::duration_cast<std::chrono::milliseconds>(_endTime - now);
TRI_ASSERT(localTO.count() > 0); TRI_ASSERT(localTO.count() > 0);
auto ref = pool->leaseConnection(endpoint); auto ref = pool->leaseConnection(spec.endpoint);
auto req = prepareRequest(_type, _path, _payload, localTO, _headers); auto req = prepareRequest(_type, _path, _payload, localTO, _headers);
auto self = RequestsState::shared_from_this(); auto self = RequestsState::shared_from_this();
auto cb = [self, ref](fuerte::Error err, auto cb = [self, ref](fuerte::Error err,

View File

@ -148,17 +148,19 @@ void NetworkFeature::beginShutdown() {
_workItem.reset(); _workItem.reset();
} }
_poolPtr.store(nullptr, std::memory_order_release); _poolPtr.store(nullptr, std::memory_order_release);
if (_pool) { if (_pool) { // first cancel all connections
_pool->shutdown(); _pool->drainConnections();
_pool.reset();
} }
} }
void NetworkFeature::stop() { void NetworkFeature::stop() {
{
// we might have posted another workItem during shutdown. // we might have posted another workItem during shutdown.
std::lock_guard<std::mutex> guard(_workItemMutex); std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset(); _workItem.reset();
} }
_pool->drainConnections();
}
arangodb::network::ConnectionPool* NetworkFeature::pool() const { arangodb::network::ConnectionPool* NetworkFeature::pool() const {
return _poolPtr.load(std::memory_order_acquire); return _poolPtr.load(std::memory_order_acquire);

View File

@ -38,42 +38,47 @@
namespace arangodb { namespace arangodb {
namespace network { namespace network {
int resolveDestination(NetworkFeature& feature, DestinationId const& dest, int resolveDestination(NetworkFeature const& feature, DestinationId const& dest,
std::string& endpoint) { network::EndpointSpec& spec) {
using namespace arangodb;
if (dest.find("tcp://") == 0 || dest.find("ssl://") == 0) {
endpoint = dest;
return TRI_ERROR_NO_ERROR; // all good
}
// Now look up the actual endpoint: // Now look up the actual endpoint:
if (!feature.server().hasFeature<ClusterFeature>()) { if (!feature.server().hasFeature<ClusterFeature>()) {
return TRI_ERROR_SHUTTING_DOWN; return TRI_ERROR_SHUTTING_DOWN;
} }
auto& ci = feature.server().getFeature<ClusterFeature>().clusterInfo(); auto& ci = feature.server().getFeature<ClusterFeature>().clusterInfo();
return resolveDestination(ci, dest, spec);
}
int resolveDestination(ClusterInfo& ci, DestinationId const& dest,
network::EndpointSpec& spec) {
using namespace arangodb;
if (dest.find("tcp://") == 0 || dest.find("ssl://") == 0) {
spec.endpoint = dest;
return TRI_ERROR_NO_ERROR; // all good
}
// This sets result.shardId, result.serverId and result.endpoint, // This sets result.shardId, result.serverId and result.endpoint,
// depending on what dest is. Note that if a shardID is given, the // depending on what dest is. Note that if a shardID is given, the
// responsible server is looked up, if a serverID is given, the endpoint // responsible server is looked up, if a serverID is given, the endpoint
// is looked up, both can fail and immediately lead to a CL_COMM_ERROR // is looked up, both can fail and immediately lead to a CL_COMM_ERROR
// state. // state.
ServerID serverID;
if (dest.compare(0, 6, "shard:", 6) == 0) { if (dest.compare(0, 6, "shard:", 6) == 0) {
ShardID shardID = dest.substr(6); spec.shardId = dest.substr(6);
{ {
std::shared_ptr<std::vector<ServerID>> resp = ci.getResponsibleServer(shardID); std::shared_ptr<std::vector<ServerID>> resp = ci.getResponsibleServer(spec.shardId);
if (!resp->empty()) { if (!resp->empty()) {
serverID = (*resp)[0]; spec.serverId = (*resp)[0];
} else { } else {
LOG_TOPIC("60ee8", ERR, Logger::CLUSTER) LOG_TOPIC("60ee8", ERR, Logger::CLUSTER)
<< "cannot find responsible server for shard '" << shardID << "'"; << "cannot find responsible server for shard '" << spec.shardId << "'";
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
} }
} }
LOG_TOPIC("64670", DEBUG, Logger::CLUSTER) << "Responsible server: " << serverID; LOG_TOPIC("64670", DEBUG, Logger::CLUSTER) << "Responsible server: " << spec.serverId;
} else if (dest.compare(0, 7, "server:", 7) == 0) { } else if (dest.compare(0, 7, "server:", 7) == 0) {
serverID = dest.substr(7); spec.serverId = dest.substr(7);
} else { } else {
std::string errorMessage = "did not understand destination '" + dest + "'"; std::string errorMessage = "did not understand destination '" + dest + "'";
LOG_TOPIC("77a84", ERR, Logger::COMMUNICATION) LOG_TOPIC("77a84", ERR, Logger::COMMUNICATION)
@ -81,15 +86,15 @@ int resolveDestination(NetworkFeature& feature, DestinationId const& dest,
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
} }
endpoint = ci.getServerEndpoint(serverID); spec.endpoint = ci.getServerEndpoint(spec.serverId);
if (endpoint.empty()) { if (spec.endpoint.empty()) {
if (serverID.find(',') != std::string::npos) { if (spec.serverId.find(',') != std::string::npos) {
TRI_ASSERT(false); TRI_ASSERT(false);
} }
std::string errorMessage = std::string errorMessage =
"did not find endpoint of server '" + serverID + "'"; "did not find endpoint of server '" + spec.serverId + "'";
LOG_TOPIC("f29ef", ERR, Logger::COMMUNICATION) LOG_TOPIC("f29ef", ERR, Logger::COMMUNICATION)
<< "did not find endpoint of server '" << serverID << "'"; << "did not find endpoint of server '" << spec.serverId << "'";
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
} }
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
@ -118,7 +123,6 @@ Result resultFromBody(std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>> con
Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& body, Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& body,
int defaultError) { int defaultError) {
// read the error number from the response and use it if present // read the error number from the response and use it if present
if (body) { if (body) {
return resultFromBody(body->slice(), defaultError); return resultFromBody(body->slice(), defaultError);
@ -127,8 +131,7 @@ Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& body
return Result(defaultError); return Result(defaultError);
} }
Result resultFromBody(arangodb::velocypack::Slice slice, Result resultFromBody(arangodb::velocypack::Slice slice, int defaultError) {
int defaultError) {
// read the error number from the response and use it if present // read the error number from the response and use it if present
if (slice.isObject()) { if (slice.isObject()) {
VPackSlice num = slice.get(StaticStrings::ErrorNum); VPackSlice num = slice.get(StaticStrings::ErrorNum);
@ -171,21 +174,16 @@ void errorCodesFromHeaders(network::Headers headers,
} }
} }
int fuerteToArangoErrorCode(network::Response const& res) { namespace {
return fuerteToArangoErrorCode(res.error);
}
int fuerteToArangoErrorCode(fuerte::Error err) { int toArangoErrorCodeInternal(fuerte::Error err) {
// This function creates an error code from a fuerte::Error,
// This function creates an error code from a ClusterCommResult,
// but only if it is a communication error. If the communication // but only if it is a communication error. If the communication
// was successful and there was an HTTP error code, this function // was successful and there was an HTTP error code, this function
// returns TRI_ERROR_NO_ERROR. // returns TRI_ERROR_NO_ERROR.
// If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED // If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED
// and .answer can safely be inspected. // and .answer can safely be inspected.
// LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, err != fuerte::Error::NoError) << fuerte::to_string(err);
switch (err) { switch (err) {
case fuerte::Error::NoError: case fuerte::Error::NoError:
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
@ -200,10 +198,12 @@ int fuerteToArangoErrorCode(fuerte::Error err) {
case fuerte::Error::Timeout: // No reply, we give up: case fuerte::Error::Timeout: // No reply, we give up:
return TRI_ERROR_CLUSTER_TIMEOUT; return TRI_ERROR_CLUSTER_TIMEOUT;
case fuerte::Error::Canceled:
return TRI_ERROR_REQUEST_CANCELED;
case fuerte::Error::QueueCapacityExceeded: // there is no result case fuerte::Error::QueueCapacityExceeded: // there is no result
case fuerte::Error::ReadError: case fuerte::Error::ReadError:
case fuerte::Error::WriteError: case fuerte::Error::WriteError:
case fuerte::Error::Canceled:
case fuerte::Error::ProtocolError: case fuerte::Error::ProtocolError:
return TRI_ERROR_CLUSTER_CONNECTION_LOST; return TRI_ERROR_CLUSTER_CONNECTION_LOST;
@ -213,98 +213,19 @@ int fuerteToArangoErrorCode(fuerte::Error err) {
return TRI_ERROR_INTERNAL; return TRI_ERROR_INTERNAL;
} }
} // namespace
/// @brief Create Cluster Communication result for insert int fuerteToArangoErrorCode(network::Response const& res) {
OperationResult clusterResultInsert(arangodb::fuerte::StatusCode code, LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, res.error != fuerte::Error::NoError)
std::shared_ptr<VPackBuffer<uint8_t>> body, << "cluster error: '" << fuerte::to_string(res.error)
OperationOptions options, << "' from destination '" << res.destination << "'";
std::unordered_map<int, size_t> const& errorCounter) { return toArangoErrorCodeInternal(res.error);
switch (code) {
case fuerte::StatusAccepted:
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
case fuerte::StatusCreated: {
options.waitForSync = true; // wait for sync is abused herea
// operationResult should get a return code.
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusPreconditionFailed:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_CONFLICT);
case fuerte::StatusBadRequest:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
case fuerte::StatusConflict:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED);
default:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
} }
/// @brief Create Cluster Communication result for document int fuerteToArangoErrorCode(fuerte::Error err) {
OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code, LOG_TOPIC_IF("abcdf", ERR, Logger::CLUSTER, err != fuerte::Error::NoError)
std::shared_ptr<VPackBuffer<uint8_t>> body, << "cluster error: '" << fuerte::to_string(err) << "'";
OperationOptions options, return toArangoErrorCodeInternal(err);
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusOK:
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
case fuerte::StatusPreconditionFailed:
return OperationResult(Result(TRI_ERROR_ARANGO_CONFLICT), std::move(body),
std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default:
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
/// @brief Create Cluster Communication result for modify
OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusAccepted:
case fuerte::StatusCreated: {
options.waitForSync = (code == fuerte::StatusCreated);
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusConflict:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED),
body, std::move(options), errorCounter);
case fuerte::StatusPreconditionFailed:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
body, std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default: {
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
}
/// @brief Create Cluster Communication result for delete
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter) {
switch (code) {
case fuerte::StatusOK:
case fuerte::StatusAccepted:
case fuerte::StatusCreated: {
OperationOptions options;
options.waitForSync = (code != fuerte::StatusAccepted);
return OperationResult(Result(), std::move(body), std::move(options), errorCounter);
}
case fuerte::StatusPreconditionFailed:
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
body, std::move(options), errorCounter);
case fuerte::StatusNotFound:
return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
default: {
return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL);
}
}
} }
} // namespace network } // namespace network
} // namespace arangodb } // namespace arangodb

View File

@ -20,41 +20,39 @@
/// @author Simon Grätzer /// @author Simon Grätzer
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#pragma once
#ifndef ARANGOD_NETWORK_UTILS_H #ifndef ARANGOD_NETWORK_UTILS_H
#define ARANGOD_NETWORK_UTILS_H 1 #define ARANGOD_NETWORK_UTILS_H 1
#include "Basics/Result.h" #include "Basics/Result.h"
#include "Network/types.h" #include "Network/types.h"
#include "Utils/OperationOptions.h"
#include "Utils/OperationResult.h" #include "Utils/OperationResult.h"
#include <fuerte/types.h> #include <fuerte/types.h>
#include <velocypack/Buffer.h> #include <velocypack/Buffer.h>
#include <velocypack/Slice.h> #include <velocypack/Slice.h>
#include <chrono>
namespace arangodb { namespace arangodb {
namespace velocypack { namespace velocypack {
class Builder; class Builder;
} }
class NetworkFeature; class NetworkFeature;
class ClusterInfo;
namespace network { namespace network {
/// @brief resolve 'shard:' or 'server:' url to actual endpoint /// @brief resolve 'shard:' or 'server:' url to actual endpoint
int resolveDestination(NetworkFeature&, DestinationId const& dest, std::string&); int resolveDestination(NetworkFeature const&, DestinationId const& dest, network::EndpointSpec&);
int resolveDestination(ClusterInfo&, DestinationId const& dest, network::EndpointSpec&);
Result resultFromBody(std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>> const& b, Result resultFromBody(std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>> const& b,
int defaultError); int defaultError);
Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& b, Result resultFromBody(std::shared_ptr<arangodb::velocypack::Builder> const& b, int defaultError);
int defaultError); Result resultFromBody(arangodb::velocypack::Slice b, int defaultError);
Result resultFromBody(arangodb::velocypack::Slice b,
int defaultError);
/// @brief extract the error from a cluster response /// @brief extract the error from a cluster response
template <typename T> template <typename T>
OperationResult opResultFromBody(T const& body, OperationResult opResultFromBody(T const& body, int defaultErrorCode) {
int defaultErrorCode) {
return OperationResult(arangodb::network::resultFromBody(body, defaultErrorCode)); return OperationResult(arangodb::network::resultFromBody(body, defaultErrorCode));
} }
@ -70,24 +68,6 @@ void errorCodesFromHeaders(network::Headers headers,
int fuerteToArangoErrorCode(network::Response const& res); int fuerteToArangoErrorCode(network::Response const& res);
int fuerteToArangoErrorCode(fuerte::Error err); int fuerteToArangoErrorCode(fuerte::Error err);
/// @brief Create Cluster Communication result for insert
OperationResult clusterResultInsert(fuerte::StatusCode responsecode,
std::shared_ptr<velocypack::Buffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
std::shared_ptr<VPackBuffer<uint8_t>> body,
OperationOptions options,
std::unordered_map<int, size_t> const& errorCounter);
} // namespace network } // namespace network
} // namespace arangodb } // namespace arangodb

View File

@ -35,8 +35,12 @@ typedef std::string DestinationId;
using Headers = std::map<std::string, std::string>; using Headers = std::map<std::string, std::string>;
using Timeout = std::chrono::duration<double>; using Timeout = std::chrono::duration<double>;
/// @brief unified endpoint struct EndpointSpec {
typedef std::string EndpointSpec; std::string shardId;
std::string serverId;
std::string endpoint;
};
} // namespace network } // namespace network
} // namespace arangodb } // namespace arangodb