mirror of https://gitee.com/bigwinds/arangodb
count HTTP requests from traversals (#8645)
This commit is contained in:
parent
616ea94f24
commit
4af7fa8f46
|
@ -1004,6 +1004,8 @@ Result EngineInfoContainerDBServer::buildEngines(MapRemoteToSnippet& queryIds) c
|
|||
ClusterTrxMethods::addAQLTransactionHeader(*trx, /*server*/ it.first, headers);
|
||||
|
||||
CoordTransactionID coordTransactionID = TRI_NewTickServer();
|
||||
|
||||
_query->incHttpRequests(1);
|
||||
auto res = cc->syncRequest(coordTransactionID, serverDest, RequestType::POST,
|
||||
url, infoBuilder.toJson(), headers, SETUP_TIMEOUT);
|
||||
|
||||
|
@ -1140,7 +1142,7 @@ void EngineInfoContainerDBServer::cleanupEngines(std::shared_ptr<ClusterComm> cc
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Shutdown traverser engines
|
||||
url = "/_db/" + arangodb::basics::StringUtils::urlEncode(dbname) +
|
||||
"/_internal/traverser/";
|
||||
|
@ -1153,6 +1155,8 @@ void EngineInfoContainerDBServer::cleanupEngines(std::shared_ptr<ClusterComm> cc
|
|||
}
|
||||
}
|
||||
|
||||
_query->incHttpRequests(requests.size());
|
||||
|
||||
cc->fireAndForgetRequests(requests);
|
||||
queryIds.clear();
|
||||
}
|
||||
|
|
|
@ -277,6 +277,14 @@ void Query::setExecutionTime() {
|
|||
_engine->_stats.setExecutionTime(TRI_microtime() - _startTime);
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief increase number of HTTP requests. this is normally
|
||||
/// called during the setup of a query
|
||||
void Query::incHttpRequests(size_t requests) {
|
||||
if (_engine != nullptr) {
|
||||
_engine->_stats.requests += requests;
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief register an error, with an optional parameter inserted into printf
|
||||
/// this also makes the query abort
|
||||
|
|
|
@ -115,6 +115,10 @@ class Query {
|
|||
/// @brief set the query to killed
|
||||
void kill();
|
||||
|
||||
/// @brief increase number of HTTP requests. this is normally
|
||||
/// called during the setup of a query
|
||||
void incHttpRequests(size_t requests);
|
||||
|
||||
void setExecutionTime();
|
||||
|
||||
QueryString const& queryString() const { return _queryString; }
|
||||
|
|
|
@ -177,6 +177,7 @@ std::pair<ExecutionState, TraversalStats> TraversalExecutor::produceRow(OutputAq
|
|||
// we are done
|
||||
s.addFiltered(_traverser.getAndResetFilteredPaths());
|
||||
s.addScannedIndex(_traverser.getAndResetReadDocuments());
|
||||
s.addHttpRequests(_traverser.getAndResetHttpRequests());
|
||||
return {_rowState, s};
|
||||
}
|
||||
std::tie(_rowState, _input) = _fetcher.fetchRow();
|
||||
|
@ -184,6 +185,7 @@ std::pair<ExecutionState, TraversalStats> TraversalExecutor::produceRow(OutputAq
|
|||
TRI_ASSERT(!_input.isInitialized());
|
||||
s.addFiltered(_traverser.getAndResetFilteredPaths());
|
||||
s.addScannedIndex(_traverser.getAndResetReadDocuments());
|
||||
s.addHttpRequests(_traverser.getAndResetHttpRequests());
|
||||
return {_rowState, s};
|
||||
}
|
||||
|
||||
|
@ -192,6 +194,7 @@ std::pair<ExecutionState, TraversalStats> TraversalExecutor::produceRow(OutputAq
|
|||
TRI_ASSERT(_rowState == ExecutionState::DONE);
|
||||
s.addFiltered(_traverser.getAndResetFilteredPaths());
|
||||
s.addScannedIndex(_traverser.getAndResetReadDocuments());
|
||||
s.addHttpRequests(_traverser.getAndResetHttpRequests());
|
||||
return {_rowState, s};
|
||||
}
|
||||
if (!resetTraverser()) {
|
||||
|
@ -225,6 +228,7 @@ std::pair<ExecutionState, TraversalStats> TraversalExecutor::produceRow(OutputAq
|
|||
}
|
||||
s.addFiltered(_traverser.getAndResetFilteredPaths());
|
||||
s.addScannedIndex(_traverser.getAndResetReadDocuments());
|
||||
s.addHttpRequests(_traverser.getAndResetHttpRequests());
|
||||
return {computeState(), s};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ namespace aql {
|
|||
|
||||
class TraversalStats {
|
||||
public:
|
||||
TraversalStats() noexcept : _filtered(0), _scannedIndex(0) {}
|
||||
TraversalStats() noexcept : _filtered(0), _scannedIndex(0), _httpRequests(0) {}
|
||||
|
||||
void setFiltered(std::size_t filtered) noexcept { _filtered = filtered; }
|
||||
|
||||
|
@ -47,16 +47,24 @@ class TraversalStats {
|
|||
}
|
||||
|
||||
std::size_t getScannedIndex() const noexcept { return _scannedIndex; }
|
||||
|
||||
void addHttpRequests(std::size_t requests) noexcept {
|
||||
_httpRequests += requests;
|
||||
}
|
||||
|
||||
std::size_t getHttpRequests() const noexcept { return _httpRequests; }
|
||||
|
||||
private:
|
||||
std::size_t _filtered;
|
||||
std::size_t _scannedIndex;
|
||||
std::size_t _httpRequests;
|
||||
};
|
||||
|
||||
inline ExecutionStats& operator+=(ExecutionStats& executionStats,
|
||||
TraversalStats const& traversalStats) noexcept {
|
||||
executionStats.filtered += traversalStats.getFiltered();
|
||||
executionStats.scannedIndex += traversalStats.getScannedIndex();
|
||||
executionStats.requests += traversalStats.getHttpRequests();
|
||||
return executionStats;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,8 @@ ClusterEdgeCursor::ClusterEdgeCursor(arangodb::velocypack::StringRef vertexId, u
|
|||
: _position(0),
|
||||
_resolver(opts->trx()->resolver()),
|
||||
_opts(opts),
|
||||
_cache(static_cast<ClusterTraverserCache*>(opts->cache())) {
|
||||
_cache(static_cast<ClusterTraverserCache*>(opts->cache())),
|
||||
_httpRequests(0) {
|
||||
TRI_ASSERT(_cache != nullptr);
|
||||
auto trx = _opts->trx();
|
||||
transaction::BuilderLeaser leased(trx);
|
||||
|
@ -53,6 +54,7 @@ ClusterEdgeCursor::ClusterEdgeCursor(arangodb::velocypack::StringRef vertexId, u
|
|||
fetchEdgesFromEngines(trx->vocbase().name(), _cache->engines(), b->slice(), depth,
|
||||
_cache->cache(), _edgeList, _cache->datalake(), *(leased.get()),
|
||||
_cache->filteredDocuments(), _cache->insertedDocuments());
|
||||
_httpRequests += _cache->engines()->size();
|
||||
}
|
||||
|
||||
// ShortestPath variant
|
||||
|
@ -60,7 +62,8 @@ ClusterEdgeCursor::ClusterEdgeCursor(arangodb::velocypack::StringRef vertexId, b
|
|||
: _position(0),
|
||||
_resolver(opts->trx()->resolver()),
|
||||
_opts(opts),
|
||||
_cache(static_cast<ClusterTraverserCache*>(opts->cache())) {
|
||||
_cache(static_cast<ClusterTraverserCache*>(opts->cache())),
|
||||
_httpRequests(0) {
|
||||
TRI_ASSERT(_cache != nullptr);
|
||||
auto trx = _opts->trx();
|
||||
transaction::BuilderLeaser leased(trx);
|
||||
|
@ -70,6 +73,7 @@ ClusterEdgeCursor::ClusterEdgeCursor(arangodb::velocypack::StringRef vertexId, b
|
|||
fetchEdgesFromEngines(trx->vocbase().name(), _cache->engines(), b->slice(),
|
||||
backward, _cache->cache(), _edgeList, _cache->datalake(),
|
||||
*(leased.get()), _cache->insertedDocuments());
|
||||
_httpRequests += _cache->engines()->size();
|
||||
}
|
||||
|
||||
bool ClusterEdgeCursor::next(EdgeCursor::Callback const& callback) {
|
||||
|
|
|
@ -52,6 +52,9 @@ class ClusterEdgeCursor : public graph::EdgeCursor {
|
|||
|
||||
void readAll(EdgeCursor::Callback const& callback) override;
|
||||
|
||||
/// @brief number of HTTP requests performed.
|
||||
size_t httpRequests() const override { return _httpRequests; }
|
||||
|
||||
private:
|
||||
std::vector<arangodb::velocypack::Slice> _edgeList;
|
||||
|
||||
|
@ -59,6 +62,7 @@ class ClusterEdgeCursor : public graph::EdgeCursor {
|
|||
CollectionNameResolver const* _resolver;
|
||||
arangodb::graph::BaseOptions* _opts;
|
||||
arangodb::graph::ClusterTraverserCache* _cache;
|
||||
size_t _httpRequests;
|
||||
};
|
||||
} // namespace traverser
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -42,7 +42,9 @@ using ClusterTraverser = arangodb::traverser::ClusterTraverser;
|
|||
ClusterTraverser::ClusterTraverser(arangodb::traverser::TraverserOptions* opts,
|
||||
std::unordered_map<ServerID, traverser::TraverserEngineID> const* engines,
|
||||
std::string const& dbname, transaction::Methods* trx)
|
||||
: Traverser(opts, trx), _dbname(dbname), _engines(engines) {
|
||||
: Traverser(opts, trx),
|
||||
_dbname(dbname),
|
||||
_engines(engines) {
|
||||
_opts->linkTraverser(this);
|
||||
}
|
||||
|
||||
|
@ -112,6 +114,9 @@ void ClusterTraverser::fetchVertices() {
|
|||
fetchVerticesFromEngines(_dbname, _engines, _verticesToFetch, _vertices,
|
||||
*(lease.get()));
|
||||
_verticesToFetch.clear();
|
||||
if (_enumerator != nullptr) {
|
||||
_enumerator->incHttpRequests(_engines->size());
|
||||
}
|
||||
}
|
||||
|
||||
aql::AqlValue ClusterTraverser::fetchVertexData(arangodb::velocypack::StringRef idString) {
|
||||
|
@ -155,6 +160,9 @@ void ClusterTraverser::destroyEngines() {
|
|||
"/_db/" + arangodb::basics::StringUtils::urlEncode(_trx->vocbase().name()) +
|
||||
"/_internal/traverser/");
|
||||
|
||||
if (_enumerator != nullptr) {
|
||||
_enumerator->incHttpRequests(_engines->size());
|
||||
}
|
||||
for (auto const& it : *_engines) {
|
||||
arangodb::CoordTransactionID coordTransactionID = TRI_NewTickServer();
|
||||
std::unordered_map<std::string, std::string> headers;
|
||||
|
|
|
@ -51,7 +51,7 @@ class ClusterTraverser final : public Traverser {
|
|||
~ClusterTraverser() {}
|
||||
|
||||
void setStartVertex(std::string const& id) override;
|
||||
|
||||
|
||||
protected:
|
||||
/// @brief Function to load the other sides vertex of an edge
|
||||
/// Returns true if the vertex passes filtering conditions
|
||||
|
|
|
@ -118,6 +118,7 @@ bool BreadthFirstEnumerator::next() {
|
|||
std::unique_ptr<EdgeCursor> cursor(
|
||||
_opts->nextCursor(nextVertex, _currentDepth));
|
||||
if (cursor != nullptr) {
|
||||
incHttpRequests(cursor->httpRequests());
|
||||
bool shouldReturnPath = _currentDepth + 1 >= _opts->minDepth;
|
||||
bool didInsert = false;
|
||||
|
||||
|
@ -180,6 +181,7 @@ bool BreadthFirstEnumerator::next() {
|
|||
// entry. We compute the path to it.
|
||||
return true;
|
||||
}
|
||||
|
||||
arangodb::aql::AqlValue BreadthFirstEnumerator::lastVertexToAqlValue() {
|
||||
return vertexToAqlValue(_lastReturned);
|
||||
}
|
||||
|
@ -308,7 +310,7 @@ bool BreadthFirstEnumerator::shouldPrune() {
|
|||
aql::AqlValue vertex, edge;
|
||||
aql::AqlValueGuard vertexGuard{vertex, true}, edgeGuard{edge, true};
|
||||
{
|
||||
auto *evaluator = _opts->getPruneEvaluator();
|
||||
auto* evaluator = _opts->getPruneEvaluator();
|
||||
if (evaluator->needsVertex()) {
|
||||
// Note: vertexToAqlValue() copies the original vertex into the AqlValue.
|
||||
// This could be avoided with a function that just returns the slice,
|
||||
|
|
|
@ -51,6 +51,8 @@ class EdgeCursor {
|
|||
virtual bool next(std::function<void(EdgeDocumentToken&&, arangodb::velocypack::Slice, size_t)> const& callback) = 0;
|
||||
|
||||
virtual void readAll(std::function<void(EdgeDocumentToken&&, arangodb::velocypack::Slice, size_t)> const& callback) = 0;
|
||||
|
||||
virtual size_t httpRequests() const = 0;
|
||||
};
|
||||
|
||||
} // namespace graph
|
||||
|
|
|
@ -107,7 +107,10 @@ bool NeighborsEnumerator::next() {
|
|||
|
||||
std::unique_ptr<arangodb::graph::EdgeCursor> cursor(
|
||||
_opts->nextCursor(nextVertex, _searchDepth));
|
||||
cursor->readAll(callback);
|
||||
if (cursor != nullptr) {
|
||||
incHttpRequests(cursor->httpRequests());
|
||||
cursor->readAll(callback);
|
||||
}
|
||||
}
|
||||
if (_currentDepth.empty()) {
|
||||
// Nothing found. Cannot do anything more.
|
||||
|
|
|
@ -37,7 +37,10 @@ using TraverserOptions = arangodb::traverser::TraverserOptions;
|
|||
|
||||
PathEnumerator::PathEnumerator(Traverser* traverser, std::string const& startVertex,
|
||||
TraverserOptions* opts)
|
||||
: _traverser(traverser), _isFirst(true), _opts(opts) {
|
||||
: _traverser(traverser),
|
||||
_isFirst(true),
|
||||
_opts(opts),
|
||||
_httpRequests(0) {
|
||||
arangodb::velocypack::StringRef svId =
|
||||
_opts->cache()->persistString(arangodb::velocypack::StringRef(startVertex));
|
||||
// Guarantee that this vertex _id does not run away
|
||||
|
@ -74,6 +77,7 @@ bool DepthFirstEnumerator::next() {
|
|||
_enumeratedPath.vertices.back()),
|
||||
_enumeratedPath.edges.size());
|
||||
if (cursor != nullptr) {
|
||||
incHttpRequests(cursor->httpRequests());
|
||||
_edgeCursors.emplace(cursor);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -80,7 +80,13 @@ class PathEnumerator {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
EnumeratedPath _enumeratedPath;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Number of HTTP requests made
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t _httpRequests;
|
||||
|
||||
public:
|
||||
PathEnumerator(Traverser* traverser, std::string const& startVertex,
|
||||
TraverserOptions* opts);
|
||||
|
@ -98,6 +104,15 @@ class PathEnumerator {
|
|||
virtual aql::AqlValue lastVertexToAqlValue() = 0;
|
||||
virtual aql::AqlValue lastEdgeToAqlValue() = 0;
|
||||
virtual aql::AqlValue pathToAqlValue(arangodb::velocypack::Builder&) = 0;
|
||||
|
||||
/// @brief return number of HTTP requests made, and reset it to 0
|
||||
size_t getAndResetHttpRequests() {
|
||||
size_t value = _httpRequests;
|
||||
_httpRequests = 0;
|
||||
return value;
|
||||
}
|
||||
|
||||
void incHttpRequests(size_t requests) { _httpRequests += requests; }
|
||||
};
|
||||
|
||||
class DepthFirstEnumerator final : public PathEnumerator {
|
||||
|
|
|
@ -32,7 +32,8 @@ using namespace arangodb;
|
|||
using namespace arangodb::graph;
|
||||
|
||||
ShortestPathFinder::ShortestPathFinder(ShortestPathOptions& options)
|
||||
: _options(options) {}
|
||||
: _options(options),
|
||||
_httpRequests(0) {}
|
||||
|
||||
void ShortestPathFinder::destroyEngines() {
|
||||
if (ServerState::instance()->isCoordinator()) {
|
||||
|
@ -48,6 +49,7 @@ void ShortestPathFinder::destroyEngines() {
|
|||
"/_internal/traverser/");
|
||||
|
||||
for (auto const& it : *ch->engines()) {
|
||||
incHttpRequests(1);
|
||||
arangodb::CoordTransactionID coordTransactionID = TRI_NewTickServer();
|
||||
std::unordered_map<std::string, std::string> headers;
|
||||
auto res = cc->syncRequest(coordTransactionID, "server:" + it.first,
|
||||
|
@ -71,4 +73,4 @@ void ShortestPathFinder::destroyEngines() {
|
|||
}
|
||||
}
|
||||
|
||||
ShortestPathOptions& ShortestPathFinder::options() const { return _options; }
|
||||
ShortestPathOptions& ShortestPathFinder::options() const { return _options; }
|
||||
|
|
|
@ -49,11 +49,25 @@ class ShortestPathFinder {
|
|||
|
||||
ShortestPathOptions& options() const;
|
||||
|
||||
/// @brief return number of HTTP requests made, and reset it to 0
|
||||
size_t getAndResetHttpRequests() {
|
||||
size_t value = _httpRequests;
|
||||
_httpRequests = 0;
|
||||
return value;
|
||||
}
|
||||
|
||||
void incHttpRequests(size_t requests) { _httpRequests += requests; }
|
||||
|
||||
protected:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief The options to modify this shortest path computation
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
ShortestPathOptions& _options;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Number of HTTP requests made
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
size_t _httpRequests;
|
||||
};
|
||||
|
||||
} // namespace graph
|
||||
|
|
|
@ -68,6 +68,9 @@ class SingleServerEdgeCursor final : public EdgeCursor {
|
|||
void readAll(EdgeCursor::Callback const& callback) override;
|
||||
|
||||
std::vector<std::vector<OperationCursor*>>& getCursors() { return _cursors; }
|
||||
|
||||
/// @brief number of HTTP requests performed. always 0 in single server
|
||||
size_t httpRequests() const override { return 0; }
|
||||
|
||||
private:
|
||||
// returns false if cursor can not be further advanced
|
||||
|
|
|
@ -52,7 +52,7 @@ class SingleServerTraverser final : public Traverser {
|
|||
void setStartVertex(std::string const& v) override;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief No eingines on single server
|
||||
/// @brief No engines on single server
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
void destroyEngines() override {}
|
||||
|
||||
|
|
|
@ -195,12 +195,19 @@ TraverserCache* arangodb::traverser::Traverser::traverserCache() {
|
|||
return _opts->cache();
|
||||
}
|
||||
|
||||
size_t arangodb::traverser::Traverser::getAndResetFilteredPaths() {
|
||||
return traverserCache()->getAndResetFilteredDocuments();
|
||||
}
|
||||
|
||||
size_t arangodb::traverser::Traverser::getAndResetReadDocuments() {
|
||||
return traverserCache()->getAndResetInsertedDocuments();
|
||||
}
|
||||
|
||||
size_t arangodb::traverser::Traverser::getAndResetFilteredPaths() {
|
||||
return traverserCache()->getAndResetFilteredDocuments();
|
||||
size_t arangodb::traverser::Traverser::getAndResetHttpRequests() {
|
||||
if (_enumerator != nullptr) {
|
||||
return _enumerator->getAndResetHttpRequests();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void arangodb::traverser::Traverser::allowOptimizedNeighbors() {
|
||||
|
|
|
@ -266,6 +266,12 @@ class Traverser {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t getAndResetReadDocuments();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Get the number of HTTP requests made
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t getAndResetHttpRequests();
|
||||
|
||||
TraverserOptions* options() { return _opts; }
|
||||
|
||||
|
|
Loading…
Reference in New Issue