diff --git a/arangod/Aql/ClusterBlocks.cpp b/arangod/Aql/ClusterBlocks.cpp index 44b3274396..5f2d553601 100644 --- a/arangod/Aql/ClusterBlocks.cpp +++ b/arangod/Aql/ClusterBlocks.cpp @@ -703,10 +703,17 @@ arangodb::Result RemoteBlock::handleCommErrors(ClusterCommResult* res) const { return {res->getErrorCode(), res->stringifyErrorMessage()}; } if (res->status == CL_COMM_ERROR) { - std::string errorMessage = - std::string("Error message received from shard '") + - std::string(res->shardID) + std::string("' on cluster node '") + + std::string errorMessage; + auto const& shardID = res->shardID; + + if (shardID.empty()) { + errorMessage = std::string("Error message received from cluster node '") + std::string(res->serverID) + std::string("': "); + } else { + errorMessage = std::string("Error message received from shard '") + + std::string(shardID) + std::string("' on cluster node '") + + std::string(res->serverID) + std::string("': "); + } int errorNum = TRI_ERROR_INTERNAL; if (res->result != nullptr) { @@ -1181,12 +1188,13 @@ SortingGatherBlock::SortingGatherBlock(ExecutionEngine& engine, GatherNode const TRI_ASSERT(!en.elements().empty()); switch (en.sortMode()) { - case GatherNode::SortMode::Heap: - _strategy = std::make_unique(_trx, _gatherBlockBuffer, _sortRegisters); - break; case GatherNode::SortMode::MinElement: _strategy = std::make_unique(_trx, _gatherBlockBuffer, _sortRegisters); break; + case GatherNode::SortMode::Heap: + case GatherNode::SortMode::Default: // use heap by default + _strategy = std::make_unique(_trx, _gatherBlockBuffer, _sortRegisters); + break; default: TRI_ASSERT(false); break; diff --git a/arangod/Aql/ClusterNodes.cpp b/arangod/Aql/ClusterNodes.cpp index 52be0bdbe6..e745cba8b4 100644 --- a/arangod/Aql/ClusterNodes.cpp +++ b/arangod/Aql/ClusterNodes.cpp @@ -45,11 +45,12 @@ arangodb::velocypack::StringRef const SortModeMinElement("minelement"); arangodb::velocypack::StringRef const SortModeHeap("heap"); bool toSortMode(arangodb::velocypack::StringRef const& str, GatherNode::SortMode& mode) noexcept { - // std::map ~25-30% faster than std::unordered_map for small number of - // elements + // std::map ~25-30% faster than std::unordered_map for small number of elements static std::map const NameToValue{ {SortModeMinElement, GatherNode::SortMode::MinElement}, - {SortModeHeap, GatherNode::SortMode::Heap}}; + {SortModeHeap, GatherNode::SortMode::Heap}, + {SortModeUnset, GatherNode::SortMode::Default} + }; auto const it = NameToValue.find(str); @@ -68,6 +69,8 @@ arangodb::velocypack::StringRef toString(GatherNode::SortMode mode) noexcept { return SortModeMinElement; case GatherNode::SortMode::Heap: return SortModeHeap; + case GatherNode::SortMode::Default: + return SortModeUnset; default: TRI_ASSERT(false); return {}; diff --git a/arangod/Aql/ClusterNodes.h b/arangod/Aql/ClusterNodes.h index 9199bc125b..b0b452d28f 100644 --- a/arangod/Aql/ClusterNodes.h +++ b/arangod/Aql/ClusterNodes.h @@ -277,7 +277,7 @@ class GatherNode final : public ExecutionNode { friend class RedundantCalculationsReplacer; public: - enum class SortMode : uint32_t { MinElement, Heap }; + enum class SortMode : uint32_t { MinElement, Heap, Default }; /// @brief inspect dependencies starting from a specified 'node' /// and return first corresponding collection within diff --git a/arangod/Aql/EngineInfoContainerDBServer.cpp b/arangod/Aql/EngineInfoContainerDBServer.cpp index 69a77ed46b..acc86fdf89 100644 --- a/arangod/Aql/EngineInfoContainerDBServer.cpp +++ b/arangod/Aql/EngineInfoContainerDBServer.cpp @@ -74,17 +74,45 @@ Result ExtractRemoteAndShard(VPackSlice keySlice, size_t& remoteId, std::string& return {TRI_ERROR_NO_ERROR}; } +GatherNode* findFirstGather(ExecutionNode const& root) { + ExecutionNode* node = root.getFirstParent(); + + // moving down from a given node + // towards a return node + while (node) { + switch (node->getType()) { + case ExecutionNode::REMOTE: + node = node->getFirstParent(); + + if (!node || node->getType() != ExecutionNode::GATHER) { + return nullptr; + } + + return ExecutionNode::castTo(node); + default: + node = node->getFirstParent(); + break; + } + } + + return nullptr; +} + + ScatterNode* findFirstScatter(ExecutionNode const& root) { ExecutionNode* node = root.getFirstDependency(); + // moving up from a given node + // towards a singleton node while (node) { switch (node->getType()) { case ExecutionNode::REMOTE: node = node->getFirstDependency(); - if (node == nullptr) { + if (!node) { return nullptr; } + if (node->getType() != ExecutionNode::SCATTER && node->getType() != ExecutionNode::DISTRIBUTE) { return nullptr; @@ -103,7 +131,7 @@ ScatterNode* findFirstScatter(ExecutionNode const& root) { } // namespace EngineInfoContainerDBServer::EngineInfo::EngineInfo(size_t idOfRemoteNode) noexcept - : _idOfRemoteNode(idOfRemoteNode), _otherId(0), _collection(nullptr) {} + : _idOfRemoteNode(idOfRemoteNode), _otherId(0), _source(CollectionSource(nullptr)) {} EngineInfoContainerDBServer::EngineInfo::~EngineInfo() { // This container is not responsible for nodes @@ -116,56 +144,64 @@ EngineInfoContainerDBServer::EngineInfo::EngineInfo(EngineInfo&& other) noexcept : _nodes(std::move(other._nodes)), _idOfRemoteNode(other._idOfRemoteNode), _otherId(other._otherId), - _collection(other._collection) { + _source(std::move(other._source)) { TRI_ASSERT(!_nodes.empty()); - TRI_ASSERT(_collection != nullptr); + +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + struct { + void operator()(CollectionSource const& source) { + TRI_ASSERT(source.collection); + } + + void operator()(ViewSource const& source) { + TRI_ASSERT(source.view); + } + } visitor; + + boost::apply_visitor(visitor, _source); +#endif } void EngineInfoContainerDBServer::EngineInfo::addNode(ExecutionNode* node) { TRI_ASSERT(node); + + auto setRestrictedShard = [](auto* node, auto& source) { + TRI_ASSERT(node); + + auto* sourceImpl = boost::get(&source); + TRI_ASSERT(sourceImpl); + + if (node->isRestricted()) { + TRI_ASSERT(sourceImpl->restrictedShard.empty()); + sourceImpl->restrictedShard = node->restrictedShard(); + } + }; + switch (node->getType()) { case ExecutionNode::ENUMERATE_COLLECTION: { - TRI_ASSERT(_type == ExecutionNode::MAX_NODE_TYPE_VALUE); - auto ecNode = ExecutionNode::castTo(node); - if (ecNode->isRestricted()) { - TRI_ASSERT(_restrictedShard.empty()); - _restrictedShard = ecNode->restrictedShard(); - } - - // do not set '_type' of the engine here, - // bacause satellite collections may consists of - // multiple "main nodes" - + TRI_ASSERT(EngineType::Collection == type()); + setRestrictedShard(ExecutionNode::castTo(node), _source); break; } case ExecutionNode::INDEX: { - TRI_ASSERT(_type == ExecutionNode::MAX_NODE_TYPE_VALUE); - auto idxNode = ExecutionNode::castTo(node); - if (idxNode->isRestricted()) { - TRI_ASSERT(_restrictedShard.empty()); - _restrictedShard = idxNode->restrictedShard(); - } - - // do not set '_type' of the engine here, - // because satellite collections may consist of - // multiple "main nodes" - + TRI_ASSERT(EngineType::Collection == type()); + setRestrictedShard(ExecutionNode::castTo(node), _source); break; } #ifdef USE_IRESEARCH case ExecutionNode::ENUMERATE_IRESEARCH_VIEW: { - TRI_ASSERT(_type == ExecutionNode::MAX_NODE_TYPE_VALUE); + TRI_ASSERT(EngineType::Collection == type()); auto& viewNode = *ExecutionNode::castTo(node); - // FIXME should we have a separate optimizer rule for that? - // // evaluate node volatility before the distribution // can't do it on DB servers since only parts of the plan will be sent viewNode.volatility(true); - _scatter = findFirstScatter(*node); - _type = ExecutionNode::ENUMERATE_IRESEARCH_VIEW; - _view = viewNode.view().get(); + _source = ViewSource( + *viewNode.view().get(), + findFirstGather(viewNode), + findFirstScatter(viewNode) + ); break; } #endif @@ -174,12 +210,8 @@ void EngineInfoContainerDBServer::EngineInfo::addNode(ExecutionNode* node) { case ExecutionNode::REMOVE: case ExecutionNode::REPLACE: case ExecutionNode::UPSERT: { - TRI_ASSERT(_type == ExecutionNode::MAX_NODE_TYPE_VALUE); - auto modNode = ExecutionNode::castTo(node); - if (modNode->isRestricted()) { - TRI_ASSERT(_restrictedShard.empty()); - _restrictedShard = modNode->restrictedShard(); - } + TRI_ASSERT(EngineType::Collection == type()); + setRestrictedShard(ExecutionNode::castTo(node), _source); break; } default: @@ -190,21 +222,43 @@ void EngineInfoContainerDBServer::EngineInfo::addNode(ExecutionNode* node) { } Collection const* EngineInfoContainerDBServer::EngineInfo::collection() const noexcept { -#ifdef USE_IRESEARCH - TRI_ASSERT(ExecutionNode::ENUMERATE_IRESEARCH_VIEW != _type); -#endif - return _collection; + TRI_ASSERT(EngineType::Collection == type()); + auto* source = boost::get(&_source); + TRI_ASSERT(source); + return source->collection; +} + +void EngineInfoContainerDBServer::EngineInfo::collection(Collection* col) noexcept { + TRI_ASSERT(EngineType::Collection == type()); + auto* source = boost::get(&_source); + TRI_ASSERT(source); + source->collection = col; } #ifdef USE_IRESEARCH LogicalView const* EngineInfoContainerDBServer::EngineInfo::view() const noexcept { - TRI_ASSERT(ExecutionNode::ENUMERATE_IRESEARCH_VIEW == _type); - return _view; + TRI_ASSERT(EngineType::View == type()); + auto* source = boost::get(&_source); + TRI_ASSERT(source); + return source->view; } -ScatterNode* EngineInfoContainerDBServer::EngineInfo::scatter() const noexcept { - TRI_ASSERT(ExecutionNode::ENUMERATE_IRESEARCH_VIEW == _type); - return _scatter; +void EngineInfoContainerDBServer::EngineInfo::addClient(ServerID const& server) { + TRI_ASSERT(EngineType::View == type()); + + auto* source = boost::get(&_source); + TRI_ASSERT(source); + + if (source->scatter) { + auto& clients = source->scatter->clients(); + TRI_ASSERT(clients.end() == std::find(clients.begin(), clients.end(), server)); + clients.emplace_back(server); + } + + if (source->gather) { + // FIXME introduce a separate step if sort mode detection will become heavy + source->gather->sortMode(GatherNode::evaluateSortMode(++source->numClients)); + } } #endif @@ -275,8 +329,12 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet( void EngineInfoContainerDBServer::EngineInfo::serializeSnippet( Query& query, ShardID id, VPackBuilder& infoBuilder, bool isResponsibleForInit) const { - if (!_restrictedShard.empty()) { - if (id != _restrictedShard) { + auto* collection = boost::get(&_source); + TRI_ASSERT(collection); + auto& restrictedShard = collection->restrictedShard; + + if (!restrictedShard.empty()) { + if (id != restrictedShard) { return; } // We only have one shard it has to be responsible! @@ -293,7 +351,7 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet( // this clone does the translation collection => shardId implicitly // at the relevant parts of the query. - _collection->setCurrentShard(id); + collection->collection->setCurrentShard(id); ExecutionPlan plan(query.ast()); ExecutionNode* previous = nullptr; @@ -335,7 +393,7 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet( plan.setVarUsageComputed(); const unsigned flags = ExecutionNode::SERIALIZE_DETAILS; plan.root()->toVelocyPack(infoBuilder, flags, /*keepTopLevelOpen*/ false); - _collection->resetCurrentShard(); + collection->collection->resetCurrentShard(); } void EngineInfoContainerDBServer::CollectionInfo::mergeShards( @@ -439,7 +497,7 @@ void EngineInfoContainerDBServer::closeSnippet(QueryId coordinatorEngineId) { e->connectQueryId(coordinatorEngineId); #ifdef USE_IRESEARCH - if (ExecutionNode::ENUMERATE_IRESEARCH_VIEW == e->type()) { + if (EngineInfo::EngineType::View == e->type()) { _viewInfos[e->view()].engines.emplace_back(std::move(e)); } else #endif @@ -547,7 +605,7 @@ void EngineInfoContainerDBServer::DBServerInfo::buildMessage( EngineInfo const& engine = *it.first; std::vector const& shards = it.second; - if (engine.type() != ExecutionNode::ENUMERATE_IRESEARCH_VIEW && + if (engine.type() != EngineInfo::EngineType::View && query.trx()->isInaccessibleCollectionId(engine.collection()->getPlanId())) { for (ShardID sid : shards) { opts.inaccessibleCollections.insert(sid); @@ -571,18 +629,14 @@ void EngineInfoContainerDBServer::DBServerInfo::buildMessage( for (auto const& it : _engineInfos) { TRI_ASSERT(it.first); - EngineInfo const& engine = *it.first; + EngineInfo& engine = *it.first; std::vector const& shards = it.second; #ifdef USE_IRESEARCH // serialize for the list of shards - if (engine.type() == ExecutionNode::ENUMERATE_IRESEARCH_VIEW) { + if (engine.type() == EngineInfo::EngineType::View) { engine.serializeSnippet(serverId, query, shards, infoBuilder); - - if (engine.scatter()) { - // register current DBServer for scatter associated with the view, if any - engine.scatter()->clients().emplace_back(serverId); - } + engine.addClient(serverId); continue; } @@ -726,6 +780,7 @@ std::map EngineInfoContaine } auto& responsible = (*servers)[0]; + auto& mapping = dbServerMapping[responsible]; mapping.addShardLock(colInfo.lockType, s); diff --git a/arangod/Aql/EngineInfoContainerDBServer.h b/arangod/Aql/EngineInfoContainerDBServer.h index 06789c051f..3148f8c25b 100644 --- a/arangod/Aql/EngineInfoContainerDBServer.h +++ b/arangod/Aql/EngineInfoContainerDBServer.h @@ -32,6 +32,7 @@ #include "VocBase/AccessMode.h" #include +#include namespace arangodb { @@ -43,6 +44,7 @@ namespace aql { struct Collection; class GraphNode; +class GatherNode; class ScatterNode; class Query; @@ -76,6 +78,11 @@ class EngineInfoContainerDBServer { struct EngineInfo { public: + enum class EngineType { + Collection, // collection based engine (per-shard) + View // view based engine (per-server) + }; + explicit EngineInfo(size_t idOfRemoteNode) noexcept; EngineInfo(EngineInfo&& other) noexcept; ~EngineInfo(); @@ -88,7 +95,7 @@ class EngineInfoContainerDBServer { void connectQueryId(QueryId id) noexcept { _otherId = id; } Collection const* collection() const noexcept; - void collection(Collection* col) noexcept { _collection = col; } + void collection(Collection* col) noexcept; void serializeSnippet(Query& query, ShardID id, velocypack::Builder& infoBuilder, bool isResponsibleForInit) const; @@ -97,29 +104,51 @@ class EngineInfoContainerDBServer { std::vector const& shards, velocypack::Builder& infoBuilder) const; - /// @returns type of the "main node" if applicable, - /// 'ExecutionNode::MAX_NODE_TYPE_VALUE' otherwise - ExecutionNode::NodeType type() const noexcept { return _type; } + /// @returns type of the engine + EngineType type() const noexcept { + return static_cast(_source.which()); + } #ifdef USE_IRESEARCH LogicalView const* view() const noexcept; - ScatterNode* scatter() const noexcept; + void addClient(ServerID const& server); #endif private: + struct CollectionSource { + explicit CollectionSource(aql::Collection* collection) noexcept + : collection(collection) { + } + CollectionSource(CollectionSource&&) = default; + CollectionSource& operator=(CollectionSource&&) = default; + + aql::Collection* collection{}; // The collection used to connect to this engine + std::string restrictedShard; // The shard this snippet is restricted to + }; + + struct ViewSource { + ViewSource( + LogicalView const& view, + GatherNode* gather, + ScatterNode* scatter) noexcept + : view(&view), + gather(gather), + scatter(scatter) { + } + + LogicalView const* view{}; // The view used to connect to this engine + GatherNode* gather{}; // The gather associated with the engine + ScatterNode* scatter{}; // The scatter associated with the engine + size_t numClients{}; // A number of db servers the engine is distributed accross + }; + EngineInfo(EngineInfo&) = delete; EngineInfo(EngineInfo const& other) = delete; std::vector _nodes; size_t _idOfRemoteNode; // id of the remote node QueryId _otherId; // Id of query engine before this one - union { - Collection* _collection; // The collection used to connect to this engine - LogicalView const* _view; // The view used to connect to this engine - }; - ShardID _restrictedShard; // The shard this snippet is restricted to - ScatterNode* _scatter{}; // The scatter associated with the engine - ExecutionNode::NodeType _type{ExecutionNode::MAX_NODE_TYPE_VALUE}; // type of the "main node" + mutable boost::variant _source; }; struct DBServerInfo { diff --git a/arangod/IResearch/IResearchViewBlock.cpp b/arangod/IResearch/IResearchViewBlock.cpp index 2fb489da22..a4485b2708 100644 --- a/arangod/IResearch/IResearchViewBlock.cpp +++ b/arangod/IResearch/IResearchViewBlock.cpp @@ -195,11 +195,11 @@ void IResearchViewBlockBase::reset() { if (!arangodb::iresearch::FilterFactory::filter(&root, queryCtx, viewNode.filterCondition())) { - LOG_TOPIC(WARN, arangodb::iresearch::TOPIC) - << "failed to build filter while querying arangosearch view , query '" - << viewNode.filterCondition().toVelocyPack(true)->toJson() << "'"; - - THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_BAD_PARAMETER, + "failed to build filter while querying arangosearch view, query '" + + viewNode.filterCondition().toVelocyPack(true)->toJson() + "'" + ); } if (_volatileSort) { diff --git a/arangod/IResearch/IResearchViewOptimizerRules.cpp b/arangod/IResearch/IResearchViewOptimizerRules.cpp index 823931f2da..220001f43b 100644 --- a/arangod/IResearch/IResearchViewOptimizerRules.cpp +++ b/arangod/IResearch/IResearchViewOptimizerRules.cpp @@ -47,20 +47,6 @@ using EN = arangodb::aql::ExecutionNode; namespace { -size_t numberOfShards(arangodb::CollectionNameResolver const& resolver, - arangodb::LogicalView const& view) { - size_t numberOfShards = 0; - - auto visitor = [&numberOfShards](arangodb::LogicalCollection const& collection) noexcept { - numberOfShards += collection.numberOfShards(); - return true; - }; - - resolver.visitCollections(visitor, view.id()); - - return numberOfShards; -} - bool addView(arangodb::LogicalView const& view, arangodb::aql::Query& query) { auto* collections = query.collections(); @@ -257,7 +243,6 @@ void scatterViewInClusterRule(arangodb::aql::Optimizer* opt, } auto& vocbase = viewNode.vocbase(); - auto& view = *viewNode.view(); bool const isRootNode = plan->isRoot(node); plan->unlinkNode(node, true); @@ -283,8 +268,12 @@ void scatterViewInClusterRule(arangodb::aql::Optimizer* opt, TRI_ASSERT(node); remoteNode->addDependency(node); + // so far we don't know the exact number of db servers where + // this query will be distributed, mode will be adjusted + // during query distribution phase by EngineInfoContainerDBServer + auto const sortMode = GatherNode::SortMode::Default; + // insert gather node - auto const sortMode = GatherNode::evaluateSortMode(numberOfShards(*resolver, view)); auto* gatherNode = plan->registerNode( std::make_unique(plan.get(), plan->nextId(), sortMode)); TRI_ASSERT(remoteNode); diff --git a/arangod/MMFiles/MMFilesWalRecoverState.cpp b/arangod/MMFiles/MMFilesWalRecoverState.cpp index fcafd60995..df5b4626cd 100644 --- a/arangod/MMFiles/MMFilesWalRecoverState.cpp +++ b/arangod/MMFiles/MMFilesWalRecoverState.cpp @@ -470,7 +470,7 @@ bool MMFilesWalRecoverState::ReplayMarker(MMFilesMarker const* marker, ++state->errorCount; } }; - TRI_DEFER(visitRecoveryHelpers); + TRI_DEFER(visitRecoveryHelpers()); #ifdef ARANGODB_ENABLE_FAILURE_TESTS LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)