1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2017-03-30 14:56:19 +02:00
commit 90bade1976
1 changed files with 74 additions and 39 deletions

View File

@ -54,6 +54,33 @@
using namespace arangodb;
using namespace arangodb::aql;
// @brief Local struct to create the
// information required to build traverser engines
// on DB servers.
struct TraverserEngineShardLists {
explicit TraverserEngineShardLists(size_t length) {
// Make sure they all have a fixed size.
edgeCollections.resize(length);
}
~TraverserEngineShardLists() {
}
// Mapping for edge collections to shardIds.
// We have to retain the ordering of edge collections, all
// vectors of these in one run need to have identical size.
// This is because the conditions to query those edges have the
// same ordering.
std::vector<std::vector<ShardID>> edgeCollections;
// Mapping for vertexCollections to shardIds.
std::unordered_map<std::string, std::vector<ShardID>> vertexCollections;
};
/// Typedef for a complicated mapping used in TraverserEngines.
typedef std::unordered_map<ServerID, TraverserEngineShardLists> Serv2ColMap;
/// @brief helper function to create a block
static ExecutionBlock* CreateBlock(
ExecutionEngine* engine, ExecutionNode const* en,
@ -841,32 +868,30 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// For edgeCollections the Ordering is important for the index access.
// Also the same edgeCollection can be included twice (iff direction is ANY)
auto clusterInfo = arangodb::ClusterInfo::instance();
std::unordered_map<
ServerID,
std::pair<std::vector<std::vector<ShardID>>,
std::unordered_map<std::string, std::vector<ShardID>>>>
mappingServerToCollections;
auto servers = clusterInfo->getCurrentDBServers();
Serv2ColMap mappingServerToCollections;
size_t length = edges.size();
// Initialize on engine for every server known to this cluster
// Thanks to locking mechanism we cannot leave any out, even it
// is not responsible for anything...
for (auto s : servers) {
// We insert at lease an empty vector for every edge collection
// Used in the traverser.
auto& info = mappingServerToCollections[s];
// We need to exactly maintain the ordering.
// A server my be responsible for a shard in edge collection 1 but not 0 or 2.
info.first.resize(length);
}
auto findServerLists = [&] (ShardID const& shard) -> Serv2ColMap::iterator {
auto serverList = clusterInfo->getResponsibleServer(shard);
if (serverList->empty()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE,
"Could not find responsible server for shard " + shard);
}
TRI_ASSERT(!serverList->empty());
auto& leader = (*serverList)[0];
auto pair = mappingServerToCollections.find(leader);
if (pair == mappingServerToCollections.end()) {
mappingServerToCollections.emplace(leader, TraverserEngineShardLists{length});
pair = mappingServerToCollections.find(leader);
}
return pair;
};
for (size_t i = 0; i < length; ++i) {
auto shardIds = edges[i]->shardIds(_includedShards);
for (auto const& shard : *shardIds) {
auto serverList = clusterInfo->getResponsibleServer(shard);
TRI_ASSERT(!serverList->empty());
auto& pair = mappingServerToCollections[(*serverList)[0]];
pair.first[i].emplace_back(shard);
auto pair = findServerLists(shard);
pair->second.edgeCollections[i].emplace_back(shard);
}
}
@ -881,35 +906,45 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// ALL collections known to this query.
auto cs = query->collections()->collections();
for (auto const& collection : (*cs)) {
for (auto& entry : mappingServerToCollections) {
entry.second.second.emplace(collection.second->getName(),
std::vector<ShardID>());
}
if (knownEdges.find(collection.second->getName()) == knownEdges.end()) {
// This collection is not one of the edge collections used in this
// graph.
auto shardIds = collection.second->shardIds(_includedShards);
for (auto const& shard : *shardIds) {
auto serverList = clusterInfo->getResponsibleServer(shard);
TRI_ASSERT(!serverList->empty());
auto& pair = mappingServerToCollections[(*serverList)[0]];
pair.second[collection.second->getName()].emplace_back(shard);
auto pair = findServerLists(shard);
pair->second.vertexCollections[collection.second->getName()].emplace_back(shard);
}
}
}
// We have to make sure that all engines at least know all vertex collections.
// Thanks to fanout...
for (auto const& collection : (*cs)) {
for (auto& entry : mappingServerToCollections) {
auto it = entry.second.vertexCollections.find(collection.second->getName());
if (it == entry.second.vertexCollections.end()) {
entry.second.vertexCollections.emplace(collection.second->getName(),
std::vector<ShardID>());
}
}
}
} else {
// This Traversal is started with a GRAPH. It knows all relevant collections.
for (auto const& it : vertices) {
for (auto& entry : mappingServerToCollections) {
entry.second.second.emplace(it->getName(),
std::vector<ShardID>());
}
auto shardIds = it->shardIds(_includedShards);
for (auto const& shard : *shardIds) {
auto serverList = clusterInfo->getResponsibleServer(shard);
TRI_ASSERT(!serverList->empty());
auto& pair = mappingServerToCollections[(*serverList)[0]];
pair.second[it->getName()].emplace_back(shard);
auto pair = findServerLists(shard);
pair->second.vertexCollections[it->getName()].emplace_back(shard);
}
}
// We have to make sure that all engines at least know all vertex collections.
// Thanks to fanout...
for (auto const& it : vertices) {
for (auto& entry : mappingServerToCollections) {
auto vIt = entry.second.vertexCollections.find(it->getName());
if (vIt == entry.second.vertexCollections.end()) {
entry.second.vertexCollections.emplace(it->getName(),
std::vector<ShardID>());
}
}
}
}
@ -975,7 +1010,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
engineInfo.openObject();
engineInfo.add(VPackValue("vertices"));
engineInfo.openObject();
for (auto const& col : list.second.second) {
for (auto const& col : list.second.vertexCollections) {
engineInfo.add(VPackValue(col.first));
engineInfo.openArray();
for (auto const& v : col.second) {
@ -988,7 +1023,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
engineInfo.add(VPackValue("edges"));
engineInfo.openArray();
for (auto const& edgeShards : list.second.first) {
for (auto const& edgeShards : list.second.edgeCollections) {
engineInfo.openArray();
for (auto const& e : edgeShards) {
shardSet.emplace(e);