1
0
Fork 0

Added a basic implementation to receive filtered documents in the cluster. Right now it does not yet filter and is inefficent but serves the API

This commit is contained in:
Michael Hackstein 2015-11-27 11:20:12 +01:00
parent ae7ab86bff
commit bba4fe6089
3 changed files with 86 additions and 23 deletions

View File

@ -982,6 +982,54 @@ int getDocumentOnCoordinator (
// the DBserver could have reported an error.
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get a list of filtered documents in a coordinator
/// All found documents will be inserted into result.
/// After execution the documentIds will contain only all those
/// ids that could not be found.
////////////////////////////////////////////////////////////////////////////////
int getFilteredDocumentsOnCoordinator (
std::string const& dbname,
std::vector<traverser::TraverserExpression*> const& expressions,
std::map<std::string, std::string> const& headers,
std::unordered_set<std::string>& documentIds,
std::unordered_map<std::string, TRI_json_t*>& result) {
// TODO Proper implementation
for (auto it = documentIds.begin(); it != documentIds.end(); /* noop */) {
triagens::rest::HttpResponse::HttpResponseCode responseCode;
std::map<std::string, std::string> resultHeaders;
std::vector<std::string> splitId = triagens::basics::StringUtils::split(*it, '/');
TRI_ASSERT(splitId.size() == 2);
std::string vertexResult;
int res = getDocumentOnCoordinator(dbname,
splitId[0],
splitId[1],
0,
headers,
true,
responseCode,
resultHeaders,
vertexResult);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
if (responseCode == triagens::rest::HttpResponse::HttpResponseCode::NOT_FOUND) {
result.emplace(*it, nullptr);
++it;
}
else {
result.emplace(*it, triagens::basics::JsonHelper::fromString(vertexResult));
documentIds.erase(it++);
}
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get all documents in a coordinator
////////////////////////////////////////////////////////////////////////////////
@ -1140,11 +1188,16 @@ int getFilteredEdgesOnCoordinator (
}
for (it = shards.begin(); it != shards.end(); ++it) {
map<string, string>* headers = new map<string, string>;
res = cc->asyncRequest("", coordTransactionID, "shard:" + it->first,
triagens::rest::HttpRequest::HTTP_REQUEST_GET,
"/_db/" + StringUtils::urlEncode(dbname) + "/_api/edges/" + it->first + queryParameters,
0, false, headers, nullptr, 3600.0);
/*
res = cc->asyncRequest("", coordTransactionID, "shard:" + it->first,
triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
"/_db/" + StringUtils::urlEncode(dbname) + "/_api/edges/" + it->first + queryParameters,
&reqBodyString, false, headers, nullptr, 3600.0);
*/
delete res;
}
// Now listen to the results:

View File

@ -160,6 +160,20 @@ namespace triagens {
std::map<std::string, std::string>& resultHeaders,
std::string& resultBody);
////////////////////////////////////////////////////////////////////////////////
/// @brief get a list of filtered documents in a coordinator
/// All found documents will be inserted into result.
/// After execution the documentIds will contain only all those
/// ids that could not be found.
////////////////////////////////////////////////////////////////////////////////
int getFilteredDocumentsOnCoordinator (
std::string const& dbname,
std::vector<traverser::TraverserExpression*> const& expressions,
std::map<std::string, std::string> const& headers,
std::unordered_set<std::string>& documentIds,
std::unordered_map<std::string, TRI_json_t*>& result);
////////////////////////////////////////////////////////////////////////////////
/// @brief get all documents in a coordinator
////////////////////////////////////////////////////////////////////////////////
@ -198,7 +212,7 @@ namespace triagens {
std::vector<traverser::TraverserExpression*> const& expressions,
triagens::rest::HttpResponse::HttpResponseCode& responseCode,
std::string& contentType,
triagens::basics::Json& resultBody);
triagens::basics::Json& resultJson);
////////////////////////////////////////////////////////////////////////////////
/// @brief modify a document in a coordinator

View File

@ -100,17 +100,17 @@ void ClusterTraverser::EdgeGetter::operator() (std::string const& startVertex,
triagens::rest::HttpResponse::HttpResponseCode responseCode;
std::string contentType;
std::string collName = _traverser->_edgeCols[eColIdx];
std::vector<TraverserExpression*> expTmp;
std::vector<TraverserExpression*> expEdges;
auto found = _traverser->_expressions->find(depth);
if (found != _traverser->_expressions->end()) {
expTmp = found->second;
expEdges = found->second;
}
int res = getFilteredEdgesOnCoordinator(_traverser->_dbname,
collName,
startVertex,
_traverser->_opts.direction,
expTmp,
expEdges,
responseCode,
contentType,
resultEdges);
@ -141,25 +141,21 @@ void ClusterTraverser::EdgeGetter::operator() (std::string const& startVertex,
}
_traverser->_edges.emplace(edgeId, edge.copy().steal());
}
std::vector<TraverserExpression*> expVertices;
found = _traverser->_expressions->find(depth + 1);
if (found != _traverser->_expressions->end()) {
expVertices = found->second;
}
std::map<std::string, std::string> headers;
std::map<std::string, std::string> resultHeaders;
for (auto it : verticesToFetch) {
std::vector<std::string> splitId = triagens::basics::StringUtils::split(it, '/');
TRI_ASSERT(splitId.size() == 2);
std::string vertexResult;
int res = getDocumentOnCoordinator(_traverser->_dbname,
splitId[0],
splitId[1],
0,
headers,
true,
responseCode,
resultHeaders,
vertexResult);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
_traverser->_vertices.emplace(it, triagens::basics::JsonHelper::fromString(vertexResult));
res = getFilteredDocumentsOnCoordinator(_traverser->_dbname,
expVertices,
headers,
verticesToFetch,
_traverser->_vertices);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
std::string next = stack.top();
stack.pop();