mirror of https://gitee.com/bigwinds/arangodb
If a collection is not known to a Cluster Traversal. It will now throw an exception to prevent dead-locks and undefined behaviour.
This commit is contained in:
parent
1ebb19d1da
commit
1447d6901f
|
@ -754,8 +754,10 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
// For edgeCollections the Ordering is important for the index access.
|
||||
// Also the same edgeCollection can be included twice (iff direction is ANY)
|
||||
auto clusterInfo = arangodb::ClusterInfo::instance();
|
||||
std::unordered_map<ServerID,
|
||||
std::pair<std::vector<std::vector<ShardID>>, std::vector<ShardID>>>
|
||||
std::unordered_map<
|
||||
ServerID,
|
||||
std::pair<std::vector<std::vector<ShardID>>,
|
||||
std::unordered_map<std::string, std::vector<ShardID>>>>
|
||||
mappingServerToCollections;
|
||||
size_t length = edges.size();
|
||||
for (size_t i = 0; i < length; ++i) {
|
||||
|
@ -773,7 +775,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
}
|
||||
}
|
||||
|
||||
std::vector<std::unique_ptr<arangodb::aql::Collection>> const& vertices = en->vertexColls();
|
||||
std::vector<std::unique_ptr<arangodb::aql::Collection>> const& vertices =
|
||||
en->vertexColls();
|
||||
if (vertices.empty()) {
|
||||
std::unordered_set<std::string> knownEdges;
|
||||
for (auto const& it : edges) {
|
||||
|
@ -783,8 +786,9 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
// ALL collections known to this query.
|
||||
auto cs = query->collections()->collections();
|
||||
for (auto const& collection : (*cs)) {
|
||||
if (knownEdges.find(collection.second->getName()) == knownEdges.end()) {
|
||||
// This collection is not one of the edge collections used in this graph.
|
||||
if (knownEdges.find(collection.second->getName()) == knownEdges.end()) {
|
||||
// This collection is not one of the edge collections used in this
|
||||
// graph.
|
||||
auto shardIds = collection.second->shardIds();
|
||||
for (auto const& shard : *shardIds) {
|
||||
auto serverList = clusterInfo->getResponsibleServer(shard);
|
||||
|
@ -795,7 +799,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
// A server my be responsible for a shard in edge collection 1 but not 0 or 2.
|
||||
pair.first.resize(length);
|
||||
}
|
||||
pair.second.emplace_back(shard);
|
||||
pair.second[collection.second->getName()].emplace_back(shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -812,7 +816,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
// A server my be responsible for a shard in edge collection 1 but not 0 or 2.
|
||||
pair.first.resize(length);
|
||||
}
|
||||
pair.second.emplace_back(shard);
|
||||
pair.second[it->getName()].emplace_back(shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -832,9 +836,10 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
// [ <shards of edge collection 1> ],
|
||||
// [ <shards of edge collection 2> ]
|
||||
// ],
|
||||
// "vertices" : [
|
||||
// <shards of vertex collections>
|
||||
// ]
|
||||
// "vertices" : {
|
||||
// "v1": [<shards of v1>],
|
||||
// "v2": [<shards of v2>]
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
|
@ -853,10 +858,15 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
|
|||
engineInfo.add(VPackValue("shards"));
|
||||
engineInfo.openObject();
|
||||
engineInfo.add(VPackValue("vertices"));
|
||||
engineInfo.openArray();
|
||||
for (auto const& v : list.second.second) {
|
||||
shardSet.emplace(v);
|
||||
engineInfo.add(VPackValue(v));
|
||||
engineInfo.openObject();
|
||||
for (auto const& col : list.second.second) {
|
||||
engineInfo.add(VPackValue(col.first));
|
||||
engineInfo.openArray();
|
||||
for (auto const& v : col.second) {
|
||||
shardSet.emplace(v);
|
||||
engineInfo.add(VPackValue(v));
|
||||
}
|
||||
engineInfo.close(); // this collection
|
||||
}
|
||||
engineInfo.close(); // vertices
|
||||
|
||||
|
|
|
@ -1635,7 +1635,7 @@ int fetchEdgesFromEngines(
|
|||
/// If no server responds with a document
|
||||
/// a 'null' will be inserted into the result.
|
||||
|
||||
int fetchVerticesFromEngines(
|
||||
void fetchVerticesFromEngines(
|
||||
std::string const& dbname,
|
||||
std::unordered_map<ServerID, traverser::TraverserEngineID> const* engines,
|
||||
std::unordered_set<VPackSlice>& vertexIds,
|
||||
|
@ -1679,21 +1679,29 @@ int fetchVerticesFromEngines(
|
|||
int commError = handleGeneralCommErrors(&res);
|
||||
if (commError != TRI_ERROR_NO_ERROR) {
|
||||
// oh-oh cluster is in a bad state
|
||||
return commError;
|
||||
THROW_ARANGO_EXCEPTION(commError);
|
||||
}
|
||||
TRI_ASSERT(res.answer != nullptr);
|
||||
auto resBody = res.answer->toVelocyPackBuilderPtr(&VPackOptions::Defaults);
|
||||
VPackSlice resSlice = resBody->slice();
|
||||
if (!resSlice.isObject()) {
|
||||
// Response has invalid format
|
||||
return TRI_ERROR_HTTP_CORRUPTED_JSON;
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_HTTP_CORRUPTED_JSON);
|
||||
}
|
||||
if (res.answer_code != GeneralResponse::ResponseCode::OK) {
|
||||
int code = arangodb::basics::VelocyPackHelper::getNumericValue<int>(
|
||||
resSlice, "errorNum", TRI_ERROR_INTERNAL);
|
||||
// We have an error case here. Throw it.
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
code, arangodb::basics::VelocyPackHelper::getStringValue(
|
||||
resSlice, "errorMessage", TRI_errno_string(code)));
|
||||
}
|
||||
for (auto const& pair : VPackObjectIterator(resSlice)) {
|
||||
if (vertexIds.erase(pair.key) == 0) {
|
||||
// We either found the same vertex twice,
|
||||
// or found a vertex we did not request.
|
||||
// Anyways something somewhere went seriously wrong
|
||||
return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS;
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS);
|
||||
}
|
||||
TRI_ASSERT(result.find(pair.key) == result.end());
|
||||
auto val = VPackBuilder::clone(pair.value);
|
||||
|
@ -1710,8 +1718,6 @@ int fetchVerticesFromEngines(
|
|||
.steal());
|
||||
}
|
||||
vertexIds.clear();
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -164,7 +164,7 @@ int fetchEdgesFromEngines(
|
|||
/// If no server responds with a document
|
||||
/// a 'null' will be inserted into the result.
|
||||
|
||||
int fetchVerticesFromEngines(
|
||||
void fetchVerticesFromEngines(
|
||||
std::string const&,
|
||||
std::unordered_map<ServerID, traverser::TraverserEngineID> const*,
|
||||
std::unordered_set<arangodb::velocypack::Slice>&,
|
||||
|
|
|
@ -103,11 +103,8 @@ bool ClusterTraverser::getSingleVertex(VPackSlice edge, VPackSlice comp,
|
|||
void ClusterTraverser::fetchVertices() {
|
||||
_readDocuments += _verticesToFetch.size();
|
||||
TransactionBuilderLeaser lease(_trx);
|
||||
int res = fetchVerticesFromEngines(_dbname, _engines, _verticesToFetch,
|
||||
_vertices, *(lease.get()));
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
fetchVerticesFromEngines(_dbname, _engines, _verticesToFetch, _vertices,
|
||||
*(lease.get()));
|
||||
_verticesToFetch.clear();
|
||||
#warning Reimplement this. Fetching Documents Coordinator-Case
|
||||
/*
|
||||
|
|
|
@ -69,7 +69,7 @@ TraverserEngine::TraverserEngine(TRI_vocbase_t* vocbase,
|
|||
|
||||
VPackSlice vertexSlice = shardsSlice.get(VERTICES);
|
||||
|
||||
if (vertexSlice.isNone() || !vertexSlice.isArray()) {
|
||||
if (vertexSlice.isNone() || !vertexSlice.isObject()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_BAD_PARAMETER,
|
||||
"The " + SHARDS + " object requires an " + VERTICES + " attribute.");
|
||||
|
@ -85,11 +85,17 @@ TraverserEngine::TraverserEngine(TRI_vocbase_t* vocbase,
|
|||
}
|
||||
|
||||
// Add all Vertex shards to the transaction
|
||||
for (VPackSlice const shard : VPackArrayIterator(vertexSlice)) {
|
||||
TRI_ASSERT(shard.isString());
|
||||
std::string name = shard.copyString();
|
||||
_collections.add(name, TRI_TRANSACTION_READ);
|
||||
_vertexShards.emplace_back(std::move(name));
|
||||
for (auto const& collection : VPackObjectIterator(vertexSlice)) {
|
||||
std::vector<std::string> shards;
|
||||
for (VPackSlice const shard : VPackArrayIterator(collection.value)) {
|
||||
TRI_ASSERT(shard.isString());
|
||||
std::string name = shard.copyString();
|
||||
_collections.add(name, TRI_TRANSACTION_READ);
|
||||
shards.emplace_back(std::move(name));
|
||||
}
|
||||
if (!shards.empty()) {
|
||||
_vertexShards.emplace(collection.key.copyString(), shards);
|
||||
}
|
||||
}
|
||||
|
||||
_trx = new arangodb::AqlTransaction(
|
||||
|
@ -161,14 +167,24 @@ void TraverserEngine::getVertexData(VPackSlice vertex, VPackBuilder& builder) {
|
|||
// Thanks locking
|
||||
TRI_ASSERT(vertex.isString() || vertex.isArray());
|
||||
builder.openObject();
|
||||
VPackBuilder tmpResult;
|
||||
bool found;
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
auto workOnOneDocument = [&](VPackSlice v) {
|
||||
tmpResult.clear();
|
||||
found = false;
|
||||
for (std::string const& shard : _vertexShards) {
|
||||
res = _trx->documentFastPath(shard, v, tmpResult, false);
|
||||
StringRef id(v);
|
||||
std::string name = id.substr(0, id.find('/')).toString();
|
||||
auto shards = _vertexShards.find(name);
|
||||
if (shards == _vertexShards.end()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUERY_COLLECTION_LOCK_FAILED,
|
||||
"Collection not known to Traversal " +
|
||||
name + " please add 'WITH " + name +
|
||||
"' as the first line in your AQL");
|
||||
// The collection is not known here!
|
||||
// Maybe handle differently
|
||||
}
|
||||
builder.add(v);
|
||||
for (std::string const& shard : shards->second) {
|
||||
res = _trx->documentFastPath(shard, v, builder, false);
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
found = true;
|
||||
// FOUND short circuit.
|
||||
|
@ -179,11 +195,8 @@ void TraverserEngine::getVertexData(VPackSlice vertex, VPackBuilder& builder) {
|
|||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
}
|
||||
// TODO FILTERING!
|
||||
// HOWTO Distinguish filtered vs NULL?
|
||||
if (found) {
|
||||
builder.add(v);
|
||||
builder.add(tmpResult.slice());
|
||||
if (!found) {
|
||||
builder.removeLast();
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -208,34 +221,20 @@ void TraverserEngine::getVertexData(VPackSlice vertex, size_t depth,
|
|||
bool found = false;
|
||||
builder.openObject();
|
||||
builder.add(VPackValue("vertices"));
|
||||
if (vertex.isArray()) {
|
||||
builder.openArray();
|
||||
for (VPackSlice v : VPackArrayIterator(vertex)) {
|
||||
found = false;
|
||||
for (std::string const& shard : _vertexShards) {
|
||||
res = _trx->documentFastPath(shard, v, builder, false);
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
read++;
|
||||
found = true;
|
||||
// FOUND short circuit.
|
||||
break;
|
||||
}
|
||||
if (res != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) {
|
||||
// We are in a very bad condition here...
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
}
|
||||
// TODO FILTERING!
|
||||
// HOWTO Distinguish filtered vs NULL?
|
||||
if (!found) {
|
||||
builder.add(arangodb::basics::VelocyPackHelper::NullValue());
|
||||
}
|
||||
}
|
||||
builder.close();
|
||||
} else if (vertex.isString()) {
|
||||
|
||||
auto workOnOneDocument = [&](VPackSlice v) {
|
||||
found = false;
|
||||
for (std::string const& shard : _vertexShards) {
|
||||
res = _trx->documentFastPath(shard, vertex, builder, false);
|
||||
StringRef id(v);
|
||||
std::string name = id.substr(0, id.find('/')).toString();
|
||||
auto shards = _vertexShards.find(name);
|
||||
if (shards == _vertexShards.end()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUERY_COLLECTION_LOCK_FAILED,
|
||||
"Collection not known to Traversal " +
|
||||
name + " please add 'WITH " + name +
|
||||
"' as the first line in your AQL");
|
||||
}
|
||||
for (std::string const& shard : shards->second) {
|
||||
res = _trx->documentFastPath(shard, v, builder, false);
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
read++;
|
||||
found = true;
|
||||
|
@ -250,10 +249,18 @@ void TraverserEngine::getVertexData(VPackSlice vertex, size_t depth,
|
|||
// TODO FILTERING!
|
||||
// HOWTO Distinguish filtered vs NULL?
|
||||
if (!found) {
|
||||
builder.add(arangodb::basics::VelocyPackHelper::NullValue());
|
||||
builder.removeLast();
|
||||
}
|
||||
};
|
||||
|
||||
if (vertex.isArray()) {
|
||||
builder.openArray();
|
||||
for (VPackSlice v : VPackArrayIterator(vertex)) {
|
||||
workOnOneDocument(v);
|
||||
}
|
||||
builder.close();
|
||||
} else {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
|
||||
workOnOneDocument(vertex);
|
||||
}
|
||||
builder.add("readIndex", VPackValue(read));
|
||||
builder.add("filtered", VPackValue(filtered));
|
||||
|
|
|
@ -79,7 +79,7 @@ class TraverserEngine {
|
|||
arangodb::Transaction* _trx;
|
||||
arangodb::aql::Collections _collections;
|
||||
std::unordered_set<std::string> _locked;
|
||||
std::vector<std::string> _vertexShards;
|
||||
std::unordered_map<std::string, std::vector<std::string>> _vertexShards;
|
||||
};
|
||||
} // namespace traverser
|
||||
} // namespace arangodb
|
||||
|
|
Loading…
Reference in New Issue