From 1447d6901fd38bb578d243adb237334bb2850f73 Mon Sep 17 00:00:00 2001 From: Michael Hackstein Date: Mon, 8 Aug 2016 09:41:27 +0200 Subject: [PATCH] If a collection is not known to a Cluster Traversal. It will now throw an exception to prevent dead-locks and undefined behaviour. --- arangod/Aql/ExecutionEngine.cpp | 38 +++++++---- arangod/Cluster/ClusterMethods.cpp | 18 ++++-- arangod/Cluster/ClusterMethods.h | 2 +- arangod/Cluster/ClusterTraverser.cpp | 7 +- arangod/Cluster/TraverserEngine.cpp | 95 +++++++++++++++------------- arangod/Cluster/TraverserEngine.h | 2 +- 6 files changed, 91 insertions(+), 71 deletions(-) diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 119186eeeb..84ee8c7d5d 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -754,8 +754,10 @@ struct CoordinatorInstanciator : public WalkerWorker { // For edgeCollections the Ordering is important for the index access. // Also the same edgeCollection can be included twice (iff direction is ANY) auto clusterInfo = arangodb::ClusterInfo::instance(); - std::unordered_map>, std::vector>> + std::unordered_map< + ServerID, + std::pair>, + std::unordered_map>>> mappingServerToCollections; size_t length = edges.size(); for (size_t i = 0; i < length; ++i) { @@ -773,7 +775,8 @@ struct CoordinatorInstanciator : public WalkerWorker { } } - std::vector> const& vertices = en->vertexColls(); + std::vector> const& vertices = + en->vertexColls(); if (vertices.empty()) { std::unordered_set knownEdges; for (auto const& it : edges) { @@ -783,8 +786,9 @@ struct CoordinatorInstanciator : public WalkerWorker { // ALL collections known to this query. auto cs = query->collections()->collections(); for (auto const& collection : (*cs)) { - if (knownEdges.find(collection.second->getName()) == knownEdges.end()) { - // This collection is not one of the edge collections used in this graph. + if (knownEdges.find(collection.second->getName()) == knownEdges.end()) { + // This collection is not one of the edge collections used in this + // graph. auto shardIds = collection.second->shardIds(); for (auto const& shard : *shardIds) { auto serverList = clusterInfo->getResponsibleServer(shard); @@ -795,7 +799,7 @@ struct CoordinatorInstanciator : public WalkerWorker { // A server my be responsible for a shard in edge collection 1 but not 0 or 2. pair.first.resize(length); } - pair.second.emplace_back(shard); + pair.second[collection.second->getName()].emplace_back(shard); } } } @@ -812,7 +816,7 @@ struct CoordinatorInstanciator : public WalkerWorker { // A server my be responsible for a shard in edge collection 1 but not 0 or 2. pair.first.resize(length); } - pair.second.emplace_back(shard); + pair.second[it->getName()].emplace_back(shard); } } } @@ -832,9 +836,10 @@ struct CoordinatorInstanciator : public WalkerWorker { // [ ], // [ ] // ], - // "vertices" : [ - // - // ] + // "vertices" : { + // "v1": [], + // "v2": [] + // } // } // } @@ -853,10 +858,15 @@ struct CoordinatorInstanciator : public WalkerWorker { engineInfo.add(VPackValue("shards")); engineInfo.openObject(); engineInfo.add(VPackValue("vertices")); - engineInfo.openArray(); - for (auto const& v : list.second.second) { - shardSet.emplace(v); - engineInfo.add(VPackValue(v)); + engineInfo.openObject(); + for (auto const& col : list.second.second) { + engineInfo.add(VPackValue(col.first)); + engineInfo.openArray(); + for (auto const& v : col.second) { + shardSet.emplace(v); + engineInfo.add(VPackValue(v)); + } + engineInfo.close(); // this collection } engineInfo.close(); // vertices diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 7860a1a4e6..0789d89fed 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -1635,7 +1635,7 @@ int fetchEdgesFromEngines( /// If no server responds with a document /// a 'null' will be inserted into the result. -int fetchVerticesFromEngines( +void fetchVerticesFromEngines( std::string const& dbname, std::unordered_map const* engines, std::unordered_set& vertexIds, @@ -1679,21 +1679,29 @@ int fetchVerticesFromEngines( int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state - return commError; + THROW_ARANGO_EXCEPTION(commError); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtr(&VPackOptions::Defaults); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format - return TRI_ERROR_HTTP_CORRUPTED_JSON; + THROW_ARANGO_EXCEPTION(TRI_ERROR_HTTP_CORRUPTED_JSON); + } + if (res.answer_code != GeneralResponse::ResponseCode::OK) { + int code = arangodb::basics::VelocyPackHelper::getNumericValue( + resSlice, "errorNum", TRI_ERROR_INTERNAL); + // We have an error case here. Throw it. + THROW_ARANGO_EXCEPTION_MESSAGE( + code, arangodb::basics::VelocyPackHelper::getStringValue( + resSlice, "errorMessage", TRI_errno_string(code))); } for (auto const& pair : VPackObjectIterator(resSlice)) { if (vertexIds.erase(pair.key) == 0) { // We either found the same vertex twice, // or found a vertex we did not request. // Anyways something somewhere went seriously wrong - return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS; + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS); } TRI_ASSERT(result.find(pair.key) == result.end()); auto val = VPackBuilder::clone(pair.value); @@ -1710,8 +1718,6 @@ int fetchVerticesFromEngines( .steal()); } vertexIds.clear(); - - return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index ba6d9cdf1a..e697c62c35 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -164,7 +164,7 @@ int fetchEdgesFromEngines( /// If no server responds with a document /// a 'null' will be inserted into the result. -int fetchVerticesFromEngines( +void fetchVerticesFromEngines( std::string const&, std::unordered_map const*, std::unordered_set&, diff --git a/arangod/Cluster/ClusterTraverser.cpp b/arangod/Cluster/ClusterTraverser.cpp index cff60d0dcb..9f44c70bfd 100644 --- a/arangod/Cluster/ClusterTraverser.cpp +++ b/arangod/Cluster/ClusterTraverser.cpp @@ -103,11 +103,8 @@ bool ClusterTraverser::getSingleVertex(VPackSlice edge, VPackSlice comp, void ClusterTraverser::fetchVertices() { _readDocuments += _verticesToFetch.size(); TransactionBuilderLeaser lease(_trx); - int res = fetchVerticesFromEngines(_dbname, _engines, _verticesToFetch, - _vertices, *(lease.get())); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION(res); - } + fetchVerticesFromEngines(_dbname, _engines, _verticesToFetch, _vertices, + *(lease.get())); _verticesToFetch.clear(); #warning Reimplement this. Fetching Documents Coordinator-Case /* diff --git a/arangod/Cluster/TraverserEngine.cpp b/arangod/Cluster/TraverserEngine.cpp index e9f56ffce5..f104bdc8ed 100644 --- a/arangod/Cluster/TraverserEngine.cpp +++ b/arangod/Cluster/TraverserEngine.cpp @@ -69,7 +69,7 @@ TraverserEngine::TraverserEngine(TRI_vocbase_t* vocbase, VPackSlice vertexSlice = shardsSlice.get(VERTICES); - if (vertexSlice.isNone() || !vertexSlice.isArray()) { + if (vertexSlice.isNone() || !vertexSlice.isObject()) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_BAD_PARAMETER, "The " + SHARDS + " object requires an " + VERTICES + " attribute."); @@ -85,11 +85,17 @@ TraverserEngine::TraverserEngine(TRI_vocbase_t* vocbase, } // Add all Vertex shards to the transaction - for (VPackSlice const shard : VPackArrayIterator(vertexSlice)) { - TRI_ASSERT(shard.isString()); - std::string name = shard.copyString(); - _collections.add(name, TRI_TRANSACTION_READ); - _vertexShards.emplace_back(std::move(name)); + for (auto const& collection : VPackObjectIterator(vertexSlice)) { + std::vector shards; + for (VPackSlice const shard : VPackArrayIterator(collection.value)) { + TRI_ASSERT(shard.isString()); + std::string name = shard.copyString(); + _collections.add(name, TRI_TRANSACTION_READ); + shards.emplace_back(std::move(name)); + } + if (!shards.empty()) { + _vertexShards.emplace(collection.key.copyString(), shards); + } } _trx = new arangodb::AqlTransaction( @@ -161,14 +167,24 @@ void TraverserEngine::getVertexData(VPackSlice vertex, VPackBuilder& builder) { // Thanks locking TRI_ASSERT(vertex.isString() || vertex.isArray()); builder.openObject(); - VPackBuilder tmpResult; bool found; int res = TRI_ERROR_NO_ERROR; auto workOnOneDocument = [&](VPackSlice v) { - tmpResult.clear(); found = false; - for (std::string const& shard : _vertexShards) { - res = _trx->documentFastPath(shard, v, tmpResult, false); + StringRef id(v); + std::string name = id.substr(0, id.find('/')).toString(); + auto shards = _vertexShards.find(name); + if (shards == _vertexShards.end()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUERY_COLLECTION_LOCK_FAILED, + "Collection not known to Traversal " + + name + " please add 'WITH " + name + + "' as the first line in your AQL"); + // The collection is not known here! + // Maybe handle differently + } + builder.add(v); + for (std::string const& shard : shards->second) { + res = _trx->documentFastPath(shard, v, builder, false); if (res == TRI_ERROR_NO_ERROR) { found = true; // FOUND short circuit. @@ -179,11 +195,8 @@ void TraverserEngine::getVertexData(VPackSlice vertex, VPackBuilder& builder) { THROW_ARANGO_EXCEPTION(res); } } - // TODO FILTERING! - // HOWTO Distinguish filtered vs NULL? - if (found) { - builder.add(v); - builder.add(tmpResult.slice()); + if (!found) { + builder.removeLast(); } }; @@ -208,34 +221,20 @@ void TraverserEngine::getVertexData(VPackSlice vertex, size_t depth, bool found = false; builder.openObject(); builder.add(VPackValue("vertices")); - if (vertex.isArray()) { - builder.openArray(); - for (VPackSlice v : VPackArrayIterator(vertex)) { - found = false; - for (std::string const& shard : _vertexShards) { - res = _trx->documentFastPath(shard, v, builder, false); - if (res == TRI_ERROR_NO_ERROR) { - read++; - found = true; - // FOUND short circuit. - break; - } - if (res != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) { - // We are in a very bad condition here... - THROW_ARANGO_EXCEPTION(res); - } - } - // TODO FILTERING! - // HOWTO Distinguish filtered vs NULL? - if (!found) { - builder.add(arangodb::basics::VelocyPackHelper::NullValue()); - } - } - builder.close(); - } else if (vertex.isString()) { + + auto workOnOneDocument = [&](VPackSlice v) { found = false; - for (std::string const& shard : _vertexShards) { - res = _trx->documentFastPath(shard, vertex, builder, false); + StringRef id(v); + std::string name = id.substr(0, id.find('/')).toString(); + auto shards = _vertexShards.find(name); + if (shards == _vertexShards.end()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUERY_COLLECTION_LOCK_FAILED, + "Collection not known to Traversal " + + name + " please add 'WITH " + name + + "' as the first line in your AQL"); + } + for (std::string const& shard : shards->second) { + res = _trx->documentFastPath(shard, v, builder, false); if (res == TRI_ERROR_NO_ERROR) { read++; found = true; @@ -250,10 +249,18 @@ void TraverserEngine::getVertexData(VPackSlice vertex, size_t depth, // TODO FILTERING! // HOWTO Distinguish filtered vs NULL? if (!found) { - builder.add(arangodb::basics::VelocyPackHelper::NullValue()); + builder.removeLast(); } + }; + + if (vertex.isArray()) { + builder.openArray(); + for (VPackSlice v : VPackArrayIterator(vertex)) { + workOnOneDocument(v); + } + builder.close(); } else { - THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); + workOnOneDocument(vertex); } builder.add("readIndex", VPackValue(read)); builder.add("filtered", VPackValue(filtered)); diff --git a/arangod/Cluster/TraverserEngine.h b/arangod/Cluster/TraverserEngine.h index 9b42ca6e16..4e6f68f5fa 100644 --- a/arangod/Cluster/TraverserEngine.h +++ b/arangod/Cluster/TraverserEngine.h @@ -79,7 +79,7 @@ class TraverserEngine { arangodb::Transaction* _trx; arangodb::aql::Collections _collections; std::unordered_set _locked; - std::vector _vertexShards; + std::unordered_map> _vertexShards; }; } // namespace traverser } // namespace arangodb