1
0
Fork 0

Bug fix/fix snippet responsibility for views (#9416)

* Fix query snippet responsibility for views

* Select one snippet per node id, instead of parent query id
This commit is contained in:
Tobias Gödderz 2019-07-09 15:21:43 +02:00 committed by KVS85
parent e049344e15
commit 789ef28a59
3 changed files with 64 additions and 38 deletions

View File

@ -21,6 +21,8 @@
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "EngineInfoContainerDBServer.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/ClusterNodes.h"
#include "Aql/Collection.h"
@ -38,7 +40,6 @@
#include "Cluster/ClusterTrxMethods.h"
#include "Cluster/ServerState.h"
#include "Cluster/TraverserEngineRegistry.h"
#include "EngineInfoContainerDBServer.h"
#include "Graph/BaseOptions.h"
#include "RestServer/QueryRegistryFeature.h"
#include "StorageEngine/TransactionState.h"
@ -585,7 +586,7 @@ void EngineInfoContainerDBServer::DBServerInfo::addShardLock(AccessMode::Type co
}
void EngineInfoContainerDBServer::DBServerInfo::addEngine(
std::shared_ptr<EngineInfoContainerDBServer::EngineInfo> info, ShardID const& id) {
std::shared_ptr<EngineInfoContainerDBServer::EngineInfo> const& info, ShardID const& id) {
_engineInfos[info].emplace_back(id);
}
@ -666,15 +667,18 @@ void EngineInfoContainerDBServer::DBServerInfo::buildMessage(
std::vector<ShardID> const& shards = it.second;
// serialize for the list of shards
if (engine.type() == EngineInfo::EngineType::View) {
engine.serializeSnippet(serverId, query, shards, infoBuilder, isAnyResponsibleForInitializeCursor(shards));
engine.addClient(serverId);
continue;
}
for (auto const& shard : shards) {
engine.serializeSnippet(query, shard, infoBuilder, isResponsibleForInitializeCursor(shard));
switch (engine.type()) {
case EngineInfo::EngineType::View: {
engine.serializeSnippet(serverId, query, shards, infoBuilder,
isAnyResponsibleForInitializeCursor(shards));
engine.addClient(serverId);
} break;
case EngineInfo::EngineType::Collection: {
for (auto const& shard : shards) {
engine.serializeSnippet(query, shard, infoBuilder,
isResponsibleForInitializeCursor(shard));
}
} break;
}
}
infoBuilder.close(); // snippets
@ -779,7 +783,7 @@ void EngineInfoContainerDBServer::DBServerInfo::combineTraverserEngines(ServerID
void EngineInfoContainerDBServer::DBServerInfo::addTraverserEngine(GraphNode* node,
TraverserEngineShardLists&& shards) {
_traverserEngineInfos.push_back(std::make_pair(node, std::move(shards)));
_traverserEngineInfos.emplace_back(std::make_pair(node, std::move(shards)));
}
std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContainerDBServer::createDBServerMapping() const {
@ -788,6 +792,26 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
std::map<ServerID, DBServerInfo> dbServerMapping;
// Only one remote block of each remote node is responsible for forwarding the
// initializeCursor and shutDown requests.
// For simplicity, we always use the first remote block if we have more
// than one.
using ExecutionNodeId = size_t;
std::unordered_map<ExecutionNodeId, std::pair<ServerID, ShardID>> responsibleForShutdown{};
auto const chooseResponsibleSnippet =
[&responsibleForShutdown](EngineInfo const& engineInfo,
ServerID const& dbServerId, ShardID const& shardId) {
for (auto const& node : engineInfo.nodes()) {
if (node->getType() == ExecutionNode::NodeType::REMOTE) {
// Try to emplace. We explicitly want to ignore duplicate inserts
// here, as we want one snippet per query!
std::ignore =
responsibleForShutdown.emplace(node->id(),
std::make_pair(dbServerId, shardId));
}
}
};
for (auto const& it : _collectionInfos) {
// it.first => Collection const*
// it.second.lockType => Lock Type
@ -796,33 +820,24 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
// query
auto const& colInfo = it.second;
// only one of the remote blocks is responsible for forwarding the
// initializeCursor and shutDown requests
// for simplicity, we always use the first remote block if we have more
// than one
bool isResponsibleForInitializeCursor = true;
for (auto const& s : colInfo.usedShards) {
auto const servers = ci->getResponsibleServer(s);
for (auto const& shardId : colInfo.usedShards) {
auto const servers = ci->getResponsibleServer(shardId);
if (!servers || servers->empty()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE,
"Could not find responsible server for shard " + s);
"Could not find responsible server for shard " + shardId);
}
auto& responsible = (*servers)[0];
auto const& dbServerId = (*servers)[0];
auto& mapping = dbServerMapping[responsible];
auto& mapping = dbServerMapping[dbServerId];
if (isResponsibleForInitializeCursor) {
mapping.setShardAsResponsibleForInitializeCursor(s);
isResponsibleForInitializeCursor = false;
}
mapping.addShardLock(colInfo.lockType, shardId);
mapping.addShardLock(colInfo.lockType, s);
for (auto& e : colInfo.engines) {
mapping.addEngine(e, s);
for (auto const& e : colInfo.engines) {
mapping.addEngine(e, shardId);
chooseResponsibleSnippet(*e, dbServerId, shardId);
}
for (auto const* view : colInfo.views) {
@ -833,12 +848,20 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
}
for (auto const& viewEngine : viewInfo->second.engines) {
mapping.addEngine(viewEngine, s);
mapping.addEngine(viewEngine, shardId);
chooseResponsibleSnippet(*viewEngine, dbServerId, shardId);
}
}
}
}
// Set one DBServer snippet to be responsible for each coordinator query snippet
for (auto const& it : responsibleForShutdown) {
ServerID const& serverId = it.second.first;
ShardID const& shardId = it.second.second;
dbServerMapping[serverId].setShardAsResponsibleForInitializeCursor(shardId);
}
#ifdef USE_ENTERPRISE
prepareSatellites(dbServerMapping);
#endif

View File

@ -31,6 +31,7 @@
#include "Cluster/ClusterInfo.h"
#include "VocBase/AccessMode.h"
#include <map>
#include <set>
#include <stack>
#include <boost/variant.hpp>
@ -87,6 +88,8 @@ class EngineInfoContainerDBServer {
explicit EngineInfo(size_t idOfRemoteNode) noexcept;
EngineInfo(EngineInfo&& other) noexcept;
~EngineInfo();
EngineInfo(EngineInfo&) = delete;
EngineInfo(EngineInfo const& other) = delete;
#if (_MSC_VER != 0)
#pragma warning(disable : 4521) // stfu wintendo.
@ -113,6 +116,12 @@ class EngineInfoContainerDBServer {
LogicalView const* view() const noexcept;
void addClient(ServerID const& server);
QueryId getParentQueryId() const noexcept { return _otherId; }
std::vector<ExecutionNode*> const& nodes() const noexcept {
return _nodes;
}
private:
struct CollectionSource {
explicit CollectionSource(aql::Collection* collection) noexcept
@ -141,9 +150,6 @@ class EngineInfoContainerDBServer {
size_t numClients{}; // A number of db servers the engine is distributed across
};
EngineInfo(EngineInfo&) = delete;
EngineInfo(EngineInfo const& other) = delete;
std::vector<ExecutionNode*> _nodes;
size_t _idOfRemoteNode; // id of the remote node
QueryId _otherId; // Id of query engine before this one
@ -154,7 +160,7 @@ class EngineInfoContainerDBServer {
public:
void addShardLock(AccessMode::Type const& lock, ShardID const& id);
void addEngine(std::shared_ptr<EngineInfo> info, ShardID const& id);
void addEngine(std::shared_ptr<EngineInfo> const& info, ShardID const& id);
void setShardAsResponsibleForInitializeCursor(ShardID const& id);

View File

@ -1729,7 +1729,6 @@ TEST_F(MoveShardTest, a_pending_moveshard_job_should_also_put_the_original_serve
Mock<AgentInterface> mockAgent;
When(Method(mockAgent, waitFor)).AlwaysReturn();
When(Method(mockAgent, write)).Do([&](query_t const& q, consensus::AgentInterface::WriteMode w) -> write_ret_t {
LOG_DEVEL << q->slice().toJson() << " " << __LINE__;
auto writes = q->slice()[0][0];
EXPECT_TRUE(writes.get("/arango/Target/Pending/1").get("op").copyString() ==
"delete");
@ -2017,8 +2016,6 @@ TEST_F(MoveShardTest, aborting_the_job_while_a_leader_transition_is_in_progress_
Mock<AgentInterface> mockAgent;
When(Method(mockAgent, waitFor)).AlwaysReturn();
When(Method(mockAgent, write)).Do([&](query_t const& q, consensus::AgentInterface::WriteMode w) -> write_ret_t {
LOG_DEVEL << q->slice().toJson() << " " << __LINE__;
auto writes = q->slice()[0][0];
EXPECT_TRUE(writes.get("/arango/Target/Pending/1").get("op").copyString() ==
"delete");