1
0
Fork 0

Let only on shard per collection, not per DBServer, be responsible for initialize cursor (and shutdown) (#8946)

* Let only on shard per collection, not per DBServer, be responsible for initialize cursor (and shutdown)

* Reverted assertion

* Changed parameter to const&

* Style correction
This commit is contained in:
Tobias Gödderz 2019-05-15 11:21:13 +02:00 committed by Michael Hackstein
parent b26ed84a81
commit 84d3d5dc5b
2 changed files with 46 additions and 25 deletions

View File

@ -255,7 +255,7 @@ void EngineInfoContainerDBServer::EngineInfo::addClient(ServerID const& server)
void EngineInfoContainerDBServer::EngineInfo::serializeSnippet( void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
ServerID const& serverId, Query& query, std::vector<ShardID> const& shards, ServerID const& serverId, Query& query, std::vector<ShardID> const& shards,
VPackBuilder& infoBuilder) const { VPackBuilder& infoBuilder, bool isResponsibleForInitializeCursor) const {
// The Key is required to build up the queryId mapping later // The Key is required to build up the queryId mapping later
// We're using serverId as queryId for the snippet since currently // We're using serverId as queryId for the snippet since currently
// it's impossible to have more than one view per engine // it's impossible to have more than one view per engine
@ -291,13 +291,7 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
rem->ownName(serverId); rem->ownName(serverId);
rem->queryId(_otherId); rem->queryId(_otherId);
// only one of the remote blocks is responsible for forwarding the rem->isResponsibleForInitializeCursor(isResponsibleForInitializeCursor);
// initializeCursor and shutDown requests
// for simplicity, we always use the first remote block if we have more
// than one
// Do we still need this???
rem->isResponsibleForInitializeCursor(true);
} }
if (previous != nullptr) { if (previous != nullptr) {
@ -316,7 +310,8 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
} }
void EngineInfoContainerDBServer::EngineInfo::serializeSnippet( void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
Query& query, ShardID id, VPackBuilder& infoBuilder, bool isResponsibleForInit) const { Query& query, const ShardID& id, VPackBuilder& infoBuilder,
bool isResponsibleForInitializeCursor) const {
auto* collection = boost::get<CollectionSource>(&_source); auto* collection = boost::get<CollectionSource>(&_source);
TRI_ASSERT(collection); TRI_ASSERT(collection);
auto& restrictedShard = collection->restrictedShard; auto& restrictedShard = collection->restrictedShard;
@ -326,7 +321,7 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
return; return;
} }
// We only have one shard it has to be responsible! // We only have one shard it has to be responsible!
isResponsibleForInit = true; isResponsibleForInitializeCursor = true;
} }
// The Key is required to build up the queryId mapping later // The Key is required to build up the queryId mapping later
infoBuilder.add(VPackValue(arangodb::basics::StringUtils::itoa(_idOfRemoteNode) + ":" + id)); infoBuilder.add(VPackValue(arangodb::basics::StringUtils::itoa(_idOfRemoteNode) + ":" + id));
@ -382,13 +377,7 @@ void EngineInfoContainerDBServer::EngineInfo::serializeSnippet(
rem->ownName(id); rem->ownName(id);
rem->queryId(_otherId); rem->queryId(_otherId);
// only one of the remote blocks is responsible for forwarding the rem->isResponsibleForInitializeCursor(isResponsibleForInitializeCursor);
// initializeCursor and shutDown requests
// for simplicity, we always use the first remote block if we have more
// than one
// Do we still need this???
rem->isResponsibleForInitializeCursor(isResponsibleForInit);
} }
if (previous != nullptr) { if (previous != nullptr) {
@ -574,6 +563,11 @@ void EngineInfoContainerDBServer::DBServerInfo::addEngine(
_engineInfos[info].emplace_back(id); _engineInfos[info].emplace_back(id);
} }
void EngineInfoContainerDBServer::DBServerInfo::setShardAsResponsibleForInitializeCursor(
ShardID const& id) {
_shardsResponsibleForInitializeCursor.emplace(id);
}
void EngineInfoContainerDBServer::DBServerInfo::buildMessage( void EngineInfoContainerDBServer::DBServerInfo::buildMessage(
ServerID const& serverId, EngineInfoContainerDBServer const& context, ServerID const& serverId, EngineInfoContainerDBServer const& context,
Query& query, VPackBuilder& infoBuilder) const { Query& query, VPackBuilder& infoBuilder) const {
@ -625,6 +619,21 @@ void EngineInfoContainerDBServer::DBServerInfo::buildMessage(
infoBuilder.add(VPackValue("snippets")); infoBuilder.add(VPackValue("snippets"));
infoBuilder.openObject(); infoBuilder.openObject();
auto isResponsibleForInitializeCursor = [this](ShardID const& id) {
return _shardsResponsibleForInitializeCursor.find(id) !=
_shardsResponsibleForInitializeCursor.end();
};
auto isAnyResponsibleForInitializeCursor =
[&isResponsibleForInitializeCursor](std::vector<ShardID> const& ids) {
for (auto const& id : ids) {
if (isResponsibleForInitializeCursor(id)) {
return true;
}
}
return false;
};
for (auto const& it : _engineInfos) { for (auto const& it : _engineInfos) {
TRI_ASSERT(it.first); TRI_ASSERT(it.first);
EngineInfo& engine = *it.first; EngineInfo& engine = *it.first;
@ -632,16 +641,14 @@ void EngineInfoContainerDBServer::DBServerInfo::buildMessage(
// serialize for the list of shards // serialize for the list of shards
if (engine.type() == EngineInfo::EngineType::View) { if (engine.type() == EngineInfo::EngineType::View) {
engine.serializeSnippet(serverId, query, shards, infoBuilder); engine.serializeSnippet(serverId, query, shards, infoBuilder, isAnyResponsibleForInitializeCursor(shards));
engine.addClient(serverId); engine.addClient(serverId);
continue; continue;
} }
bool isResponsibleForInit = true;
for (auto const& shard : shards) { for (auto const& shard : shards) {
engine.serializeSnippet(query, shard, infoBuilder, isResponsibleForInit); engine.serializeSnippet(query, shard, infoBuilder, isResponsibleForInitializeCursor(shard));
isResponsibleForInit = false;
} }
} }
infoBuilder.close(); // snippets infoBuilder.close(); // snippets
@ -763,6 +770,11 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
// query // query
auto const& colInfo = it.second; auto const& colInfo = it.second;
// only one of the remote blocks is responsible for forwarding the
// initializeCursor and shutDown requests
// for simplicity, we always use the first remote block if we have more
// than one
bool isResponsibleForInitializeCursor = true;
for (auto const& s : colInfo.usedShards) { for (auto const& s : colInfo.usedShards) {
auto const servers = ci->getResponsibleServer(s); auto const servers = ci->getResponsibleServer(s);
@ -776,6 +788,11 @@ std::map<ServerID, EngineInfoContainerDBServer::DBServerInfo> EngineInfoContaine
auto& mapping = dbServerMapping[responsible]; auto& mapping = dbServerMapping[responsible];
if (isResponsibleForInitializeCursor) {
mapping.setShardAsResponsibleForInitializeCursor(s);
isResponsibleForInitializeCursor = false;
}
mapping.addShardLock(colInfo.lockType, s); mapping.addShardLock(colInfo.lockType, s);
for (auto& e : colInfo.engines) { for (auto& e : colInfo.engines) {

View File

@ -97,12 +97,12 @@ class EngineInfoContainerDBServer {
Collection const* collection() const noexcept; Collection const* collection() const noexcept;
void collection(Collection* col) noexcept; void collection(Collection* col) noexcept;
void serializeSnippet(Query& query, ShardID id, velocypack::Builder& infoBuilder, void serializeSnippet(Query& query, ShardID const& id, velocypack::Builder& infoBuilder,
bool isResponsibleForInit) const; bool isResponsibleForInitializeCursor) const;
void serializeSnippet(ServerID const& serverId, Query& query, void serializeSnippet(ServerID const& serverId, Query& query,
std::vector<ShardID> const& shards, std::vector<ShardID> const& shards, VPackBuilder& infoBuilder,
velocypack::Builder& infoBuilder) const; bool isResponsibleForInitializeCursor) const;
/// @returns type of the engine /// @returns type of the engine
EngineType type() const noexcept { EngineType type() const noexcept {
@ -155,6 +155,8 @@ class EngineInfoContainerDBServer {
void addEngine(std::shared_ptr<EngineInfo> info, ShardID const& id); void addEngine(std::shared_ptr<EngineInfo> info, ShardID const& id);
void setShardAsResponsibleForInitializeCursor(ShardID const& id);
void buildMessage(ServerID const& serverId, EngineInfoContainerDBServer const& context, void buildMessage(ServerID const& serverId, EngineInfoContainerDBServer const& context,
Query& query, velocypack::Builder& infoBuilder) const; Query& query, velocypack::Builder& infoBuilder) const;
@ -172,6 +174,8 @@ class EngineInfoContainerDBServer {
// @brief Map of all EngineInfos with their shards // @brief Map of all EngineInfos with their shards
std::unordered_map<std::shared_ptr<EngineInfo>, std::vector<ShardID>> _engineInfos; std::unordered_map<std::shared_ptr<EngineInfo>, std::vector<ShardID>> _engineInfos;
std::unordered_set<ShardID> _shardsResponsibleForInitializeCursor;
// @brief List of all information required for traverser engines // @brief List of all information required for traverser engines
std::vector<std::pair<GraphNode*, TraverserEngineShardLists>> _traverserEngineInfos; std::vector<std::pair<GraphNode*, TraverserEngineShardLists>> _traverserEngineInfos;
}; };