1
0
Fork 0

evaluate sort mode of view gather block more precisely (#8008)

* evaluate sort mode of view gather block more precisely

* minor cleanup

* add missing line
This commit is contained in:
Andrey Abramov 2019-01-22 12:22:36 +03:00 committed by GitHub
parent cabeb8fc7b
commit 36cfec1cce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 188 additions and 104 deletions

View File

@ -703,10 +703,17 @@ arangodb::Result RemoteBlock::handleCommErrors(ClusterCommResult* res) const {
return {res->getErrorCode(), res->stringifyErrorMessage()};
}
if (res->status == CL_COMM_ERROR) {
std::string errorMessage =
std::string("Error message received from shard '") +
std::string(res->shardID) + std::string("' on cluster node '") +
std::string errorMessage;
auto const& shardID = res->shardID;
if (shardID.empty()) {
errorMessage = std::string("Error message received from cluster node '") +
std::string(res->serverID) + std::string("': ");
} else {
errorMessage = std::string("Error message received from shard '") +
std::string(shardID) + std::string("' on cluster node '") +
std::string(res->serverID) + std::string("': ");
}
int errorNum = TRI_ERROR_INTERNAL;
if (res->result != nullptr) {
@ -1181,12 +1188,13 @@ SortingGatherBlock::SortingGatherBlock(ExecutionEngine& engine, GatherNode const
TRI_ASSERT(!en.elements().empty());
switch (en.sortMode()) {
case GatherNode::SortMode::Heap:
_strategy = std::make_unique<HeapSorting>(_trx, _gatherBlockBuffer, _sortRegisters);
break;
case GatherNode::SortMode::MinElement:
_strategy = std::make_unique<MinElementSorting>(_trx, _gatherBlockBuffer, _sortRegisters);
break;
case GatherNode::SortMode::Heap:
case GatherNode::SortMode::Default: // use heap by default
_strategy = std::make_unique<HeapSorting>(_trx, _gatherBlockBuffer, _sortRegisters);
break;
default:
TRI_ASSERT(false);
break;

View File

@ -45,11 +45,12 @@ arangodb::velocypack::StringRef const SortModeMinElement("minelement");
arangodb::velocypack::StringRef const SortModeHeap("heap");
bool toSortMode(arangodb::velocypack::StringRef const& str, GatherNode::SortMode& mode) noexcept {
// std::map ~25-30% faster than std::unordered_map for small number of
// elements
// std::map ~25-30% faster than std::unordered_map for small number of elements
static std::map<arangodb::velocypack::StringRef, GatherNode::SortMode> const NameToValue{
{SortModeMinElement, GatherNode::SortMode::MinElement},
{SortModeHeap, GatherNode::SortMode::Heap}};
{SortModeHeap, GatherNode::SortMode::Heap},
{SortModeUnset, GatherNode::SortMode::Default}
};
auto const it = NameToValue.find(str);
@ -68,6 +69,8 @@ arangodb::velocypack::StringRef toString(GatherNode::SortMode mode) noexcept {
return SortModeMinElement;
case GatherNode::SortMode::Heap:
return SortModeHeap;
case GatherNode::SortMode::Default:
return SortModeUnset;
default:
TRI_ASSERT(false);
return {};

View File

@ -277,7 +277,7 @@ class GatherNode final : public ExecutionNode {
friend class RedundantCalculationsReplacer;
public:
enum class SortMode : uint32_t { MinElement, Heap };
enum class SortMode : uint32_t { MinElement, Heap, Default };
/// @brief inspect dependencies starting from a specified 'node'
/// and return first corresponding collection within

View File

@ -74,17 +74,45 @@ Result ExtractRemoteAndShard(VPackSlice keySlice, size_t& remoteId, std::string&
return {TRI_ERROR_NO_ERROR};
}
GatherNode* findFirstGather(ExecutionNode const& root) {
ExecutionNode* node = root.getFirstParent();
// moving down from a given node
// towards a return node
while (node) {
switch (node->getType()) {
case ExecutionNode::REMOTE:
node = node->getFirstParent();
if (!node || node->getType() != ExecutionNode::GATHER) {
return nullptr;
}
return ExecutionNode::castTo<GatherNode*>(node);
default:
node = node->getFirstParent();
break;
}
}
return nullptr;
}
ScatterNode* findFirstScatter(ExecutionNode const& root) {
ExecutionNode* node = root.getFirstDependency();
// moving up from a given node
// towards a singleton node
while (node) {
switch (node->getType()) {
case ExecutionNode::REMOTE:
node = node->getFirstDependency();
if (node == nullptr) {
if (!node) {
return nullptr;
}
if (node->getType() != ExecutionNode::SCATTER &&
node->getType() != ExecutionNode::DISTRIBUTE) {
return nullptr;
@ -103,7 +131,7 @@ ScatterNode* findFirstScatter(ExecutionNode const& root) {
} // namespace
EngineInfoContainerDBServer::EngineInfo::EngineInfo(size_t idOfRemoteNode) noexcept
: _idOfRemoteNode(idOfRemoteNode), _otherId(0), _collection(nullptr) {}
: _idOfRemoteNode(idOfRemoteNode), _otherId(0), _source(CollectionSource(nullptr)) {}
EngineInfoContainerDBServer::EngineInfo::~EngineInfo() {
// This container is not responsible for nodes
@ -116,56 +144,64 @@ EngineInfoContainerDBServer::EngineInfo::EngineInfo(EngineInfo&& other) noexcept
: _nodes(std::move(other._nodes)),
_idOfRemoteNode(other._idOfRemoteNode),
_otherId(other._otherId),
_collection(other._collection) {
_source(std::move(other._source)) {
TRI_ASSERT(!_nodes.empty());
TRI_ASSERT(_collection != nullptr);
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
struct {
void operator()(CollectionSource const& source) {
TRI_ASSERT(source.collection);
}
void operator()(ViewSource const& source) {
TRI_ASSERT(source.view);
}
} visitor;
boost::apply_visitor(visitor, _source);
#endif
}
void EngineInfoContainerDBServer::EngineInfo::addNode(ExecutionNode* node) {
TRI_ASSERT(node);
auto setRestrictedShard = [](auto* node, auto& source) {
TRI_ASSERT(node);
auto* sourceImpl = boost::get<CollectionSource>(&source);
TRI_ASSERT(sourceImpl);
if (node->isRestricted()) {
TRI_ASSERT(sourceImpl->restrictedShard.empty());
sourceImpl->restrictedShard = node->restrictedShard();
}
};
switch (node->getType()) {
case ExecutionNode::ENUMERATE_COLLECTION: {
TRI_ASSERT(_type == ExecutionNode::MAX_NODE_TYPE_VALUE);
auto ecNode = ExecutionNode::castTo<EnumerateCollectionNode*>(node);
if (ecNode->isRestricted()) {
TRI_ASSERT(_restrictedShard.empty());
_restrictedShard = ecNode->restrictedShard();
}
// do not set '_type' of the engine here,
// bacause satellite collections may consists of
// multiple "main nodes"
TRI_ASSERT(EngineType::Collection == type());
setRestrictedShard(ExecutionNode::castTo<EnumerateCollectionNode*>(node), _source);
break;
}
case ExecutionNode::INDEX: {
TRI_ASSERT(_type == ExecutionNode::MAX_NODE_TYPE_VALUE);
auto idxNode = ExecutionNode::castTo<IndexNode*>(node);
if (idxNode->isRestricted()) {
TRI_ASSERT(_restrictedShard.empty());
_restrictedShard = idxNode->restrictedShard();
}
// do not set '_type' of the engine here,
// because satellite collections may consist of
// multiple "main nodes"
TRI_ASSERT(EngineType::Collection == type());
setRestrictedShard(ExecutionNode::castTo<IndexNode*>(node), _source);
break;
}
#ifdef USE_IRESEARCH
case ExecutionNode::ENUMERATE_IRESEARCH_VIEW: {
TRI_ASSERT(_type == ExecutionNode::MAX_NODE_TYPE_VALUE);
TRI_ASSERT(EngineType::Collection == type());
auto& viewNode = *ExecutionNode::castTo<iresearch::IResearchViewNode*>(node);
// FIXME should we have a separate optimizer rule for that?
//
// evaluate node volatility before the distribution
// can't do it on DB servers since only parts of the plan will be sent
viewNode.volatility(true);
_scatter = findFirstScatter(*node);
_type = ExecutionNode::ENUMERATE_IRESEARCH_VIEW;
_view = viewNode.view().get();
_source = ViewSource(
*viewNode.view().get(),
findFirstGather(viewNode),
findFirstScatter(viewNode)
);
break;
}
#endif
@ -174,12 +210,8 @@ void EngineInfoContainerDBServer::EngineInfo::addNode(ExecutionNode* node) {
case ExecutionNode::REMOVE:
case ExecutionNode::REPLACE:
case ExecutionNode::UPSERT: {
TRI_ASSERT(_type == ExecutionNode::MAX_NODE_TYPE_VALUE);
auto modNode = ExecutionNode::castTo<ModificationNode*>(node);
if (modNode->isRestricted()) {
TRI_ASSERT(_restrictedShard.empty());
_restrictedShard = modNode->restrictedShard();
}
TRI_ASSERT(EngineType::Collection == type());
setRestrictedShard(ExecutionNode::castTo<ModificationNode*>(node), _source);
break;
}
default:
@ -190,21 +222,43 @@ void EngineInfoContainerDBServer::EngineInfo::addNode(ExecutionNode* node) {
}
Collection const* EngineInfoContainerDBServer::EngineInfo::collection() const noexcept {
#ifdef USE_IRESEARCH
TRI_ASSERT(ExecutionNode::ENUMERATE_IRESEARCH_VIEW != _type);
#endif
return _collection;
TRI_ASSERT(EngineType::Collection == type());
auto* source = boost::get<CollectionSource>(&_source);
TRI_ASSERT(source);
return source->collection;
}
void EngineInfoContainerDBServer::EngineInfo::collection(Collection* col) noexcept {
TRI_ASSERT(EngineType::Collection == type());
auto* source = boost::get<CollectionSource>(&_source);
TRI_ASSERT(source);
source->collection = col;
}
#ifdef USE_IRESEARCH
LogicalView const* EngineInfoContainerDBServer::EngineInfo::view() const noexcept {
TRI_ASSERT(ExecutionNode::ENUMERATE_IRESEARCH_VIEW == _type);
return _view;
TRI_ASSERT(EngineType::View == type());
auto* source = boost::get<ViewSource>(&_source);
TRI_ASSERT(source);
return source->view;
}
ScatterNode* EngineInfoContainerDBServer::EngineInfo::scatter() const noexcept {
TRI_ASSERT(ExecutionNode::ENUMERATE_IRESEARCH_VIEW == _type);
return _scatter;
void EngineInfoContainerDBServer::EngineInfo::addClient(ServerID const& server) {
TRI_ASSERT(EngineType::View == type());
auto* source = boost::get<ViewSource>(&_source);
TRI_ASSERT(source);
if (source->scatter) {
auto& clients = source->scatter->clients();
TRI_ASSERT(clients.end() == std::find(clients.begin(), clients.end(), server));
clients.emplace_back(server);
}
if (source->gather) {
// FIXME introduce a separate step if sort mode detection will become heavy
source->gather->sortMode(GatherNode::evaluateSortMode(++source->numClients));
}
}
#endif
@ -275,8 +329,12 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
Query& query, ShardID id, VPackBuilder& infoBuilder, bool isResponsibleForInit) const {
if (!_restrictedShard.empty()) {
if (id != _restrictedShard) {
auto* collection = boost::get<CollectionSource>(&_source);
TRI_ASSERT(collection);
auto& restrictedShard = collection->restrictedShard;
if (!restrictedShard.empty()) {
if (id != restrictedShard) {
return;
}
// We only have one shard it has to be responsible!
@ -293,7 +351,7 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
// this clone does the translation collection => shardId implicitly
// at the relevant parts of the query.
_collection->setCurrentShard(id);
collection->collection->setCurrentShard(id);
ExecutionPlan plan(query.ast());
ExecutionNode* previous = nullptr;
@ -335,7 +393,7 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
plan.setVarUsageComputed();
const unsigned flags = ExecutionNode::SERIALIZE_DETAILS;
plan.root()->toVelocyPack(infoBuilder, flags, /*keepTopLevelOpen*/ false);
_collection->resetCurrentShard();
collection->collection->resetCurrentShard();
}
void EngineInfoContainerDBServer::CollectionInfo::mergeShards(
@ -439,7 +497,7 @@ void EngineInfoContainerDBServer::closeSnippet(QueryId coordinatorEngineId) {
e->connectQueryId(coordinatorEngineId);
#ifdef USE_IRESEARCH
if (ExecutionNode::ENUMERATE_IRESEARCH_VIEW == e->type()) {
if (EngineInfo::EngineType::View == e->type()) {
_viewInfos[e->view()].engines.emplace_back(std::move(e));
} else
#endif
@ -547,7 +605,7 @@ void EngineInfoContainerDBServer::DBServerInfo::buildMessage(
EngineInfo const& engine = *it.first;
std::vector<ShardID> const& shards = it.second;
if (engine.type() != ExecutionNode::ENUMERATE_IRESEARCH_VIEW &&
if (engine.type() != EngineInfo::EngineType::View &&
query.trx()->isInaccessibleCollectionId(engine.collection()->getPlanId())) {
for (ShardID sid : shards) {
opts.inaccessibleCollections.insert(sid);
@ -571,18 +629,14 @@ void EngineInfoContainerDBServer::DBServerInfo::buildMessage(
for (auto const& it : _engineInfos) {
TRI_ASSERT(it.first);
EngineInfo const& engine = *it.first;
EngineInfo& engine = *it.first;
std::vector<ShardID> const& shards = it.second;
#ifdef USE_IRESEARCH
// serialize for the list of shards
if (engine.type() == ExecutionNode::ENUMERATE_IRESEARCH_VIEW) {
if (engine.type() == EngineInfo::EngineType::View) {
engine.serializeSnippet(serverId, query, shards, infoBuilder);
if (engine.scatter()) {
// register current DBServer for scatter associated with the view, if any
engine.scatter()->clients().emplace_back(serverId);
}
engine.addClient(serverId);
continue;
}
@ -726,6 +780,7 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
}
auto& responsible = (*servers)[0];
auto& mapping = dbServerMapping[responsible];
mapping.addShardLock(colInfo.lockType, s);

View File

@ -32,6 +32,7 @@
#include "VocBase/AccessMode.h"
#include <stack>
#include <boost/variant.hpp>
namespace arangodb {
@ -43,6 +44,7 @@ namespace aql {
struct Collection;
class GraphNode;
class GatherNode;
class ScatterNode;
class Query;
@ -76,6 +78,11 @@ class EngineInfoContainerDBServer {
struct EngineInfo {
public:
enum class EngineType {
Collection, // collection based engine (per-shard)
View // view based engine (per-server)
};
explicit EngineInfo(size_t idOfRemoteNode) noexcept;
EngineInfo(EngineInfo&& other) noexcept;
~EngineInfo();
@ -88,7 +95,7 @@ class EngineInfoContainerDBServer {
void connectQueryId(QueryId id) noexcept { _otherId = id; }
Collection const* collection() const noexcept;
void collection(Collection* col) noexcept { _collection = col; }
void collection(Collection* col) noexcept;
void serializeSnippet(Query& query, ShardID id, velocypack::Builder& infoBuilder,
bool isResponsibleForInit) const;
@ -97,29 +104,51 @@ class EngineInfoContainerDBServer {
std::vector<ShardID> const& shards,
velocypack::Builder& infoBuilder) const;
/// @returns type of the "main node" if applicable,
/// 'ExecutionNode::MAX_NODE_TYPE_VALUE' otherwise
ExecutionNode::NodeType type() const noexcept { return _type; }
/// @returns type of the engine
EngineType type() const noexcept {
return static_cast<EngineType>(_source.which());
}
#ifdef USE_IRESEARCH
LogicalView const* view() const noexcept;
ScatterNode* scatter() const noexcept;
void addClient(ServerID const& server);
#endif
private:
struct CollectionSource {
explicit CollectionSource(aql::Collection* collection) noexcept
: collection(collection) {
}
CollectionSource(CollectionSource&&) = default;
CollectionSource& operator=(CollectionSource&&) = default;
aql::Collection* collection{}; // The collection used to connect to this engine
std::string restrictedShard; // The shard this snippet is restricted to
};
struct ViewSource {
ViewSource(
LogicalView const& view,
GatherNode* gather,
ScatterNode* scatter) noexcept
: view(&view),
gather(gather),
scatter(scatter) {
}
LogicalView const* view{}; // The view used to connect to this engine
GatherNode* gather{}; // The gather associated with the engine
ScatterNode* scatter{}; // The scatter associated with the engine
size_t numClients{}; // A number of db servers the engine is distributed accross
};
EngineInfo(EngineInfo&) = delete;
EngineInfo(EngineInfo const& other) = delete;
std::vector<ExecutionNode*> _nodes;
size_t _idOfRemoteNode; // id of the remote node
QueryId _otherId; // Id of query engine before this one
union {
Collection* _collection; // The collection used to connect to this engine
LogicalView const* _view; // The view used to connect to this engine
};
ShardID _restrictedShard; // The shard this snippet is restricted to
ScatterNode* _scatter{}; // The scatter associated with the engine
ExecutionNode::NodeType _type{ExecutionNode::MAX_NODE_TYPE_VALUE}; // type of the "main node"
mutable boost::variant<CollectionSource, ViewSource> _source;
};
struct DBServerInfo {

View File

@ -195,11 +195,11 @@ void IResearchViewBlockBase::reset() {
if (!arangodb::iresearch::FilterFactory::filter(&root, queryCtx,
viewNode.filterCondition())) {
LOG_TOPIC(WARN, arangodb::iresearch::TOPIC)
<< "failed to build filter while querying arangosearch view , query '"
<< viewNode.filterCondition().toVelocyPack(true)->toJson() << "'";
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_BAD_PARAMETER,
"failed to build filter while querying arangosearch view, query '"
+ viewNode.filterCondition().toVelocyPack(true)->toJson() + "'"
);
}
if (_volatileSort) {

View File

@ -47,20 +47,6 @@ using EN = arangodb::aql::ExecutionNode;
namespace {
size_t numberOfShards(arangodb::CollectionNameResolver const& resolver,
arangodb::LogicalView const& view) {
size_t numberOfShards = 0;
auto visitor = [&numberOfShards](arangodb::LogicalCollection const& collection) noexcept {
numberOfShards += collection.numberOfShards();
return true;
};
resolver.visitCollections(visitor, view.id());
return numberOfShards;
}
bool addView(arangodb::LogicalView const& view, arangodb::aql::Query& query) {
auto* collections = query.collections();
@ -257,7 +243,6 @@ void scatterViewInClusterRule(arangodb::aql::Optimizer* opt,
}
auto& vocbase = viewNode.vocbase();
auto& view = *viewNode.view();
bool const isRootNode = plan->isRoot(node);
plan->unlinkNode(node, true);
@ -283,8 +268,12 @@ void scatterViewInClusterRule(arangodb::aql::Optimizer* opt,
TRI_ASSERT(node);
remoteNode->addDependency(node);
// so far we don't know the exact number of db servers where
// this query will be distributed, mode will be adjusted
// during query distribution phase by EngineInfoContainerDBServer
auto const sortMode = GatherNode::SortMode::Default;
// insert gather node
auto const sortMode = GatherNode::evaluateSortMode(numberOfShards(*resolver, view));
auto* gatherNode = plan->registerNode(
std::make_unique<GatherNode>(plan.get(), plan->nextId(), sortMode));
TRI_ASSERT(remoteNode);

View File

@ -470,7 +470,7 @@ bool MMFilesWalRecoverState::ReplayMarker(MMFilesMarker const* marker,
++state->errorCount;
}
};
TRI_DEFER(visitRecoveryHelpers);
TRI_DEFER(visitRecoveryHelpers());
#ifdef ARANGODB_ENABLE_FAILURE_TESTS
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)