1
0
Fork 0

[3.4] Fix query snippet responsibility for views (#9460)

* Fix query snippet responsibility for views

Select one snippet per node id, instead of parent query id

* Updated CHANGELOG

* Fixed compile error
This commit is contained in:
Tobias Gödderz 2019-07-18 10:59:53 +02:00 committed by Michael Hackstein
parent 83227cb7a1
commit 94a84b6233
3 changed files with 83 additions and 24 deletions

View File

@ -1,6 +1,9 @@
v3.4.8 (XXXX-XX-XX)
-------------------
* Fixed a bug which could lead to some unnecessary HTTP requests during an AQL query in a cluster.
This only occurred when the query contained views.
* Prevent rare cases of duplicate DDL actions being executed by Maintenance.
* Coordinator code was reporting RocksDB error codes, but not the associated detail message.

View File

@ -22,6 +22,7 @@
////////////////////////////////////////////////////////////////////////////////
#include "EngineInfoContainerDBServer.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/ClusterNodes.h"
#include "Aql/Collection.h"
@ -530,7 +531,7 @@ void EngineInfoContainerDBServer::DBServerInfo::addShardLock(AccessMode::Type co
}
void EngineInfoContainerDBServer::DBServerInfo::addEngine(
std::shared_ptr<EngineInfoContainerDBServer::EngineInfo> info, ShardID const& id) {
std::shared_ptr<EngineInfoContainerDBServer::EngineInfo> const& info, ShardID const& id) {
_engineInfos[info].emplace_back(id);
}
@ -731,7 +732,7 @@ void EngineInfoContainerDBServer::DBServerInfo::combineTraverserEngines(ServerID
void EngineInfoContainerDBServer::DBServerInfo::addTraverserEngine(GraphNode* node,
TraverserEngineShardLists&& shards) {
_traverserEngineInfos.push_back(std::make_pair(node, std::move(shards)));
_traverserEngineInfos.emplace_back(std::make_pair(node, std::move(shards)));
}
std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContainerDBServer::createDBServerMapping(
@ -741,6 +742,26 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
std::map<ServerID, DBServerInfo> dbServerMapping;
// Only one remote block of each remote node is responsible for forwarding the
// initializeCursor and shutDown requests.
// For simplicity, we always use the first remote block if we have more
// than one.
using ExecutionNodeId = size_t;
std::unordered_map<ExecutionNodeId, std::pair<ServerID, ShardID>> responsibleForShutdown{};
auto const chooseResponsibleSnippet =
[&responsibleForShutdown](EngineInfo const& engineInfo,
ServerID const& dbServerId, ShardID const& shardId) {
for (auto const& node : engineInfo.nodes()) {
if (node->getType() == ExecutionNode::NodeType::REMOTE) {
// Try to emplace. We explicitly want to ignore duplicate inserts
// here, as we want exactly one responsible snippet per remote node id!
std::ignore =
responsibleForShutdown.emplace(node->id(),
std::make_pair(dbServerId, shardId));
}
}
};
for (auto const& it : _collectionInfos) {
// it.first => Collection const*
// it.second.lockType => Lock Type
@ -748,34 +769,26 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
// it.second.usedShards => All shards of this collection relevant for this query
auto const& colInfo = it.second;
// only one of the remote blocks is responsible for forwarding the
// initializeCursor and shutDown requests
// for simplicity, we always use the first remote block if we have more
// than one
bool isResponsibleForInitializeCursor = true;
for (auto const& s : colInfo.usedShards) {
lockedShards.emplace(s);
for (auto const& shardId : colInfo.usedShards) {
lockedShards.emplace(shardId);
auto const servers = ci->getResponsibleServer(s);
auto const servers = ci->getResponsibleServer(shardId);
if (!servers || servers->empty()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE,
"Could not find responsible server for shard " + s);
"Could not find responsible server for shard " + shardId);
}
auto& responsible = (*servers)[0];
auto& mapping = dbServerMapping[responsible];
auto const& dbServerId = (*servers)[0];
if (isResponsibleForInitializeCursor) {
mapping.setShardAsResponsibleForInitializeCursor(s);
isResponsibleForInitializeCursor = false;
}
auto& mapping = dbServerMapping[dbServerId];
mapping.addShardLock(colInfo.lockType, s);
mapping.addShardLock(colInfo.lockType, shardId);
for (auto& e : colInfo.engines) {
mapping.addEngine(e, s);
for (auto const& e : colInfo.engines) {
mapping.addEngine(e, shardId);
chooseResponsibleSnippet(*e, dbServerId, shardId);
}
for (auto const* view : colInfo.views) {
@ -786,12 +799,20 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
}
for (auto const& viewEngine : viewInfo->second.engines) {
mapping.addEngine(viewEngine, s);
mapping.addEngine(viewEngine, shardId);
chooseResponsibleSnippet(*viewEngine, dbServerId, shardId);
}
}
}
}
// Set one DBServer snippet to be responsible for each coordinator query snippet
for (auto const& it : responsibleForShutdown) {
ServerID const& serverId = it.second.first;
ShardID const& shardId = it.second.second;
dbServerMapping[serverId].setShardAsResponsibleForInitializeCursor(shardId);
}
#ifdef USE_ENTERPRISE
prepareSatellites(dbServerMapping);
#endif

View File

@ -31,6 +31,8 @@
#include "Cluster/ClusterInfo.h"
#include "VocBase/AccessMode.h"
#include <map>
#include <set>
#include <stack>
namespace arangodb {
@ -43,6 +45,7 @@ namespace aql {
struct Collection;
class GraphNode;
class GatherNode;
class ScatterNode;
class Query;
@ -79,6 +82,8 @@ class EngineInfoContainerDBServer {
explicit EngineInfo(size_t idOfRemoteNode) noexcept;
EngineInfo(EngineInfo&& other) noexcept;
~EngineInfo();
EngineInfo(EngineInfo&) = delete;
EngineInfo(EngineInfo const& other) = delete;
#if (_MSC_VER != 0)
#pragma warning(disable : 4521) // stfu wintendo.
@ -103,9 +108,39 @@ class EngineInfoContainerDBServer {
LogicalView const* view() const noexcept;
QueryId getParentQueryId() const noexcept { return _otherId; }
std::vector<ExecutionNode*> const& nodes() const noexcept {
return _nodes;
}
private:
EngineInfo(EngineInfo&) = delete;
EngineInfo(EngineInfo const& other) = delete;
struct CollectionSource {
explicit CollectionSource(aql::Collection* collection) noexcept
: collection(collection) {
}
CollectionSource(CollectionSource&&) = default;
CollectionSource& operator=(CollectionSource&&) = default;
aql::Collection* collection{}; // The collection used to connect to this engine
std::string restrictedShard; // The shard this snippet is restricted to
};
struct ViewSource {
ViewSource(
LogicalView const& view,
GatherNode* gather,
ScatterNode* scatter) noexcept
: view(&view),
gather(gather),
scatter(scatter) {
}
LogicalView const* view{}; // The view used to connect to this engine
GatherNode* gather{}; // The gather associated with the engine
ScatterNode* scatter{}; // The scatter associated with the engine
size_t numClients{}; // A number of db servers the engine is distributed across
};
std::vector<ExecutionNode*> _nodes;
size_t _idOfRemoteNode; // id of the remote node
@ -122,7 +157,7 @@ class EngineInfoContainerDBServer {
public:
void addShardLock(AccessMode::Type const& lock, ShardID const& id);
void addEngine(std::shared_ptr<EngineInfo> info, ShardID const& id);
void addEngine(std::shared_ptr<EngineInfo> const& info, ShardID const& id);
void setShardAsResponsibleForInitializeCursor(ShardID const& id);