diff --git a/3rdParty/iresearch/core/analysis/ngram_token_stream.cpp b/3rdParty/iresearch/core/analysis/ngram_token_stream.cpp index 409ba2e116..06f644b25d 100644 --- a/3rdParty/iresearch/core/analysis/ngram_token_stream.cpp +++ b/3rdParty/iresearch/core/analysis/ngram_token_stream.cpp @@ -140,7 +140,7 @@ bool parse_json_config(const irs::string_ref& args, stream_bytes_type = itr->second; } - min = std::max(min, size_t(1)); + min = std::max(min, decltype(min)(1)); max = std::max(max, min); options.min_gram = min; diff --git a/CHANGELOG b/CHANGELOG index 3d61d7c1de..3f80128bc9 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,14 @@ devel ----- +* REMOTE and GATHER no longer make subqueries unsuitable for the + `splice-subqueries` optimization. + +* New internal counter and histogram support. + +* Add a Prometheus endpoint for metrics, expose new metrics, old statistics + and RocksDB metrics. + * Fixed known issue #509: ArangoSearch index consolidation does not work during creation of a link on existing collection which may lead to massive file descriptors consumption. 
diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index a6299767fd..599d275234 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -1,7 +1,7 @@ //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// -/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany +/// Copyright 2014-2019 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); @@ -72,7 +72,22 @@ Agent::Agent(ApplicationServer& server, config_t const& config) _agentNeedsWakeup(false), _compactor(this), _ready(false), - _preparing(0) { + _preparing(0), + _write_ok( + _server.getFeature().counter( + "agency_agent_write_ok", 0, "Agency write ok")), + _write_no_leader( + _server.getFeature().counter( + "agency_agent_write_no_leader", 0, "Agency write no leader")), + _read_ok( + _server.getFeature().counter( + "agency_agent_read_ok", 0, "Agency read ok")), + _read_no_leader( + _server.getFeature().counter( + "agency_agent_read_no_leader", 0, "Agency read no leader")), + _write_hist_msec( + _server.getFeature().histogram( + "agency_agent_write_hist", 10, 0., 20., "Agency write histogram [ms]")) { _state.configure(this); _constituent.configure(this); if (size() > 1) { @@ -1124,6 +1139,8 @@ write_ret_t Agent::inquire(query_t const& query) { /// Write new entries to replicated state and store write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) { + + using namespace std::chrono; std::vector applied; std::vector indices; auto multihost = size() > 1; @@ -1134,6 +1151,7 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) { // look at the leaderID. 
auto leader = _constituent.leaderID(); if (multihost && leader != id()) { + ++_write_no_leader; return write_ret_t(false, leader); } @@ -1165,9 +1183,11 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) { // Check that we are actually still the leader: if (!leading()) { + ++_write_no_leader; return write_ret_t(false, NO_LEADER); } + auto const start = high_resolution_clock::now(); // Apply to spearhead and get indices for log entries // Avoid keeping lock indefinitely for (size_t i = 0, l = 0; i < npacks; ++i) { @@ -1182,11 +1202,13 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) { // Only leader else redirect if (multihost && challengeLeadership()) { resign(); + ++_write_no_leader; return write_ret_t(false, NO_LEADER); } // Check that we are actually still the leader: if (!leading()) { + ++_write_no_leader; return write_ret_t(false, NO_LEADER); } @@ -1197,6 +1219,8 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) { auto tmp = _state.logLeaderMulti(chunk, applied, currentTerm); indices.insert(indices.end(), tmp.begin(), tmp.end()); } + _write_hist_msec.count( + duration(high_resolution_clock::now()-start).count()); } // Maximum log index @@ -1212,6 +1236,7 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) { advanceCommitIndex(); } + ++_write_ok; return write_ret_t(true, id(), applied, indices); } @@ -1223,6 +1248,7 @@ read_ret_t Agent::read(query_t const& query) { // look at the leaderID. 
auto leader = _constituent.leaderID(); if (leader != id()) { + ++_read_no_leader; return read_ret_t(false, leader); } @@ -1236,10 +1262,12 @@ read_ret_t Agent::read(query_t const& query) { // Only leader else redirect if (challengeLeadership()) { resign(); + ++_read_no_leader; return read_ret_t(false, NO_LEADER); } leader = _constituent.leaderID(); + auto result = std::make_shared(); READ_LOCKER(oLocker, _outputLock); @@ -1247,6 +1275,7 @@ read_ret_t Agent::read(query_t const& query) { // Retrieve data from readDB std::vector success = _readDB.read(query, result); + ++_read_ok; return read_ret_t(true, leader, std::move(success), std::move(result)); } @@ -1254,6 +1283,7 @@ read_ret_t Agent::read(query_t const& query) { void Agent::run() { // Only run in case we are in multi-host mode while (!this->isStopping() && size() > 1) { + { // We set the variable to false here, if any change happens during // or after the calls in this loop, this will be set to true to diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 7930f4b83c..f1a3e1c9ee 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -1,7 +1,7 @@ //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// -/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany +/// Copyright 2014-2019 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); @@ -36,6 +36,7 @@ #include "Agency/Supervision.h" #include "Basics/ConditionLocker.h" #include "Basics/ReadWriteLock.h" +#include "RestServer/MetricsFeature.h" struct TRI_vocbase_t; @@ -475,8 +476,16 @@ class Agent final : public arangodb::Thread, public AgentInterface { // lock for _ongoingTrxs arangodb::Mutex _trxsLock; + + Counter& _write_ok; + Counter& _write_no_leader; + Counter& _read_ok; + Counter& _read_no_leader; + Histogram& _write_hist_msec; + }; } // namespace consensus } // namespace 
arangodb #endif + diff --git a/arangod/Aql/AqlItemBlock.cpp b/arangod/Aql/AqlItemBlock.cpp index fa4329230f..6d2c11aaaf 100644 --- a/arangod/Aql/AqlItemBlock.cpp +++ b/arangod/Aql/AqlItemBlock.cpp @@ -453,7 +453,7 @@ SharedAqlItemBlockPtr AqlItemBlock::slice(size_t from, size_t to) const { AqlValue const& a(_data[getAddress(row, col)]); ::CopyValueOver(cache, a, row - from, col, res); } - copySubQueryDepthToOtherBlock(res, row, row - from); + res->copySubQueryDepthFromOtherBlock(row - from, *this, row); } return res; @@ -476,7 +476,7 @@ SharedAqlItemBlockPtr AqlItemBlock::slice(size_t row, AqlValue const& a(_data[getAddress(row, col)]); ::CopyValueOver(cache, a, 0, col, res); } - copySubQueryDepthToOtherBlock(res, row, 0); + res->copySubQueryDepthFromOtherBlock(0, *this, row); return res; } @@ -492,12 +492,14 @@ SharedAqlItemBlockPtr AqlItemBlock::slice(std::vector const& chosen, SharedAqlItemBlockPtr res{_manager.requestBlock(to - from, _nrRegs)}; - for (size_t row = from; row < to; row++) { + size_t resultRowIdx = 0; + for (size_t chosenIdx = from; chosenIdx < to; ++chosenIdx, ++resultRowIdx) { + size_t const rowIdx = chosen[chosenIdx]; for (RegisterId col = 0; col < _nrRegs; col++) { - AqlValue const& a(_data[getAddress(chosen[row], col)]); - ::CopyValueOver(cache, a, row - from, col, res); + AqlValue const& a = _data[getAddress(rowIdx, col)]; + ::CopyValueOver(cache, a, resultRowIdx, col, res); } - copySubQueryDepthToOtherBlock(res, row, row - from); + res->copySubQueryDepthFromOtherBlock(resultRowIdx, *this, rowIdx); } return res; @@ -535,6 +537,13 @@ SharedAqlItemBlockPtr AqlItemBlock::steal(std::vector const& chosen, return res; } +/// @brief toJson, transfer all rows of this AqlItemBlock to Json, the result +/// can be used to recreate the AqlItemBlock via the Json constructor +void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, + VPackBuilder& result) const { + return toVelocyPack(0, size(), trxOptions, result); +} + /// @brief 
toJson, transfer a whole AqlItemBlock to Json, the result can /// be used to recreate the AqlItemBlock via the Json constructor /// Here is a description of the data format: The resulting Json has @@ -568,7 +577,15 @@ SharedAqlItemBlockPtr AqlItemBlock::steal(std::vector const& chosen, /// corresponding position /// "raw": List of actual values, positions 0 and 1 are always null /// such that actual indices start at 2 -void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPackBuilder& result) const { +void AqlItemBlock::toVelocyPack(size_t from, size_t to, + velocypack::Options const* const trxOptions, + VPackBuilder& result) const { + // Can only have positive slice size + TRI_ASSERT(from < to); + // We cannot slice over the upper bound. + // The lower bound (0) is protected by unsigned number type + TRI_ASSERT(to <= _nrItems); + TRI_ASSERT(result.isOpenObject()); VPackOptions options(VPackOptions::Defaults); options.buildUnindexedArrays = true; @@ -580,7 +597,7 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa raw.add(VPackValue(VPackValueType::Null)); raw.add(VPackValue(VPackValueType::Null)); - result.add("nrItems", VPackValue(_nrItems)); + result.add("nrItems", VPackValue(to - from)); result.add("nrRegs", VPackValue(_nrRegs)); result.add(StaticStrings::Error, VPackValue(false)); @@ -639,7 +656,7 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa startRegister = 1; } for (RegisterId column = startRegister; column < internalNrRegs(); column++) { - for (size_t i = 0; i < _nrItems; i++) { + for (size_t i = from; i < to; i++) { AqlValue const& a(_data[i * internalNrRegs() + column]); // determine current state @@ -701,7 +718,8 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa result.add("raw", raw.slice()); } -void AqlItemBlock::rowToSimpleVPack(size_t const row, velocypack::Options const* options, arangodb::velocypack::Builder& builder) 
const { +void AqlItemBlock::rowToSimpleVPack(size_t const row, velocypack::Options const* options, + arangodb::velocypack::Builder& builder) const { VPackArrayBuilder rowBuilder{&builder}; if (isShadowRow(row)) { @@ -732,13 +750,14 @@ ResourceMonitor& AqlItemBlock::resourceMonitor() noexcept { return *_manager.resourceMonitor(); } -void AqlItemBlock::copySubQueryDepthToOtherBlock(SharedAqlItemBlockPtr& target, - size_t sourceRow, size_t targetRow) const { - if (isShadowRow(sourceRow)) { - AqlValue const& d = getShadowRowDepth(sourceRow); +void AqlItemBlock::copySubQueryDepthFromOtherBlock(size_t const targetRow, + AqlItemBlock const& source, + size_t const sourceRow) { + if (source.isShadowRow(sourceRow)) { + AqlValue const& d = source.getShadowRowDepth(sourceRow); // Value set, copy it over TRI_ASSERT(!d.requiresDestruction()); - target->setShadowRowDepth(targetRow, d); + setShadowRowDepth(targetRow, d); } } @@ -965,3 +984,27 @@ void AqlItemBlock::copySubqueryDepth(size_t currentRow, size_t fromRow) { _data[currentAddress] = _data[fromAddress]; } } + +size_t AqlItemBlock::moveOtherBlockHere(size_t const targetRow, AqlItemBlock& source) { + TRI_ASSERT(targetRow + source.size() <= this->size()); + TRI_ASSERT(getNrRegs() == source.getNrRegs()); + auto const n = source.size(); + auto const nrRegs = getNrRegs(); + + size_t thisRow = targetRow; + for (size_t sourceRow = 0; sourceRow < n; ++sourceRow, ++thisRow) { + for (RegisterId col = 0; col < nrRegs; ++col) { + // copy over value + AqlValue const& a = source.getValueReference(sourceRow, col); + if (!a.isEmpty()) { + setValue(thisRow, col, a); + } + } + copySubQueryDepthFromOtherBlock(thisRow, source, sourceRow); + } + source.eraseAll(); + + TRI_ASSERT(thisRow == targetRow + n); + + return targetRow + n; +} diff --git a/arangod/Aql/AqlItemBlock.h b/arangod/Aql/AqlItemBlock.h index 611b79bc65..c93d078969 100644 --- a/arangod/Aql/AqlItemBlock.h +++ b/arangod/Aql/AqlItemBlock.h @@ -206,10 +206,18 @@ class AqlItemBlock { 
/// to which our AqlValues point will vanish. SharedAqlItemBlockPtr steal(std::vector const& chosen, size_t from, size_t to); - /// @brief toJson, transfer a whole AqlItemBlock to Json, the result can - /// be used to recreate the AqlItemBlock via the Json constructor + /// @brief toJson, transfer all rows of this AqlItemBlock to Json, the result + /// can be used to recreate the AqlItemBlock via the Json constructor void toVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const; + /// @brief toJson, transfer a slice of this AqlItemBlock to Json, the result can + /// be used to recreate the AqlItemBlock via the Json constructor + /// The slice will be starting at line `from` (including) and end at line `to` (excluding). + /// Only calls with 0 <= from < to <= this.size() are allowed. + /// If you want to transfer the full block, use from == 0, to == this.size() + void toVelocyPack(size_t from, size_t to, velocypack::Options const*, + arangodb::velocypack::Builder&) const; + /// @brief Creates a human-readable velocypack of the block. Adds an object /// `{nrItems, nrRegs, matrix}` to the builder. /// @@ -250,6 +258,16 @@ class AqlItemBlock { /// @brief Quick test if we have any ShadowRows within this block; bool hasShadowRows() const noexcept; + /// @brief Moves all values *from* source *to* this block. + /// Returns the row index of the last written row plus one (may equal size()). + /// Expects size() - targetRow >= source->size(); and, of course, an equal + /// number of registers. + /// The source block will be cleared after this. 
+ size_t moveOtherBlockHere(size_t targetRow, AqlItemBlock& source); + + void copySubQueryDepthFromOtherBlock(size_t targetRow, AqlItemBlock const& source, + size_t sourceRow); + protected: AqlItemBlockManager& aqlItemBlockManager() noexcept; size_t getRefCount() const noexcept; @@ -267,9 +285,6 @@ class AqlItemBlock { void copySubqueryDepth(size_t currentRow, size_t fromRow); - void copySubQueryDepthToOtherBlock(SharedAqlItemBlockPtr& target, - size_t sourceRow, size_t targetRow) const; - private: /// @brief _data, the actual data as a single vector of dimensions _nrItems /// times _nrRegs diff --git a/arangod/Aql/AqlItemBlockUtils.cpp b/arangod/Aql/AqlItemBlockUtils.cpp index 551d52646e..14c4d7dc25 100644 --- a/arangod/Aql/AqlItemBlockUtils.cpp +++ b/arangod/Aql/AqlItemBlockUtils.cpp @@ -51,23 +51,12 @@ SharedAqlItemBlockPtr itemBlock::concatenate(AqlItemBlockManager& manager, TRI_ASSERT(totalSize > 0); TRI_ASSERT(nrRegs > 0); - auto res = manager.requestBlock(totalSize, nrRegs); + auto resultBlock = manager.requestBlock(totalSize, nrRegs); - size_t pos = 0; - for (auto& it : blocks) { - size_t const n = it->size(); - for (size_t row = 0; row < n; ++row) { - for (RegisterId col = 0; col < nrRegs; ++col) { - // copy over value - AqlValue const& a = it->getValueReference(row, col); - if (!a.isEmpty()) { - res->setValue(pos + row, col, a); - } - } - } - it->eraseAll(); - pos += n; + size_t nextFreeRow = 0; + for (auto& inputBlock : blocks) { + nextFreeRow = resultBlock->moveOtherBlockHere(nextFreeRow, *inputBlock); } - return res; + return resultBlock; } diff --git a/arangod/Aql/ExecutionBlockImpl.cpp b/arangod/Aql/ExecutionBlockImpl.cpp index 3e01226073..3fcf30649f 100644 --- a/arangod/Aql/ExecutionBlockImpl.cpp +++ b/arangod/Aql/ExecutionBlockImpl.cpp @@ -166,6 +166,7 @@ std::pair ExecutionBlockImpl::g } if (newBlock == nullptr) { TRI_ASSERT(state == ExecutionState::DONE); + _state = InternalState::DONE; // _rowFetcher must be DONE now already return {state, 
nullptr}; } diff --git a/arangod/Aql/InputAqlItemRow.cpp b/arangod/Aql/InputAqlItemRow.cpp index 3ea9bb944d..47a16e2a3e 100644 --- a/arangod/Aql/InputAqlItemRow.cpp +++ b/arangod/Aql/InputAqlItemRow.cpp @@ -119,135 +119,17 @@ SharedAqlItemBlockPtr InputAqlItemRow::cloneToBlock(AqlItemBlockManager& manager /// corresponding position /// "raw": List of actual values, positions 0 and 1 are always null /// such that actual indices start at 2 -void InputAqlItemRow::toVelocyPack(velocypack::Options const* const trxOptions, VPackBuilder& result) const { +void InputAqlItemRow::toVelocyPack(velocypack::Options const* const trxOptions, + VPackBuilder& result) const { TRI_ASSERT(isInitialized()); TRI_ASSERT(result.isOpenObject()); - VPackOptions options(VPackOptions::Defaults); - options.buildUnindexedArrays = true; - options.buildUnindexedObjects = true; + _block->toVelocyPack(_baseIndex, _baseIndex + 1, trxOptions, result); +} - VPackBuilder raw(&options); - raw.openArray(); - // Two nulls in the beginning such that indices start with 2 - raw.add(VPackValue(VPackValueType::Null)); - raw.add(VPackValue(VPackValueType::Null)); - - result.add("nrItems", VPackValue(1)); - result.add("nrRegs", VPackValue(getNrRegisters())); - result.add(StaticStrings::Error, VPackValue(false)); - - enum State { - Empty, // saw an empty value - Range, // saw a range value - Next, // saw a previously unknown value - Positional, // saw a value previously encountered - }; - - std::unordered_map table; // remember duplicates - size_t lastTablePos = 0; - State lastState = Positional; - - State currentState = Positional; - size_t runLength = 0; - size_t tablePos = 0; - - result.add("data", VPackValue(VPackValueType::Array)); - - // write out data buffered for repeated "empty" or "next" values - auto writeBuffered = [](State lastState, size_t lastTablePos, - VPackBuilder& result, size_t runLength) { - if (lastState == Range) { - return; - } - - if (lastState == Positional) { - if (lastTablePos >= 2) { - 
if (runLength == 1) { - result.add(VPackValue(lastTablePos)); - } else { - result.add(VPackValue(-4)); - result.add(VPackValue(runLength)); - result.add(VPackValue(lastTablePos)); - } - } - } else { - TRI_ASSERT(lastState == Empty || lastState == Next); - if (runLength == 1) { - // saw exactly one value - result.add(VPackValue(lastState == Empty ? 0 : 1)); - } else { - // saw multiple values - result.add(VPackValue(lastState == Empty ? -1 : -3)); - result.add(VPackValue(runLength)); - } - } - }; - - size_t pos = 2; // write position in raw - // We use column == 0 to simulate a shadowRow - // this is only relevant if all participants can use shadow rows - RegisterId startRegister = 0; - if (block().getFormatType() == SerializationFormat::CLASSIC) { - // Skip over the shadowRows - startRegister = 1; - } - for (RegisterId column = startRegister; column < getNrRegisters() + 1; column++) { - AqlValue const& a = column == 0 ? AqlValue{} : getValue(column - 1); - // determine current state - if (a.isEmpty()) { - currentState = Empty; - } else if (a.isRange()) { - currentState = Range; - } else { - auto it = table.find(a); - - if (it == table.end()) { - currentState = Next; - a.toVelocyPack(trxOptions, raw, false); - table.try_emplace(a, pos++); - } else { - currentState = Positional; - tablePos = it->second; - TRI_ASSERT(tablePos >= 2); - if (lastState != Positional) { - lastTablePos = tablePos; - } - } - } - - // handle state change - if (currentState != lastState || (currentState == Positional && tablePos != lastTablePos)) { - // write out remaining buffered data in case of a state change - writeBuffered(lastState, lastTablePos, result, runLength); - - lastTablePos = 0; - lastState = currentState; - runLength = 0; - } - - switch (currentState) { - case Empty: - case Next: - case Positional: - ++runLength; - lastTablePos = tablePos; - break; - - case Range: - result.add(VPackValue(-2)); - result.add(VPackValue(a.range()->_low)); - 
result.add(VPackValue(a.range()->_high)); - break; - } - } - - // write out any remaining buffered data - writeBuffered(lastState, lastTablePos, result, runLength); - - result.close(); // closes "data" - - raw.close(); - result.add("raw", raw.slice()); +void InputAqlItemRow::toSimpleVelocyPack(velocypack::Options const* trxOpts, + arangodb::velocypack::Builder& result) const { + TRI_ASSERT(_block != nullptr); + _block->rowToSimpleVPack(_baseIndex, trxOpts, result); } InputAqlItemRow::InputAqlItemRow(SharedAqlItemBlockPtr const& block, size_t baseIndex) @@ -280,7 +162,9 @@ AqlValue InputAqlItemRow::stealValue(RegisterId registerId) { return a; } -RegisterCount InputAqlItemRow::getNrRegisters() const noexcept { return block().getNrRegs(); } +RegisterCount InputAqlItemRow::getNrRegisters() const noexcept { + return block().getNrRegs(); +} bool InputAqlItemRow::operator==(InputAqlItemRow const& other) const noexcept { return this->_block == other._block && this->_baseIndex == other._baseIndex; diff --git a/arangod/Aql/InputAqlItemRow.h b/arangod/Aql/InputAqlItemRow.h index 7a808d82b8..0212b9f877 100644 --- a/arangod/Aql/InputAqlItemRow.h +++ b/arangod/Aql/InputAqlItemRow.h @@ -36,7 +36,7 @@ namespace arangodb { namespace velocypack { class Builder; struct Options; -} +} // namespace velocypack namespace aql { class AqlItemBlock; @@ -133,6 +133,8 @@ class InputAqlItemRow { /// Uses the same API as an AqlItemBlock with only a single row void toVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const; + void toSimpleVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const; + private: AqlItemBlock& block() noexcept; diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index 9b1b600487..637c519f73 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -7282,11 +7282,6 @@ void arangodb::aql::parallelizeGatherRule(Optimizer* opt, std::unique_ptrisModificationNode()) { - return true; - 
} switch (node->getType()) { case ExecutionNode::CALCULATION: case ExecutionNode::SUBQUERY: @@ -7306,6 +7301,11 @@ bool nodeMakesThisQueryLevelUnsuitableForSubquerySplicing(ExecutionNode const* c case ExecutionNode::GATHER: case ExecutionNode::REMOTE: case ExecutionNode::REMOTESINGLE: + case ExecutionNode::INSERT: + case ExecutionNode::REMOVE: + case ExecutionNode::REPLACE: + case ExecutionNode::UPDATE: + case ExecutionNode::UPSERT: case ExecutionNode::MATERIALIZE: case ExecutionNode::DISTRIBUTE_CONSUMER: case ExecutionNode::SUBQUERY_START: @@ -7323,16 +7323,6 @@ bool nodeMakesThisQueryLevelUnsuitableForSubquerySplicing(ExecutionNode const* c // Collect nodes skip iff using the COUNT method. return collectNode->aggregationMethod() == CollectOptions::CollectMethod::COUNT; } - // TODO Enable modification nodes again, as soon as the corresponding branch - // is merged. - case ExecutionNode::INSERT: - case ExecutionNode::REMOVE: - case ExecutionNode::REPLACE: - case ExecutionNode::UPDATE: - case ExecutionNode::UPSERT: - // These should already have been handled - TRI_ASSERT(false); - return true; case ExecutionNode::MAX_NODE_TYPE_VALUE: break; } diff --git a/arangod/Aql/RemoteExecutor.cpp b/arangod/Aql/RemoteExecutor.cpp index 15d5f91bcf..95dd1cc892 100644 --- a/arangod/Aql/RemoteExecutor.cpp +++ b/arangod/Aql/RemoteExecutor.cpp @@ -83,7 +83,6 @@ std::pair ExecutionBlockImpl ExecutionBlockImpl::getSomeWithoutTrace(size_t atMost) { - // silence tests -- we need to introduce new failure tests for fetchers TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome1") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); @@ -98,9 +97,9 @@ std::pair ExecutionBlockImpl guard(_communicationMutex); - + if (_requestInFlight) { // Already sent a shutdown request, but haven't got an answer yet. 
return {ExecutionState::WAITING, nullptr}; @@ -167,14 +166,13 @@ std::pair ExecutionBlockImpl::skipSome(s } std::pair ExecutionBlockImpl::skipSomeWithoutTrace(size_t atMost) { - std::unique_lock guard(_communicationMutex); - + if (_requestInFlight) { // Already sent a shutdown request, but haven't got an answer yet. return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; } - + if (_lastError.fail()) { TRI_ASSERT(_lastResponse == nullptr); Result res = _lastError; @@ -182,7 +180,7 @@ std::pair ExecutionBlockImpl::skipSomeWi // we were called with an error need to throw it. THROW_ARANGO_EXCEPTION(res); } - + if (getQuery().killed()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); } @@ -255,13 +253,13 @@ std::pair ExecutionBlockImpl::initialize // will initialize the cursor lazily return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } - + if (getQuery().killed()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); } - + std::unique_lock guard(_communicationMutex); - + if (_requestInFlight) { return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; } @@ -271,7 +269,7 @@ std::pair ExecutionBlockImpl::initialize auto response = std::move(_lastResponse); // Result is the response which is an object containing the ErrorCode - int errorNumber = TRI_ERROR_INTERNAL; // default error code + int errorNumber = TRI_ERROR_INTERNAL; // default error code VPackSlice slice = response->slice(); VPackSlice errorSlice = slice.get(StaticStrings::ErrorNum); if (!errorSlice.isNumber()) { @@ -297,6 +295,8 @@ std::pair ExecutionBlockImpl::initialize VPackBuilder builder(buffer, &options); builder.openObject(/*unindexed*/ true); + // Required for 3.5.* and earlier, dropped in 3.6.0 + builder.add("exhausted", VPackValue(false)); // Used in 3.4.0 onwards builder.add("done", VPackValue(false)); builder.add(StaticStrings::Code, VPackValue(TRI_ERROR_NO_ERROR)); @@ -306,7 +306,8 @@ std::pair ExecutionBlockImpl::initialize builder.add("pos", VPackValue(0)); builder.add(VPackValue("items")); 
builder.openObject(/*unindexed*/ true); - input.toVelocyPack(_engine->getQuery()->trx()->transactionContextPtr()->getVPackOptions(), builder); + input.toVelocyPack(_engine->getQuery()->trx()->transactionContextPtr()->getVPackOptions(), + builder); builder.close(); builder.close(); @@ -324,20 +325,19 @@ std::pair ExecutionBlockImpl::initialize /// @brief shutdown, will be called exactly once for the whole query std::pair ExecutionBlockImpl::shutdown(int errorCode) { - // this should make the whole thing idempotent if (!_isResponsibleForInitializeCursor) { // do nothing... return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } - + std::unique_lock guard(_communicationMutex); if (!_hasTriggeredShutdown) { // skip request in progress std::ignore = generateRequestTicket(); _hasTriggeredShutdown = true; - + // For every call we simply forward via HTTP VPackBuffer buffer; VPackBuilder builder(buffer); @@ -352,18 +352,18 @@ std::pair ExecutionBlockImpl::shutdown(i if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } - + return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; } - + if (_requestInFlight) { // Already sent a shutdown request, but haven't got an answer yet. 
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; } - + if (_lastError.fail()) { -// _didReceiveShutdownRequest = true; - + // _didReceiveShutdownRequest = true; + TRI_ASSERT(_lastResponse == nullptr); Result res = std::move(_lastError); _lastError.reset(); @@ -385,7 +385,7 @@ std::pair ExecutionBlockImpl::shutdown(i if (_lastResponse != nullptr) { TRI_ASSERT(_lastError.ok()); - + auto response = std::move(_lastResponse); // both must be reset before return or throw @@ -420,7 +420,7 @@ std::pair ExecutionBlockImpl::shutdown(i return {ExecutionState::DONE, TRI_ERROR_INTERNAL}; } - + TRI_ASSERT(false); return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } @@ -442,7 +442,7 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err, .append(spec.serverId) .append("': "); } - + int res = TRI_ERROR_INTERNAL; if (err != fuerte::Error::NoError) { res = network::fuerteToArangoErrorCode(err); @@ -455,7 +455,8 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err, res = VelocyPackHelper::getNumericValue(slice, StaticStrings::ErrorNum, res); VPackStringRef ref = VelocyPackHelper::getStringRef(slice, StaticStrings::ErrorMessage, - VPackStringRef("(no valid error in response)")); + VPackStringRef( + "(no valid error in response)")); msg.append(ref.data(), ref.size()); } } @@ -468,7 +469,6 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err, Result ExecutionBlockImpl::sendAsyncRequest(fuerte::RestVerb type, std::string const& urlPart, VPackBuffer&& body) { - NetworkFeature const& nf = _engine->getQuery()->vocbase().server().getFeature(); network::ConnectionPool* pool = nf.pool(); @@ -485,42 +485,39 @@ Result ExecutionBlockImpl::sendAsyncRequest(fuerte::RestVerb typ TRI_ASSERT(!spec.endpoint.empty()); auto req = fuerte::createRequest(type, fuerte::ContentType::VPack); - req->header.database =_query.vocbase().name(); + req->header.database = _query.vocbase().name(); req->header.path = urlPart + 
_queryId; req->addVPack(std::move(body)); - + // Later, we probably want to set these sensibly: req->timeout(kDefaultTimeOutSecs); if (!_ownName.empty()) { req->header.addMeta("Shard-Id", _ownName); } - + network::ConnectionPtr conn = pool->leaseConnection(spec.endpoint); - + _requestInFlight = true; auto ticket = generateRequestTicket(); - conn->sendRequest(std::move(req), - [this, ticket, spec, - sqs = _query.sharedState()](fuerte::Error err, - std::unique_ptr req, - std::unique_ptr res) { - - // `this` is only valid as long as sharedState is valid. - // So we must execute this under sharedState's mutex. - sqs->executeAndWakeup([&] { - std::lock_guard guard(_communicationMutex); - if (_lastTicket == ticket) { - if (err != fuerte::Error::NoError || res->statusCode() >= 400) { - _lastError = handleErrorResponse(spec, err, res.get()); - } else { - _lastResponse = std::move(res); - } - _requestInFlight = false; - return true; - } - return false; - }); - }); + conn->sendRequest(std::move(req), [this, ticket, spec, sqs = _query.sharedState()]( + fuerte::Error err, std::unique_ptr req, + std::unique_ptr res) { + // `this` is only valid as long as sharedState is valid. + // So we must execute this under sharedState's mutex. + sqs->executeAndWakeup([&] { + std::lock_guard guard(_communicationMutex); + if (_lastTicket == ticket) { + if (err != fuerte::Error::NoError || res->statusCode() >= 400) { + _lastError = handleErrorResponse(spec, err, res.get()); + } else { + _lastResponse = std::move(res); + } + _requestInFlight = false; + return true; + } + return false; + }); + }); ++_engine->_stats.requests; @@ -559,6 +556,10 @@ void ExecutionBlockImpl::traceRequest(char const* const rpc, LOG_TOPIC("92c71", INFO, Logger::QUERIES) << "[query#" << queryId << "] remote request sent: " << rpc << (args.empty() ? 
"" : " ") << args << " registryId=" << remoteQueryId; + if (_profile >= PROFILE_LEVEL_TRACE_2) { + LOG_TOPIC("e0ae6", INFO, Logger::QUERIES) + << "[query#" << queryId << "] data: " << slice.toJson(); + } } } diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 0a4b380239..3769afc2c3 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -460,7 +460,13 @@ set(LIB_ARANGO_REPLICATION_SOURCES Replication/TailingSyncer.cpp Replication/common-defines.cpp Replication/utilities.cpp -) + ) + +set (LIB_ARANGO_METRICS_SOURCES + RestServer/Metrics.cpp + RestServer/MetricsFeature.cpp + RestHandler/RestMetricsHandler.cpp + ) set(LIB_ARANGO_AGENCY_SOURCES Agency/AgentConfiguration.cpp @@ -763,6 +769,10 @@ add_library(arango_replication STATIC ${LIB_ARANGO_REPLICATION_SOURCES} ) +add_library(arango_metrics STATIC + ${LIB_ARANGO_METRICS_SOURCES} +) + add_library(arango_agency STATIC ${LIB_ARANGO_AGENCY_SOURCES} ) @@ -820,6 +830,7 @@ target_include_directories(llhttp PUBLIC "${PROJECT_SOURCE_DIR}/3rdParty/llhttp/ target_link_libraries(arango_agency arango) target_link_libraries(arango_agency arango_iresearch) +target_link_libraries(arango_agency arango_metrics) target_link_libraries(arango_aql arango_geo) target_link_libraries(arango_aql arango_graph) @@ -849,6 +860,8 @@ target_link_libraries(arango_indexes boost_boost) target_link_libraries(arango_iresearch arango_indexes) target_link_libraries(arango_iresearch arango_cluster_engine) +target_link_libraries(arango_metrics arango) + target_link_libraries(arango_mmfiles arango_geo) target_link_libraries(arango_mmfiles arango_indexes) target_link_libraries(arango_mmfiles arango_storage_engine_common) @@ -910,6 +923,7 @@ target_link_libraries(arangoserver arango_geo) target_link_libraries(arangoserver arango_graph) target_link_libraries(arangoserver arango_indexes) target_link_libraries(arangoserver arango_iresearch) +target_link_libraries(arangoserver arango_metrics) target_link_libraries(arangoserver 
arango_network) target_link_libraries(arangoserver arango_pregel) target_link_libraries(arangoserver arango_replication) @@ -990,6 +1004,7 @@ foreach(TARGET arango_common_rest_handler arango_graph arango_indexes + arango_metrics arango_mmfiles arango_pregel arango_replication diff --git a/arangod/ClusterEngine/ClusterTransactionState.cpp b/arangod/ClusterEngine/ClusterTransactionState.cpp index 0a896b06c6..02f7f6f970 100644 --- a/arangod/ClusterEngine/ClusterTransactionState.cpp +++ b/arangod/ClusterEngine/ClusterTransactionState.cpp @@ -29,7 +29,7 @@ #include "Logger/LogMacros.h" #include "Logger/Logger.h" #include "Logger/LoggerStream.h" -#include "Statistics/ServerStatistics.h" +#include "RestServer/MetricsFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/TransactionCollection.h" #include "Transaction/Manager.h" @@ -61,7 +61,7 @@ Result ClusterTransactionState::beginTransaction(transaction::Hints hints) { auto cleanup = scopeGuard([&] { if (nestingLevel() == 0) { updateStatus(transaction::Status::ABORTED); - ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++; } // free what we have got so far unuseCollections(nestingLevel()); @@ -75,7 +75,7 @@ Result ClusterTransactionState::beginTransaction(transaction::Hints hints) { // all valid if (nestingLevel() == 0) { updateStatus(transaction::Status::RUNNING); - ServerStatistics::statistics()._transactionsStatistics._transactionsStarted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsStarted++; transaction::ManagerFeature::manager()->registerTransaction(id(), nullptr, isReadOnlyTransaction()); setRegistered(); @@ -122,7 +122,7 @@ Result ClusterTransactionState::commitTransaction(transaction::Methods* activeTr arangodb::Result res; if (nestingLevel() == 0) { updateStatus(transaction::Status::COMMITTED); - 
ServerStatistics::statistics()._transactionsStatistics._transactionsCommitted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsCommitted++; } unuseCollections(nestingLevel()); @@ -136,7 +136,7 @@ Result ClusterTransactionState::abortTransaction(transaction::Methods* activeTrx Result res; if (nestingLevel() == 0) { updateStatus(transaction::Status::ABORTED); - ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++; } unuseCollections(nestingLevel()); diff --git a/arangod/GeneralServer/GeneralServerFeature.cpp b/arangod/GeneralServer/GeneralServerFeature.cpp index b65feeeb7d..77e6c083f3 100644 --- a/arangod/GeneralServer/GeneralServerFeature.cpp +++ b/arangod/GeneralServer/GeneralServerFeature.cpp @@ -74,6 +74,7 @@ #include "RestHandler/RestImportHandler.h" #include "RestHandler/RestIndexHandler.h" #include "RestHandler/RestJobHandler.h" +#include "RestHandler/RestMetricsHandler.h" #include "RestHandler/RestPleaseUpgradeHandler.h" #include "RestHandler/RestPregelHandler.h" #include "RestHandler/RestQueryCacheHandler.h" @@ -437,6 +438,7 @@ void GeneralServerFeature::defineHandlers() { agency.agent()); } + if (cluster.isEnabled()) { // add "/agency-callbacks" handler _handlerFactory->addPrefixHandler( @@ -532,6 +534,9 @@ void GeneralServerFeature::defineHandlers() { _handlerFactory->addHandler("/_admin/statistics", RestHandlerCreator::createNoData); + _handlerFactory->addHandler("/_admin/metrics", + RestHandlerCreator::createNoData); + _handlerFactory->addHandler("/_admin/statistics-description", RestHandlerCreator::createNoData); diff --git a/arangod/GeneralServer/HttpCommTask.cpp b/arangod/GeneralServer/HttpCommTask.cpp index 80fc3242df..714966f791 100644 --- a/arangod/GeneralServer/HttpCommTask.cpp +++ b/arangod/GeneralServer/HttpCommTask.cpp @@ -179,7 +179,7 @@ int HttpCommTask::on_header_complete(llhttp_t* p) 
{ if (ec) { static_cast*>(self.get())->close(); } - }); + }); return HPE_OK; } if (self->_request->requestType() == RequestType::HEAD) { @@ -669,7 +669,7 @@ void HttpCommTask::sendResponse(std::unique_ptr baseRes, } // TODO lease buffers - auto header = std::make_unique>(); + auto header = std::make_shared>(); header->reserve(220); header->append(TRI_CHAR_LENGTH_PAIR("HTTP/1.1 ")); @@ -729,7 +729,7 @@ void HttpCommTask::sendResponse(std::unique_ptr baseRes, } // turn on the keepAlive timer - double secs = GeneralServerFeature::keepAliveTimeout(); + double secs = GeneralServerFeature::keepAliveTimeout(); if (_shouldKeepAlive && secs > 0) { int64_t millis = static_cast(secs * 1000); this->_protocol->timer.expires_after(std::chrono::milliseconds(millis)); @@ -786,7 +786,7 @@ void HttpCommTask::sendResponse(std::unique_ptr baseRes, header->append(std::to_string(len)); header->append("\r\n\r\n", 4); - std::unique_ptr body = response.stealBody(); + std::shared_ptr body = response.stealBody(); // append write buffer and statistics double const totalTime = RequestStatistics::ELAPSED_SINCE_READ_START(stat); @@ -796,38 +796,41 @@ void HttpCommTask::sendResponse(std::unique_ptr baseRes, << "\",\"" << GeneralRequest::translateMethod(::llhttpToRequestType(&_parser)) << "\",\"" << static_cast(response.responseCode()) << "\"," << Logger::FIXED(totalTime, 6); - std::array buffers; - buffers[0] = asio_ns::buffer(header->data(), header->size()); - if (HTTP_HEAD != _parser.method) { - buffers[1] = asio_ns::buffer(body->data(), body->size()); - TRI_ASSERT(len == body->size()); - } + RequestStatistics::SET_WRITE_START(stat); - // FIXME measure performance w/o sync write - asio_ns::async_write(this->_protocol->socket, buffers, - [self = CommTask::shared_from_this(), - h = std::move(header), - b = std::move(body), - stat](asio_ns::error_code ec, size_t nwrite) { - auto* thisPtr = static_cast*>(self.get()); - RequestStatistics::SET_WRITE_END(stat); - 
RequestStatistics::ADD_SENT_BYTES(stat, h->size() + b->size()); - llhttp_errno_t err = llhttp_get_errno(&thisPtr->_parser); - if (ec || !thisPtr->_shouldKeepAlive || err != HPE_PAUSED) { - if (ec) { - LOG_TOPIC("2b6b4", DEBUG, arangodb::Logger::REQUESTS) - << "asio write error: '" << ec.message() << "'"; - } - thisPtr->close(); - } else { // ec == HPE_PAUSED + this->_protocol->context.io_context.post([this, self = this->shared_from_this(), header = std::move(header), body = std::move(body), stat] () mutable { + std::array buffers; + buffers[0] = asio_ns::buffer(header->data(), header->size()); + if (HTTP_HEAD != _parser.method) { + buffers[1] = asio_ns::buffer(body->data(), body->size()); + } - llhttp_resume(&thisPtr->_parser); - thisPtr->asyncReadSome(); - } - if (stat != nullptr) { - stat->release(); - } + // FIXME measure performance w/o sync write + asio_ns::async_write(this->_protocol->socket, buffers, + [self = std::move(self), + h = std::move(header), + b = std::move(body), + stat](asio_ns::error_code ec, size_t nwrite) { + auto* thisPtr = static_cast*>(self.get()); + RequestStatistics::SET_WRITE_END(stat); + RequestStatistics::ADD_SENT_BYTES(stat, h->size() + b->size()); + llhttp_errno_t err = llhttp_get_errno(&thisPtr->_parser); + if (ec || !thisPtr->_shouldKeepAlive || err != HPE_PAUSED) { + if (ec) { + LOG_TOPIC("2b6b4", DEBUG, arangodb::Logger::REQUESTS) + << "asio write error: '" << ec.message() << "'"; + } + thisPtr->close(); + } else { // ec == HPE_PAUSED + + llhttp_resume(&thisPtr->_parser); + thisPtr->asyncReadSome(); + } + if (stat != nullptr) { + stat->release(); + } + }); }); } diff --git a/arangod/IResearch/IResearchAnalyzerFeature.cpp b/arangod/IResearch/IResearchAnalyzerFeature.cpp index 06a530c616..8c00cce147 100644 --- a/arangod/IResearch/IResearchAnalyzerFeature.cpp +++ b/arangod/IResearch/IResearchAnalyzerFeature.cpp @@ -323,7 +323,7 @@ bool parse_ngram_vpack_config(const irs::string_ref& args, irs::analysis::ngram_ 
options.stream_bytes_type = itr->second; } - min = std::max(min, size_t(1)); + min = std::max(min, decltype(min)(1)); max = std::max(max, min); options.min_gram = min; diff --git a/arangod/MMFiles/MMFilesTransactionState.cpp b/arangod/MMFiles/MMFilesTransactionState.cpp index 5a47412a1b..e831950227 100644 --- a/arangod/MMFiles/MMFilesTransactionState.cpp +++ b/arangod/MMFiles/MMFilesTransactionState.cpp @@ -33,6 +33,7 @@ #include "MMFiles/MMFilesLogfileManager.h" #include "MMFiles/MMFilesPersistentIndexFeature.h" #include "MMFiles/MMFilesTransactionCollection.h" +#include "RestServer/MetricsFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "StorageEngine/TransactionCollection.h" @@ -126,14 +127,14 @@ Result MMFilesTransactionState::beginTransaction(transaction::Hints hints) { // all valid if (nestingLevel() == 0) { updateStatus(transaction::Status::RUNNING); - ServerStatistics::statistics()._transactionsStatistics._transactionsStarted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsStarted++; // defer writing of the begin marker until necessary! 
} } else { // something is wrong if (nestingLevel() == 0) { updateStatus(transaction::Status::ABORTED); - ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++; } // free what we have got so far @@ -174,7 +175,7 @@ Result MMFilesTransactionState::commitTransaction(transaction::Methods* activeTr } updateStatus(transaction::Status::COMMITTED); - ServerStatistics::statistics()._transactionsStatistics._transactionsCommitted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsCommitted++; // if a write query, clear the query cache for the participating collections if (AccessMode::isWriteOrExclusive(_type) && !_collections.empty() && @@ -203,7 +204,7 @@ Result MMFilesTransactionState::abortTransaction(transaction::Methods* activeTrx result.reset(res); updateStatus(transaction::Status::ABORTED); - ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++; if (_hasOperations) { // must clean up the query cache because the transaction diff --git a/arangod/RestHandler/RestMetricsHandler.cpp b/arangod/RestHandler/RestMetricsHandler.cpp new file mode 100644 index 0000000000..ac3b884ec8 --- /dev/null +++ b/arangod/RestHandler/RestMetricsHandler.cpp @@ -0,0 +1,68 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. 
+/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "RestMetricsHandler.h" + +#include "Agency/AgencyComm.h" +#include "Agency/AgencyFeature.h" +#include "Agency/Agent.h" +#include "ApplicationFeatures/ApplicationServer.h" +#include "Cluster/ServerState.h" +#include "GeneralServer/ServerSecurityFeature.h" +#include "Rest/Version.h" +#include "RestServer/ServerFeature.h" + +#include +#include + +using namespace arangodb; +using namespace arangodb::basics; +using namespace arangodb::rest; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ArangoDB server +//////////////////////////////////////////////////////////////////////////////// + +RestMetricsHandler::RestMetricsHandler( + application_features::ApplicationServer& server, + GeneralRequest* request, GeneralResponse* response) + : RestBaseHandler(server, request, response) {} + +RestStatus RestMetricsHandler::execute() { + auto& server = application_features::ApplicationServer::server(); + ServerSecurityFeature& security = server.getFeature(); + + if (!security.canAccessHardenedApi()) { + // dont leak information about server internals here + generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN); + return RestStatus::DONE; + } + + MetricsFeature& metrics = server.getFeature(); + std::string result; + metrics.toPrometheus(result); + _response->setResponseCode(rest::ResponseCode::OK); + 
_response->setContentType(rest::ContentType::TEXT); + _response->addRawPayload(VPackStringRef(result)); + + return RestStatus::DONE; +} diff --git a/arangod/RestHandler/RestMetricsHandler.h b/arangod/RestHandler/RestMetricsHandler.h new file mode 100644 index 0000000000..d5452f07b8 --- /dev/null +++ b/arangod/RestHandler/RestMetricsHandler.h @@ -0,0 +1,45 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_REST_HANDLER_REST_METRICS_HANDLER_H +#define ARANGOD_REST_HANDLER_REST_METRICS_HANDLER_H 1 + +#include "RestHandler/RestBaseHandler.h" +#include "RestServer/MetricsFeature.h" + +namespace arangodb { +class RestMetricsHandler : public arangodb::RestBaseHandler { + public: + RestMetricsHandler(application_features::ApplicationServer&, GeneralRequest*, + GeneralResponse*); + + char const* name() const override final { return "RestMetricsHandler"; } + RequestLane lane() const override final { return RequestLane::CLIENT_SLOW; } + RestStatus execute() override; + +}; + + + +} // namespace arangodb + +#endif diff --git a/arangod/RestServer/Metrics.cpp b/arangod/RestServer/Metrics.cpp new file mode 100644 index 0000000000..cc45b3baae --- /dev/null +++ b/arangod/RestServer/Metrics.cpp @@ -0,0 +1,98 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "Metrics.h" +#include "Logger/LogMacros.h" +#include "Logger/Logger.h" +#include "Basics/debugging.h" +#include + +using namespace arangodb; + +std::ostream& operator<< (std::ostream& o, Metrics::counter_type const& s) { + o << s.load(); + return o; +} + +std::ostream& operator<< (std::ostream& o, Counter const& s) { + o << s.load(); + return o; +} + +std::ostream& operator<< (std::ostream& o, Metrics::hist_type const& v) { + o << "["; + for (size_t i = 0; i < v.size(); ++i) { + if (i > 0) { o << ", "; } + o << v.load(i); + } + o << "]"; + return o; +} + +Metric::Metric(std::string const& name, std::string const& help) +: _name(name), _help(help) {}; + +Metric::~Metric() {} + +std::string const& Metric::help() const { return _help; } +std::string const& Metric::name() const { return _name; } + +Counter& Counter::operator++() { + count(); + return *this; +} + +Counter& Counter::operator++(int n) { + count(1); + return *this; +} + +void Counter::count() { + ++_b; +} + +void Counter::count(uint64_t n) { + _b += n; +} + +std::ostream& Counter::print(std::ostream& o) const { + o << _c; + return o; +} + +uint64_t Counter::load() const { + _b.push(); + return _c.load(); +} + +void Counter::toPrometheus(std::string& result) const { + _b.push(); + result += "#TYPE " + name() + " counter\n"; + result += "#HELP " + name() + " " + help() + "\n"; + result += name() + " " + std::to_string(load()) + "\n"; +} + +Counter::Counter(uint64_t const& val, std::string const& name, std::string const& help) : + Metric(name, help), _c(val), _b(_c) {} + +Counter::~Counter() { _b.push(); } + diff --git a/arangod/RestServer/Metrics.h b/arangod/RestServer/Metrics.h new file mode 100644 index 0000000000..1881d264f4 --- /dev/null +++ b/arangod/RestServer/Metrics.h @@ -0,0 +1,234 @@ 
+//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_REST_SERVER_METRICS_H +#define ARANGODB_REST_SERVER_METRICS_H 1 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Basics/VelocyPackHelper.h" +#include "Logger/LogMacros.h" +#include "counter.h" + +class Counter; +template class Histogram; + +class Metric { +public: + Metric(std::string const& name, std::string const& help); + virtual ~Metric(); + std::string const& help() const; + std::string const& name() const; + virtual void toPrometheus(std::string& result) const = 0; + void header(std::string& result) const; +protected: + std::string const _name; + std::string const _help; +}; + +struct Metrics { + + enum Type {COUNTER, HISTOGRAM}; + + using counter_type = gcl::counter::simplex; + using hist_type = gcl::counter::simplex_array; + using buffer_type = gcl::counter::buffer; + +}; + + +/** + * @brief Counter functionality + */ +class Counter : public Metric { +public: + Counter(uint64_t const& val, std::string const& name, std::string const& help); + Counter(Counter const&) = 
delete; + ~Counter(); + std::ostream& print (std::ostream&) const; + Counter& operator++(); + Counter& operator++(int); + void count(); + void count(uint64_t); + uint64_t load() const; + void push(); + virtual void toPrometheus(std::string&) const override; +private: + mutable Metrics::counter_type _c; + mutable Metrics::buffer_type _b; +}; + + +template class Gauge : public Metric { +public: + Gauge() = delete; + Gauge(uint64_t const& val, std::string const& name, std::string const& help) + : Metric(name, help) { + _g.store(val); + } + Gauge(Gauge const&) = delete; + virtual ~Gauge(); + std::ostream& print (std::ostream&) const; + Gauge& operator+=(T const& t) { + _g.store(_g + t); + return *this; + } + Gauge& operator-=(T const& t) { + _g.store(_g - t); + return *this; + } + Gauge& operator*=(T const& t) { + _g.store(_g * t); + return *this; + } + Gauge& operator/=(T const& t) { + _g.store(_g / t); + return *this; + } + Gauge& operator=(T const& t) { + _g.store(t); + return *this; + } + T load() const { + return _g.load(); + }; + virtual void toPrometheus(std::string&) const override {}; +private: + std::atomic _g; +}; + +std::ostream& operator<< (std::ostream&, Metrics::hist_type const&); + +/** + * @brief Histogram functionality + */ +template class Histogram : public Metric { + +public: + + Histogram() = delete; + + Histogram (size_t const& buckets, T const& low, T const& high, std::string const& name, std::string const& help = "") + : Metric(name, help), _c(Metrics::hist_type(buckets)), _low(low), _high(high), + _lowr(std::numeric_limits::max()), _highr(std::numeric_limits::min()) { + TRI_ASSERT(_c.size() > 0); + _n = _c.size() - 1; + _div = std::floor((double)(high - low) / (double)_c.size()); + TRI_ASSERT(_div != 0); + } + + ~Histogram() {} + + void records(T const& t) { + if(t < _lowr) { + _lowr = t; + } else if (t > _highr) { + _highr = t; + } + } + + void count(T const& t) { + if (t < _low) { + ++_c[0]; + } else if (t >= _high) { + ++_c[_n]; + } else { + 
++_c[static_cast(std::floor(t / _div))]; + } + records(t); + } + + void count(T const& t, uint64_t n) { + if (t < _low) { + _c[0] += n; + } else if (t >= _high) { + _c[_n] += n; + } else { + _c[static_cast(std::floor(t / _div))] += n; + } + records(t); + } + + T const& low() const { return _low; } + T const& high() const { return _high; } + + Metrics::hist_type::value_type& operator[](size_t n) { + return _c[n]; + } + + std::vector load() const { + std::vector v(size()); + for (size_t i = 0; i < size(); ++i) { + v[i] = load(i); + } + return v; + } + + uint64_t load(size_t i) const { return _c.load(i); }; + + size_t size() const { return _c.size(); } + + virtual void toPrometheus(std::string& result) const override { + result += "#TYPE " + name() + " histogram\n"; + result += "#HELP " + name() + " " + help() + "\n"; + T le = _low; + T sum = T(0); + for (size_t i = 0; i < size(); ++i) { + uint64_t n = load(i); + sum += n; + result += name() + "_bucket{le=\"" + std::to_string(le) + "\"} " + + std::to_string(n) + "\n"; + le += _div; + } + result += name() + "_count " + std::to_string(sum) + "\n"; + } + + std::ostream& print(std::ostream& o) const { + o << "_div: " << _div << ", _c: " << _c << ", _r: [" << _lowr << ", " << _highr << "] " << name(); + return o; + } + + Metrics::hist_type _c; + T _low, _high, _div, _lowr, _highr; + size_t _n; + +}; + + +std::ostream& operator<< (std::ostream&, Metrics::counter_type const&); +template +std::ostream& operator<<(std::ostream& o, Histogram const& h) { + return h.print(o); +} + + +#endif diff --git a/arangod/RestServer/MetricsFeature.cpp b/arangod/RestServer/MetricsFeature.cpp new file mode 100644 index 0000000000..187407ecda --- /dev/null +++ b/arangod/RestServer/MetricsFeature.cpp @@ -0,0 +1,149 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); 
+/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "MetricsFeature.h" +#include "ApplicationFeatures/GreetingsFeaturePhase.h" +#include "Basics/application-exit.h" +#include "Logger/LogMacros.h" +#include "Logger/Logger.h" +#include "Logger/LoggerStream.h" +#include "ProgramOptions/ProgramOptions.h" +#include "ProgramOptions/Section.h" +#include "RestServer/Metrics.h" +#include "RocksDBEngine/RocksDBEngine.h" +#include "MMFiles/MMFilesEngine.h" +#include "Statistics/StatisticsFeature.h" +#include "StorageEngine/EngineSelectorFeature.h" +#include "StorageEngine/StorageEngine.h" +#include "StorageEngine/StorageEngineFeature.h" + +#include +#include + +using namespace arangodb; +using namespace arangodb::application_features; +using namespace arangodb::basics; +using namespace arangodb::options; + +// ----------------------------------------------------------------------------- +// --SECTION-- global variables +// ----------------------------------------------------------------------------- + + +// ----------------------------------------------------------------------------- +// --SECTION-- MetricsFeature +// ----------------------------------------------------------------------------- + +MetricsFeature* MetricsFeature::METRICS = nullptr; + +#include +MetricsFeature::MetricsFeature(application_features::ApplicationServer& server) + : 
ApplicationFeature(server, "Metrics"), + _enabled(true) { + METRICS = this; + _serverStatistics = new + ServerStatistics(std::chrono::duration( + std::chrono::system_clock::now().time_since_epoch()).count()); + setOptional(false); + startsAfter(); + startsBefore(); +} + +void MetricsFeature::collectOptions(std::shared_ptr options) {} + +void MetricsFeature::validateOptions(std::shared_ptr) {} + +void MetricsFeature::unprepare() { + METRICS = nullptr; +} + +void MetricsFeature::prepare() {} + +double time() { + return std::chrono::duration( // time since epoch in seconds + std::chrono::system_clock::now().time_since_epoch()) + .count(); +} + +void MetricsFeature::toPrometheus(std::string& result) const { + { + std::lock_guard guard(_lock); + for (auto const& i : _registry) { + i.second->toPrometheus(result); + } + } + + // StatisticsFeature + auto& sf = server().getFeature(); + if (sf.enabled()) { + sf.toPrometheus(result, std::chrono::duration( + std::chrono::system_clock::now().time_since_epoch()).count()); + } + + // RocksDBEngine + auto es = EngineSelectorFeature::ENGINE; + if (es != nullptr) { + std::string const& engineName = es->typeName(); + if (engineName == RocksDBEngine::EngineName) { + es->getStatistics(result); + } + } +} + +Counter& MetricsFeature::counter ( + std::string const& name, uint64_t const& val, std::string const& help) { + std::lock_guard guard(_lock); + auto const it = _registry.find(name); + if (it != _registry.end()) { + LOG_TOPIC("8523d", ERR, Logger::STATISTICS) << "Failed to retrieve histogram " << name; + TRI_ASSERT(false); + } + auto c = std::make_shared(val, name, help); + _registry.emplace(name, std::dynamic_pointer_cast(c)); + return *c; +}; + +Counter& MetricsFeature::counter (std::string const& name) { + std::lock_guard guard(_lock); + auto it = _registry.find(name); + if (it == _registry.end()) { + LOG_TOPIC("32d58", ERR, Logger::STATISTICS) + << "Failed to retrieve counter " << name; + TRI_ASSERT(false); + throw 
std::exception(); + } + std::shared_ptr h; + try { + h = std::dynamic_pointer_cast(it->second); + } catch (std::exception const& e) { + LOG_TOPIC("853d2", ERR, Logger::STATISTICS) + << "Failed to retrieve counter " << name; + TRI_ASSERT(false); + throw(e); + } + return *h; +}; + + +ServerStatistics& MetricsFeature::serverStatistics() { + return *_serverStatistics; +} diff --git a/arangod/RestServer/MetricsFeature.h b/arangod/RestServer/MetricsFeature.h new file mode 100644 index 0000000000..f2e06b52c9 --- /dev/null +++ b/arangod/RestServer/MetricsFeature.h @@ -0,0 +1,154 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_REST_SERVER_METRICS_FEATURE_H +#define ARANGODB_REST_SERVER_METRICS_FEATURE_H 1 + +#include "ApplicationFeatures/ApplicationFeature.h" +#include "Logger/LoggerFeature.h" +#include "Logger/LogMacros.h" +#include "ProgramOptions/ProgramOptions.h" +#include "RestServer/Metrics.h" +#include "Statistics/ServerStatistics.h" + +namespace arangodb { + +class MetricsFeature final : public application_features::ApplicationFeature { + public: + bool enabled() const { + return _enabled; + } + + static double time(); + + explicit MetricsFeature(application_features::ApplicationServer& server); + + void collectOptions(std::shared_ptr) override final; + void validateOptions(std::shared_ptr) override final; + void prepare() override final; + void unprepare() override final; + + template Histogram& + histogram (std::string const& name, size_t const& buckets, T const& low, + T const& high, std::string const& help = std::string()) { + + std::lock_guard guard(_lock); + auto const it = _registry.find(name); + if (it != _registry.end()) { + LOG_TOPIC("32e85", ERR, Logger::STATISTICS) << "histogram " << name << " alredy exists"; + TRI_ASSERT(false); + } + auto h = std::make_shared>(buckets, low, high, name, help); + _registry.emplace(name, std::dynamic_pointer_cast(h)); + return *h; + }; + + template Histogram& histogram (std::string const& name) { + std::lock_guard guard(_lock); + auto const it = _registry.find(name); + if (it == _registry.end()) { + LOG_TOPIC("32d85", ERR, Logger::STATISTICS) << "No histogram booked as " << name; + TRI_ASSERT(false); + throw std::exception(); + } + std::shared_ptr> h = nullptr; + try { + h = std::dynamic_pointer_cast>(*it->second); + if (h == nullptr) { + LOG_TOPIC("d2358", ERR, Logger::STATISTICS) << "Failed to retrieve histogram " << name; + } + } catch 
(std::exception const& e) { + LOG_TOPIC("32d75", ERR, Logger::STATISTICS) + << "Failed to retrieve histogram " << name << ": " << e.what(); + } + if (h == nullptr) { + TRI_ASSERT(false); + } + + return *h; + }; + + Counter& counter(std::string const& name, uint64_t const& val, std::string const& help); + Counter& counter(std::string const& name); + + template + Gauge& gauge(std::string const& name, T const& t, std::string const& help) { + std::lock_guard guard(_lock); + auto it = _registry.find(name); + if (it != _registry.end()) { + LOG_TOPIC("c7b37", ERR, Logger::STATISTICS) << "gauge " << name << " alredy exists"; + TRI_ASSERT(false); + throw(std::exception()); + } + auto g = std::make_shared(t, name, help); + _registry.emplace(name, std::dynamic_pointer_cast(g)); + return *g; + } + + template Gauge& gauge(std::string const& name) { + std::lock_guard guard(_lock); + auto it = _registry.find(name); + if (it == _registry.end()) { + LOG_TOPIC("5832d", ERR, Logger::STATISTICS) << "No metric booked as " << name; + TRI_ASSERT(false); + throw std::exception(); + } + std::shared_ptr> g = nullptr; + try { + g = std::dynamic_pointer_cast>(it->second); + if (g == nullptr) { + LOG_TOPIC("d4368", ERR, Logger::STATISTICS) << "Failed to retrieve gauge metric " << name; + TRI_ASSERT(false); + throw std::exception(); + } + } catch (std::exception const& e) { + LOG_TOPIC("c2348", ERR, Logger::STATISTICS) + << "Failed to retrieve gauge metric " << name << ": " << e.what(); + TRI_ASSERT(false); + throw(e); + } + } + + void toPrometheus(std::string& result) const; + + static MetricsFeature* metrics() { + return METRICS; + } + + ServerStatistics& serverStatistics(); + + private: + static MetricsFeature* METRICS; + + bool _enabled; + + std::unordered_map> _registry; + + mutable std::mutex _lock; + + ServerStatistics* _serverStatistics; + +}; + +} // namespace arangodb + +#endif diff --git a/arangod/RestServer/arangod.cpp b/arangod/RestServer/arangod.cpp index 2c82d7eb28..db64272848 
100644 --- a/arangod/RestServer/arangod.cpp +++ b/arangod/RestServer/arangod.cpp @@ -89,6 +89,7 @@ #include "RestServer/InitDatabaseFeature.h" #include "RestServer/LanguageCheckFeature.h" #include "RestServer/LockfileFeature.h" +#include "RestServer/MetricsFeature.h" #include "RestServer/QueryRegistryFeature.h" #include "RestServer/ScriptFeature.h" #include "RestServer/ServerFeature.h" @@ -155,6 +156,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) { std::type_index(typeid(GreetingsFeature)), std::type_index(typeid(HttpEndpointProvider)), std::type_index(typeid(LoggerBufferFeature)), + std::type_index(typeid(MetricsFeature)), std::type_index(typeid(pregel::PregelFeature)), std::type_index(typeid(ServerFeature)), std::type_index(typeid(SslServerFeature)), @@ -228,6 +230,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) { std::vector{std::type_index(typeid(ScriptFeature))}); server.addFeature(); server.addFeature(); + server.addFeature(); server.addFeature(); server.addFeature(); server.addFeature(name); diff --git a/arangod/RestServer/counter.h b/arangod/RestServer/counter.h new file mode 100644 index 0000000000..b82a77cb6f --- /dev/null +++ b/arangod/RestServer/counter.h @@ -0,0 +1,949 @@ +// Copyright 2009 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "dynarray.h" +#include + +#include +#include + +namespace gcl { + +/* + + +INTRODUCTION + +This file implements highly concurrent distributed counters. +The intent is to minimize the cost of incrementing the counter, +accepting increased costs to obtain the count. +That is, these counters are appropriate to code with +very frequent counter increments but relatively rare counter reads. + +These counters are parameterized by the base integer type +that maintains the count. +Avoid situations that overflow the integer, +as that may have undefined behavior. +This constraint implies that counters must be sized to their use. + + +GENERAL METHODS + +The general counter methods are as follows. + +( integer ): +The parameter is the initial counter value. + +(): +Equivalent to an initial value of zero. + +void operator +=( integer ): +void operator -=( integer ): +Add/subtract a value to/from the counter. +There is no default value. + +void operator ++(): // prefix +void operator ++(int): // postfix +void operator --(): // prefix +void operator --(int): // postfix +Increment or decrement the counter. + +integer load(): +Returns the value of the counter. + +integer exchange( integer ): +Replaces the existing count by the count in the parameter +and returns the previous count, +which enables safe concurrent count extraction. + +There are no copy or assignment operations. + + +SIMPLEX COUNTERS + +The simplex counters provide low-latency counting. +They implement all the general operations. + + counter::simplex red_count; + + void count_red( Bag bag ) { + for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ ) + if ( is_red( *i ) ) + ++red_count; + } + +Note that this code may have significant cost +because of the repeated global atomic increments. +Using a temporary int to maintain the count within the loop +runs the risk of losing some counts in the event of an exception.
+ + +COUNTER BUFFERS + +The cost of incrementing the counter is reduced +by using a buffer as a proxy for the counter. +The counter is a reference parameter to the buffer. +This buffer is typically accessed by a single thread, +and so its cost can be substantially lower. + + counter::simplex red_count; + + void count_red( Bag bag ) { + counter::buffer local_red( red_count ); + for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ ) + if ( is_red( *i ) ) + ++local_red; + } + +The buffer will automatically transfer its count +to the main counter on destruction. +If this latency is too great, +use the push method to transfer the count early. + + void count_red( Bag bag1, Bag bag2 ) { + counter::buffer local_red( red_count ); + for ( Bag::iterator i = bag1.begin(); i != bag1.end(); i++ ) + if ( is_red( *i ) ) + ++local_red; + local_red.push(); + for ( Bag::iterator i = bag2.begin(); i != bag2.end(); i++ ) + if ( is_red( *i ) ) + ++local_red; + } + +Any increments on buffers since the last push +will not be reflected in the value reported by a load of the counter. +The destructor does an implicit push. +The lifetime of the counter must be strictly larger than +the lifetimes of any buffers attached to it. + + +DUPLEX COUNTERS + +The push model of buffers sometimes yields an unacceptable lag +in the observed value of the count. +To avoid this lag, +there are duplex counters. +A duplex counter is paired with a broker; +the counter can query the broker for counts, +thus maintaining a somewhat current view of the count. + + counter::strong_duplex red_count; + + void count_red( Bag bag ) { + counter::strong_broker broker( red_count ); + for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ ) + if ( is_red( *i ) ) + ++broker; + } + +Another thread may call red_count.load() and get the current count. +That operation will poll each broker for its count and return the sum.
+Naturally, any increments done to a broker after it is polled will be missed, +but no counts will be lost. + +The primary use case for duplex counters +is to enable fast thread-local increments +while still maintaining a decent global count. + + weak_duplex red_count; + thread_local weak_broker thread_red( red_count ); + + void count_red( Bag bag ) { + for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ ) + if ( is_red( *i ) ) + ++thread_red; + } + + +WEAK DUPLEX COUNTERS + +The exchange operation works +by atomically transferring broker counts to the main counter, +and then exchanging out of the main counter. +Consequently, +every count will be extracted by one and only one exchange operation. + +However, that exchange operation can be expensive because +it and the broker increment operations +require write atomicity to the same broker object. +To reduce that concurrency cost, +the weak_duplex counter and its weak_broker +do not provide the exchange operation. +This difference +means that the polling is a read-only operation +and requires less synchronization. +Use this counter when you do not intend to exchange values. + + +BUFFERING BROKERS + +Thread-local increments, while cheaper than shared global increments, +are still more expensive than function-local increments. +To mitigate that cost, buffers work with brokers as well. + + weak_duplex red_count; + thread_local weak_broker thread_red( red_count ); + + void count_red( Bag bag ) { + buffer local_red( thread_red ); + for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ ) + if ( is_red( *i ) ) + ++local_red; + } + +As with buffers in general, +the count transfers only on destruction or a push operation. +Consequently, red_count.load() will not reflect any counts in buffers. +Those counts will not be lost, though. + + +COUNTER ARRAYS + +Counter arrays provide a means to handle many counters with one name. +The size of the counter arrays is fixed at construction time.
+Counter arrays have the same structure as single-value counters, +with the following exceptions. + +Increment a counter by indexing its array +and then applying one of the operations. +E.g. ++ctr_ary[i]. + +The load and exchange operations take an additional index parameter. + +Do we want to initialize a counter array with an initializer list? +Do we want to return a dynarray for the load operation? +Do we want to pass and return a dynarray for the exchange operation? + + +ATOMICITY + +In the course of program evolution, debugging and tuning, +a counter may call for an implementation with weaker concurrency requirements. +That is accomplished by explicitly specifying the atomicity. +For example, suppose it is discovered that a counter + + counter::simplex< int > red_count; + +is only ever read and written from a single thread. +We can avoid the cost of atomic operations by making the counter serial. + + counter::simplex< int, counter::atomicity::none > red_count; + +This approach preserves the programming interface. + +The counters support three levels of atomicity. +These are specified as a (usually defaulted) template parameter. + +counter::atomicity::none // only one thread may access the counter +counter::atomicity::semi // multiple readers, but only one writer +counter::atomicity::full // multiple readers and writers + +Buffers have two template parameters for atomicity, +one for the atomicity of the counter it is modifying, +and one for atomicity of the buffer itself. +By explicitly specifying this atomicity, +one can build unusual configurations of buffers for unusual situations. +For example, suppose increments of red_count +tend to cluster in tasks with high processor affinity. +By separating those counts with a global intermediate buffer, +counting can exploit processor locality +and avoid substantial inter-processor communication.
+For example, + + counter::simplex red_count; + counter::buffer + red_squares( red_count ); + counter::buffer + red_circles( red_count ); + + void count_red_squares( Bag bag ) { + counter::buffer local_red( red_squares ); + for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ ) + if ( is_red( *i ) ) + ++local_red; + } + +The red_squares variable is global, +and will automatically transfer its count to red_count +only on global variable destruction +This transfer is likely to be too late for many purposes. +One solution is explicit programming to call red_squares.push() +at appropriate times. +Another solution is to use duplex counters. + + +GUIDELINES FOR USE + +Use a simplex counter +when you have a low rate of updates or a high read rate +or little tolerance for latency. +Use a strong duplex counter and broker +when your update rate is significantly higher than the load rate, +you can tolerate latency in counting, +and you need the exchange operation. +Use a weak duplex counter and broker +when your update rate is significantly higher than the load rate, +you can tolerate latency in counting, +but you do not need the exchange operation. +Use buffers to collect short-term bursts of counts. + +The operations of the counters, brokers, and buffers +have the following rough costs. + + + strong weak + simplex duplex duplex + +update atomic rmw atomic rmw atomic rmw + +load atomic read mutex + n * mutex + n * + atomic read atomic read + +exchange atomic rmw mutex + n * n/a + atomic rmw + +construction trivial std::set std::set + +destruction trivial std::set std::set + + + == == == + + strong weak + buffer broker broker + +update serial atomic rmw atomic + read & write read & write + +construction pointer mutex + mutex + + assign set.insert set.insert + +destruction pointer mutex + mutex + + assign set.remove set.remove + + +IMPLEMENTATION + +The lowest implementation layer level of the organization is a bumper. 
+Bumpers provide the interface of simplex counter, +but only the increment and decrement interface is public. +The rest are protected. +Buffer constructors require a reference to a bumper. +Simplex counters, buffers, duplex counters, and buffers +are all derived from a bumper, +which enables buffers to connect to all of them. + + +*/ + +/* Implementation .... */ + +/* + The operation defined out-of-line + break cyclic dependences in the class definitions. + They are expected to be rarely called, and efficiency is less of a concern. +*/ + +namespace counter { + +enum class atomicity +{ + none, // permits access to only one thread + semi, // allows multiple readers, but only one writer + full // allows multiple readers and writers +}; + +/* + The bumper classes provide the minimal increment and decrement interface. + They serve as base classes for the public types. +*/ + +template< typename Integral, atomicity Atomicity > +class bumper; + +template< typename Integral > +class bumper< Integral, atomicity::none > +{ + bumper( const bumper& ) = delete; + bumper& operator=( const bumper& ) = delete; +public: + void operator +=( Integral by ) { value_ += by; } + void operator -=( Integral by ) { value_ -= by; } + void operator ++() { *this += 1; } + void operator ++(int) { *this += 1; } + void operator --() { *this -= 1; } + void operator --(int) { *this -= 1; } +protected: + constexpr bumper( Integral in ) : value_( in ) {} + constexpr bumper() : value_( 0 ) {} + Integral load() const { return value_; } + Integral exchange( Integral to ) + { Integral tmp = value_; value_ = to; return tmp; } + Integral value_; + template< typename, atomicity > + friend class bumper_array; + template< typename, atomicity, atomicity > + friend class buffer_array; + friend struct std::dynarray< bumper >; +}; + +template< typename Integral > +class bumper< Integral, atomicity::semi > +{ + bumper( const bumper& ) = delete; + bumper& operator=( const bumper& ) = delete; +public: + void operator 
+=( Integral by ) + { value_.store( value_.load( std::memory_order_relaxed ) + by, + std::memory_order_relaxed ); } + void operator -=( Integral by ) + { value_.store( value_.load( std::memory_order_relaxed ) - by, + std::memory_order_relaxed ); } + void operator ++() { *this += 1; } + void operator ++(int) { *this += 1; } + void operator --() { *this -= 1; } + void operator --(int) { *this -= 1; } +protected: + constexpr bumper( Integral in ) : value_( in ) {} + constexpr bumper() : value_( 0 ) {} + Integral load() const { return value_.load( std::memory_order_relaxed ); } + Integral exchange( Integral to ) + { Integral tmp = value_.load( std::memory_order_relaxed ); + value_.store( to, std::memory_order_relaxed ); return tmp; } + std::atomic< Integral > value_; + template< typename, atomicity > + friend class bumper_array; + template< typename, atomicity, atomicity > + friend class buffer_array; + friend struct std::dynarray< bumper >; +}; + +template< typename Integral > +class bumper< Integral, atomicity::full > +{ + bumper( const bumper& ) = delete; + bumper& operator=( const bumper& ) = delete; +public: + void operator +=( Integral by ) + { value_.fetch_add( by, std::memory_order_relaxed ); } + void operator -=( Integral by ) + { value_.fetch_sub( by, std::memory_order_relaxed ); } + void operator ++() { *this += 1; } + void operator ++(int) { *this += 1; } + void operator --() { *this -= 1; } + void operator --(int) { *this -= 1; } +protected: + constexpr bumper( Integral in ) : value_( in ) {} + constexpr bumper() : value_( 0 ) {} + Integral load() const { return value_.load( std::memory_order_relaxed ); } + Integral exchange( Integral to ) + { return value_.exchange( to, std::memory_order_relaxed ); } + std::atomic< Integral > value_; + template< typename, atomicity > + friend class bumper_array; + template< typename, atomicity, atomicity > + friend class buffer_array; + friend struct std::dynarray< bumper >; +}; + +/* + The simplex counters. 
+*/ + +template< typename Integral, + atomicity Atomicity = atomicity::full > +class simplex +: public bumper< Integral, Atomicity > +{ + typedef bumper< Integral, Atomicity > base_type; +public: + constexpr simplex() : base_type( 0 ) {} + constexpr simplex( Integral in ) : base_type( in ) {} + simplex( const simplex& ) = delete; + simplex& operator=( const simplex& ) = delete; + Integral load() const { return base_type::load(); } + Integral exchange( Integral to ) { return base_type::exchange( to ); } +}; + +/* + Buffers reduce contention on counters. + They require template parameters to specify the atomicity + of the prime counter and of the buffer itself. + The lifetime of the prime must cover the lifetime of the buffer. + Any increments in the buffer since the last buffer::push call + are not reflected in the queries on the prime. +*/ + +template< typename Integral, + atomicity PrimeAtomicity = atomicity::full, + atomicity BufferAtomicity = atomicity::none > +class buffer +: public bumper< Integral, BufferAtomicity > +{ + typedef bumper< Integral, PrimeAtomicity > prime_type; + typedef bumper< Integral, BufferAtomicity > base_type; +public: + buffer() = delete; + buffer( prime_type& p ) : base_type( 0 ), prime_( p ) {} + buffer( const buffer& ) = delete; + buffer& operator=( const buffer& ) = delete; + void push() + { Integral value = base_type::exchange( 0 ); + if ( value != 0 ) prime_ += value; } + ~buffer() { push(); } +private: + prime_type& prime_; +}; + +/* + Duplex counters enable a "pull" model of counting. + Each counter, the prime, may have one or more brokers. + The lifetime of the prime must cover the lifetime of the brokers. + The duplex counter queries may fail to detect any counting done concurrently. + The query operations are expected to be rare relative to the counting. + The weak duplex counter does not support the exchange operation, + which means that you cannot extract counts early. 
+*/ + +template< typename Integral > class strong_broker; + +template< typename Integral > class strong_duplex +: public bumper< Integral, atomicity::full > +{ + typedef bumper< Integral, atomicity::full > base_type; + typedef strong_broker< Integral > broker_type; + friend class strong_broker< Integral >; +public: + strong_duplex() : base_type( 0 ) {} + strong_duplex( Integral in ) : base_type( in ) {} + Integral load() const; + Integral exchange( Integral to ); + ~strong_duplex(); +private: + void insert( broker_type* child ); + void erase( broker_type* child, Integral by ); + /* mutable: the const load() has to lock it */ + mutable std::mutex serializer_; + typedef std::unordered_set< broker_type* > set_type; + set_type children_; +}; + +template< typename Integral > class strong_broker +: public bumper< Integral, atomicity::full > +{ + typedef bumper< Integral, atomicity::full > base_type; + typedef strong_duplex< Integral > duplex_type; + friend class strong_duplex< Integral >; +public: + strong_broker( duplex_type& p ); + strong_broker() = delete; + strong_broker( const strong_broker& ) = delete; + strong_broker& operator=( const strong_broker& ) = delete; + ~strong_broker(); +private: + Integral poll() const { return base_type::load(); } + Integral drain() { return base_type::exchange( 0 ); } + duplex_type& prime_; +}; + +/* Registers a broker with this counter; the broker must not already be registered. */ +template< typename Integral > +void strong_duplex< Integral >::insert( broker_type* child ) +{ + std::lock_guard< std::mutex > _( serializer_ ); + /* do the insertion outside assert() so it still happens when NDEBUG is defined */ + const bool inserted = children_.insert( child ).second; + assert( inserted ); + static_cast< void >( inserted ); +} + +/* Folds the departing broker's count `by` into this counter and unregisters the broker. */ +template< typename Integral > +void strong_duplex< Integral >::erase( broker_type* child, Integral by ) +{ + this->operator +=( by ); + std::lock_guard< std::mutex > _( serializer_ ); + /* do the removal outside assert() so it still happens when NDEBUG is defined */ + const bool erased = children_.erase( child ) == 1; + assert( erased ); + static_cast< void >( erased ); +} + +/* Returns the sum of all registered brokers' counts plus this counter's own count. */ +template< typename Integral > +Integral strong_duplex< Integral >::load() const +{ + /* children_ is const here, so begin()/end() yield const_iterator */ + typedef typename set_type::const_iterator iterator; + Integral tmp = 0; + { + std::lock_guard< std::mutex > _( serializer_ ); + iterator rollcall = children_.begin(); + for ( ; rollcall !=
children_.end(); rollcall++ ) + tmp += (*rollcall)->poll(); + } + return tmp + base_type::load(); +} + +template< typename Integral > +Integral strong_duplex< Integral >::exchange( Integral to ) +{ + typedef typename set_type::iterator iterator; + Integral tmp = 0; + { + std::lock_guard< std::mutex > _( serializer_ ); + iterator rollcall = children_.begin(); + for ( ; rollcall != children_.end(); rollcall++ ) + tmp += (*rollcall)->drain(); + } + return tmp + base_type::exchange( to ); +} + +template< typename Integral > +strong_duplex< Integral >::~strong_duplex() +{ + std::lock_guard< std::mutex > _( serializer_ ); + assert( children_.size() == 0 ); +} + +template< typename Integral > +strong_broker< Integral >::strong_broker( duplex_type& p ) +: + base_type( 0 ), + prime_( p ) +{ + prime_.insert( this ); +} + +template< typename Integral > +strong_broker< Integral >::~strong_broker() +{ + prime_.erase( this, base_type::load() ); +} + +template< typename Integral > class weak_broker; + +template< typename Integral > class weak_duplex +: public bumper< Integral, atomicity::full > +{ + typedef bumper< Integral, atomicity::full > base_type; + typedef weak_broker< Integral > broker_type; + friend class weak_broker< Integral >; +public: + weak_duplex() : base_type( 0 ) {} + weak_duplex( Integral in ) : base_type( in ) {} + weak_duplex( const weak_duplex& ) = delete; + weak_duplex& operator=( const weak_duplex& ) = delete; + Integral load() const; + ~weak_duplex(); +private: + void insert( broker_type* child ); + void erase( broker_type* child, Integral by ); + /* mutable: the const load() has to lock it */ + mutable std::mutex serializer_; + typedef std::unordered_set< broker_type* > set_type; + set_type children_; +}; + +template< typename Integral > class weak_broker +: public bumper< Integral, atomicity::semi > +{ + typedef bumper< Integral, atomicity::semi > base_type; + typedef weak_duplex< Integral > duplex_type; + friend class weak_duplex< Integral >; +public: + weak_broker( duplex_type& p ); + weak_broker() = delete;
+ weak_broker( const weak_broker& ) = delete; + weak_broker& operator=( const weak_broker& ) = delete; + ~weak_broker(); +private: + Integral poll() const { return base_type::load(); } + duplex_type& prime_; +}; + +/* Registers a broker with this counter; the broker must not already be registered. */ +template< typename Integral > +void weak_duplex< Integral >::insert( broker_type* child ) +{ + std::lock_guard< std::mutex > _( serializer_ ); + /* do the insertion outside assert() so it still happens when NDEBUG is defined */ + const bool inserted = children_.insert( child ).second; + assert( inserted ); + static_cast< void >( inserted ); +} + +/* Folds the departing broker's count `by` into this counter and unregisters the broker. */ +template< typename Integral > +void weak_duplex< Integral >::erase( broker_type* child, Integral by ) +{ + std::lock_guard< std::mutex > _( serializer_ ); + this->operator +=( by ); + /* do the removal outside assert() so it still happens when NDEBUG is defined */ + const bool erased = children_.erase( child ) == 1; + assert( erased ); + static_cast< void >( erased ); +} + +/* Returns the sum of all registered brokers' counts plus this counter's own count. */ +template< typename Integral > +Integral weak_duplex< Integral >::load() const +{ + /* children_ is const here, so begin()/end() yield const_iterator */ + typedef typename set_type::const_iterator iterator; + Integral tmp = 0; + { + std::lock_guard< std::mutex > _( serializer_ ); + iterator rollcall = children_.begin(); + for ( ; rollcall != children_.end(); rollcall++ ) + tmp += (*rollcall)->poll(); + tmp += base_type::load(); + } + return tmp; +} + +template< typename Integral > +weak_duplex< Integral >::~weak_duplex() +{ + std::lock_guard< std::mutex > _( serializer_ ); + assert( children_.size() == 0 ); +} + +template< typename Integral > +weak_broker< Integral >::weak_broker( duplex_type& p ) +: + base_type( 0 ), + prime_( p ) +{ + prime_.insert( this ); +} + +template< typename Integral > +weak_broker< Integral >::~weak_broker() +{ + prime_.erase( this, base_type::load() ); +} + + +// Counter arrays.
+ + +template< typename Integral, + atomicity Atomicity = atomicity::full > +class bumper_array +{ +public: + typedef bumper< Integral, Atomicity > value_type; +private: + typedef std::dynarray< value_type > storage_type; +public: + typedef typename storage_type::size_type size_type; + bumper_array() = delete; + bumper_array( size_type size ) : storage( size ) {} + bumper_array( const bumper_array& ) = delete; + bumper_array& operator=( const bumper_array& ) = delete; + value_type& operator[]( size_type idx ) { return storage[ idx ]; } + size_type size() const { return storage.size(); } +protected: + Integral load( size_type idx ) const { return storage[ idx ].load(); } + Integral exchange( size_type idx, Integral value ) + { return storage[ idx ].exchange( value ); } +private: + storage_type storage; +}; + +template< typename Integral, + atomicity Atomicity = atomicity::full > +class simplex_array +: public bumper_array< Integral, Atomicity > +{ + typedef bumper_array< Integral, Atomicity > base_type; +public: + typedef typename base_type::value_type value_type; + typedef typename base_type::size_type size_type; + simplex_array() = delete; + constexpr simplex_array( size_type size ) : base_type( size ) {} + simplex_array( const simplex_array& ) = delete; + simplex_array& operator=( const simplex_array& ) = delete; + Integral load( size_type idx ) const{ return base_type::load( idx ); } + Integral exchange( size_type idx, Integral value ) + { return base_type::exchange( idx, value ); } + value_type& operator[]( size_type idx ) { return base_type::operator[]( idx ); } + size_type size() const { return base_type::size(); } +}; + +template< typename Integral, + atomicity PrimeAtomicity = atomicity::full, + atomicity BufferAtomicity = atomicity::full > +class buffer_array +: public bumper_array< Integral, BufferAtomicity > +{ + typedef bumper_array< Integral, BufferAtomicity > base_type; + typedef bumper_array< Integral, PrimeAtomicity > prime_type; +public: + typedef 
typename base_type::value_type value_type; + typedef typename base_type::size_type size_type; + buffer_array() = delete; + buffer_array( prime_type& p ) : base_type( p.size() ), prime_( p ) {} + buffer_array( const buffer_array& ) = delete; + buffer_array& operator=( const buffer_array& ) = delete; + void push( size_type idx ) + { Integral value = base_type::operator[]( idx ).exchange( 0 ); + if ( value != 0 ) prime_[ idx ] += value; } + void push(); + size_type size() const { return base_type::size(); } + ~buffer_array() { push(); } +private: + prime_type& prime_; +}; + +template< typename Integral, + atomicity BufferAtomicity, atomicity PrimeAtomicity > +void +buffer_array< Integral, BufferAtomicity, PrimeAtomicity >::push() +{ + int size = base_type::size(); + for ( int i = 0; i < size; ++i ) + push( i ); +} + +// Duplex arrays + +template< typename Integral > class strong_broker_array; + +template< typename Integral > class strong_duplex_array +: public bumper_array< Integral, atomicity::full > +{ + typedef bumper_array< Integral, atomicity::full > base_type; + typedef strong_broker_array< Integral > broker_type; + friend class strong_broker_array< Integral >; +public: + typedef typename base_type::value_type value_type; + typedef typename base_type::size_type size_type; + strong_duplex_array() = delete; + constexpr strong_duplex_array( size_type size ) + : base_type( size ) {} + strong_duplex_array( const strong_duplex_array& ) = delete; + strong_duplex_array& operator=( const strong_duplex_array& ) = delete; + value_type& operator[]( size_type idx ) { return base_type::operator[]( idx ); } + size_type size() const { return base_type::size(); } + ~strong_duplex_array(); +private: + void insert( broker_type* child ); + void erase( broker_type* child, Integral by ); + std::mutex serializer_; + typedef std::unordered_set< broker_type* > set_type; + set_type children_; +}; + +template< typename Integral > class strong_broker_array +: public bumper_array< Integral, 
atomicity::semi > +{ + typedef bumper_array< Integral, atomicity::semi > base_type; + typedef strong_duplex_array< Integral > duplex_type; + friend class strong_duplex_array< Integral >; +public: + typedef typename base_type::value_type value_type; + typedef typename base_type::size_type size_type; + strong_broker_array() = delete; + strong_broker_array( duplex_type& p ); + strong_broker_array( const strong_broker_array& ) = delete; + strong_broker_array& operator=( const strong_broker_array& ) = delete; + size_type size() const { return base_type::size(); } + ~strong_broker_array(); +private: + Integral poll( size_type idx ) + { return base_type::operator[]( idx ).load(); } + Integral drain( size_type idx ) + { return base_type::operator[]( idx ).exchange( 0 ); } + duplex_type& prime_; +}; + +template< typename Integral > +strong_duplex_array< Integral >::~strong_duplex_array() +{ + std::lock_guard< std::mutex > _( serializer_ ); + assert( children_.size() == 0 ); +} + +template< typename Integral > +strong_broker_array< Integral >::strong_broker_array( duplex_type& p ) +: + base_type( p.size() ), + prime_( p ) +{ + prime_.insert( this ); +} + +template< typename Integral > +strong_broker_array< Integral >::~strong_broker_array() +{ + prime_.erase( this, base_type::load() ); +} + + +template< typename Integral > class weak_broker_array; + +template< typename Integral > class weak_duplex_array +: public bumper_array< Integral, atomicity::full > +{ + typedef bumper_array< Integral, atomicity::full > base_type; + typedef weak_broker_array< Integral > broker_type; + friend class weak_broker_array< Integral >; +public: + typedef typename base_type::value_type value_type; + typedef typename base_type::size_type size_type; + weak_duplex_array() = delete; + constexpr weak_duplex_array( size_type size ) + : base_type( size ) {} + weak_duplex_array( const weak_duplex_array& ) = delete; + weak_duplex_array& operator=( const weak_duplex_array& ) = delete; + value_type& 
operator[]( size_type idx ) { return base_type::operator[]( idx ); } + size_type size() const { return base_type::size(); } + ~weak_duplex_array(); +private: + void insert( broker_type* child ); + void erase( broker_type* child, Integral by ); + std::mutex serializer_; + typedef std::unordered_set< broker_type* > set_type; + set_type children_; +}; + +template< typename Integral > class weak_broker_array +: public bumper_array< Integral, atomicity::semi > +{ + typedef bumper_array< Integral, atomicity::semi > base_type; + typedef weak_duplex_array< Integral > duplex_type; + friend class weak_duplex_array< Integral >; +public: + typedef typename base_type::value_type value_type; + typedef typename base_type::size_type size_type; + weak_broker_array() = delete; + weak_broker_array( duplex_type& p ); + weak_broker_array( const weak_broker_array& ) = delete; + weak_broker_array& operator=( const weak_broker_array& ) = delete; + size_type size() const { return base_type::size(); } + ~weak_broker_array(); +private: + Integral poll( size_type idx ) + { return base_type::operator[]( idx ).load(); } + duplex_type& prime_; +}; + +template< typename Integral > +weak_duplex_array< Integral >::~weak_duplex_array() +{ + std::lock_guard< std::mutex > _( serializer_ ); + assert( children_.size() == 0 ); +} + +template< typename Integral > +weak_broker_array< Integral >::weak_broker_array( duplex_type& p ) +: + base_type( p.size() ), + prime_( p ) +{ + prime_.insert( this ); +} + +template< typename Integral > +weak_broker_array< Integral >::~weak_broker_array() +{ + prime_.erase( this, base_type::load() ); +} + + + +} // namespace counter + +} // namespace gcl diff --git a/arangod/RestServer/dynarray.h b/arangod/RestServer/dynarray.h new file mode 100644 index 0000000000..96308ccf63 --- /dev/null +++ b/arangod/RestServer/dynarray.h @@ -0,0 +1,116 @@ +// Copyright 2009 Google Inc. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace std { + +struct bad_array_length_ { }; + +/* Fixed-size array whose element count is chosen at construction. Elements live in raw char storage obtained by alloc() and are constructed and destroyed explicitly. */ +template< class T > +struct dynarray +{ + // types: + typedef T value_type; + typedef T& reference; + typedef const T& const_reference; + typedef T* iterator; + typedef const T* const_iterator; + typedef std::reverse_iterator reverse_iterator; + typedef std::reverse_iterator const_reverse_iterator; + typedef size_t size_type; + typedef ptrdiff_t difference_type; + + // fields: +private: + T* store; + size_type count; + + // helper functions: + /* const so that the const overload of at() can call it */ + void check(size_type n) const + { if ( n >= count ) throw out_of_range("dynarray"); } + T* alloc(size_type n) + { if ( n > std::numeric_limits::max()/sizeof(T) ) + throw std::bad_array_length_(); + return reinterpret_cast( new char[ n*sizeof(T) ] ); } + +public: + // construct and destruct: + dynarray() = delete; + const dynarray operator=(const dynarray&) = delete; + + /* Default-constructs c elements. If a constructor throws, the elements built so far are destroyed and the storage is released before rethrowing. */ + explicit dynarray(size_type c) + : store( alloc( c ) ), count( c ) + { size_type i = 0; /* one index shared by the construction loop and the cleanup loop */ + try { + for ( ; i < count; ++i ) + new (store+i) T; + } catch ( ... ) { + for ( ; i > 0; --i ) + (store+(i-1))->~T(); + delete[] reinterpret_cast< char* >( store ); + throw; + } } + + dynarray(const dynarray& d) + : store( alloc( d.count ) ), count( d.count ) + { try { uninitialized_copy( d.begin(), d.end(), begin() ); } + catch ( ...
) { /* storage came from new char[] */ delete[] reinterpret_cast< char* >( store ); throw; } } + + ~dynarray() + { for ( size_type i = 0; i < count; ++i ) + (store+i)->~T(); + /* release as the char array it was allocated as; delete[] on a T* would run the destructors a second time */ + delete[] reinterpret_cast< char* >( store ); } + + // iterators: + iterator begin() { return store; } + const_iterator begin() const { return store; } + const_iterator cbegin() const { return store; } + iterator end() { return store + count; } + const_iterator end() const { return store + count; } + const_iterator cend() const { return store + count; } + + reverse_iterator rbegin() + { return reverse_iterator(end()); } + const_reverse_iterator rbegin() const + { return const_reverse_iterator(end()); } + reverse_iterator rend() + { return reverse_iterator(begin()); } + const_reverse_iterator rend() const + { return const_reverse_iterator(begin()); } + + // capacity: + size_type size() const { return count; } + size_type max_size() const { return count; } + bool empty() const { return count == 0; } + + // element access: + reference operator[](size_type n) { return store[n]; } + const_reference operator[](size_type n) const { return store[n]; } + + reference front() { return store[0]; } + const_reference front() const { return store[0]; } + reference back() { return store[count-1]; } + const_reference back() const { return store[count-1]; } + + const_reference at(size_type n) const { check(n); return store[n]; } + reference at(size_type n) { check(n); return store[n]; } + + // data access: + T* data() { return store; } + const T* data() const { return store; } +}; + +} // namespace std diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index c51fcff758..f42feb9044 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -2134,6 +2134,26 @@ std::unique_ptr RocksDBEngine::openExistingDatabase( } } +void RocksDBEngine::getStatistics(std::string& result) const { + VPackBuilder stats; + getStatistics(stats); + VPackSlice sslice = stats.slice(); + TRI_ASSERT(sslice.isObject()); + for (auto const& a :
VPackObjectIterator(sslice)) { + if (a.value.isNumber()) { + std::string name = a.key.copyString(); + std::replace(name.begin(), name.end(), '.', '_'); + std::replace(name.begin(), name.end(), '-', '_'); + if (name.front() != 'r') { + name = EngineName + "_" + name ; + } + result += "#TYPE " + name + + " counter\n" + "#HELP " + name + " " + name + "\n" + + name + " " + std::to_string(a.value.getNumber()) + "\n"; + } + } +} + void RocksDBEngine::getStatistics(VPackBuilder& builder) const { // add int properties auto addInt = [&](std::string const& s) { diff --git a/arangod/RocksDBEngine/RocksDBEngine.h b/arangod/RocksDBEngine/RocksDBEngine.h index e943cd9a4d..d35a232308 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.h +++ b/arangod/RocksDBEngine/RocksDBEngine.h @@ -160,6 +160,7 @@ class RocksDBEngine final : public StorageEngine { LogicalCollection& collection, velocypack::Slice const& info) override; void getStatistics(velocypack::Builder& builder) const override; + void getStatistics(std::string& result) const override; // inventory functionality // ----------------------- diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 1b0d3a4de2..4aedf9f4ab 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -32,6 +32,7 @@ #include "Logger/LogMacros.h" #include "Logger/Logger.h" #include "Logger/LoggerStream.h" +#include "RestServer/MetricsFeature.h" #include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBEngine.h" @@ -106,7 +107,7 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { // register with manager transaction::ManagerFeature::manager()->registerTransaction(id(), nullptr, isReadOnlyTransaction()); updateStatus(transaction::Status::RUNNING); - ServerStatistics::statistics()._transactionsStatistics._transactionsStarted++; + 
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsStarted++; setRegistered(); @@ -397,7 +398,7 @@ Result RocksDBTransactionState::commitTransaction(transaction::Methods* activeTr if (res.ok()) { updateStatus(transaction::Status::COMMITTED); cleanupTransaction(); // deletes trx - ServerStatistics::statistics()._transactionsStatistics._transactionsCommitted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsCommitted++; } else { abortTransaction(activeTrx); // deletes trx } @@ -429,7 +430,7 @@ Result RocksDBTransactionState::abortTransaction(transaction::Methods* activeTrx TRI_ASSERT(!_rocksTransaction && !_cacheTx && !_readSnapshot); } - ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++; unuseCollections(nestingLevel()); return result; } @@ -601,7 +602,7 @@ Result RocksDBTransactionState::triggerIntermediateCommit(bool& hasPerformedInte } hasPerformedIntermediateCommit = true; - ServerStatistics::statistics()._transactionsStatistics._intermediateCommits++; + MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._intermediateCommits++; TRI_IF_FAILURE("FailAfterIntermediateCommit") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); diff --git a/arangod/Statistics/Descriptions.cpp b/arangod/Statistics/Descriptions.cpp index 514ed56285..6f1eebbeaa 100644 --- a/arangod/Statistics/Descriptions.cpp +++ b/arangod/Statistics/Descriptions.cpp @@ -22,6 +22,7 @@ #include "Descriptions.h" #include "Basics/process-utils.h" +#include "RestServer/MetricsFeature.h" #include "Scheduler/Scheduler.h" #include "Scheduler/SchedulerFeature.h" #include "Statistics/ConnectionStatistics.h" @@ -410,15 +411,15 @@ stats::Descriptions::Descriptions() } void stats::Descriptions::serverStatistics(velocypack::Builder& b) const { - ServerStatistics const& info = ServerStatistics::statistics(); + 
ServerStatistics const& info = MetricsFeature::metrics()->serverStatistics(); b.add("uptime", VPackValue(info._uptime)); b.add("physicalMemory", VPackValue(TRI_PhysicalMemory)); b.add("transactions", VPackValue(VPackValueType::Object)); - b.add("started", VPackValue(info._transactionsStatistics._transactionsStarted)); - b.add("aborted", VPackValue(info._transactionsStatistics._transactionsAborted)); - b.add("committed", VPackValue(info._transactionsStatistics._transactionsCommitted)); - b.add("intermediateCommits", VPackValue(info._transactionsStatistics._intermediateCommits)); + b.add("started", VPackValue(info._transactionsStatistics._transactionsStarted.load())); + b.add("aborted", VPackValue(info._transactionsStatistics._transactionsAborted.load())); + b.add("committed", VPackValue(info._transactionsStatistics._transactionsCommitted.load())); + b.add("intermediateCommits", VPackValue(info._transactionsStatistics._intermediateCommits.load())); b.close(); auto& server = application_features::ApplicationServer::server(); diff --git a/arangod/Statistics/ServerStatistics.cpp b/arangod/Statistics/ServerStatistics.cpp index 85e186c6fd..e9107014c8 100644 --- a/arangod/Statistics/ServerStatistics.cpp +++ b/arangod/Statistics/ServerStatistics.cpp @@ -23,25 +23,25 @@ #include "ServerStatistics.h" #include "Statistics/StatisticsFeature.h" +#include "RestServer/MetricsFeature.h" using namespace arangodb; -// ----------------------------------------------------------------------------- -// --SECTION-- static members -// ----------------------------------------------------------------------------- - -ServerStatistics serverStatisticsGlobal(0); +TransactionStatistics::TransactionStatistics() : + _metrics(arangodb::MetricsFeature::metrics()), + _transactionsStarted( + _metrics->counter("arangodb_transactions_started", 0, "Transactions started")), + _transactionsAborted( + _metrics->counter("arangodb_transactions_aborted", 0, "Transactions aborted")), + _transactionsCommitted( + 
_metrics->counter("arangodb_transactions_committed", 0, "Transactions committed")), + _intermediateCommits( + _metrics->counter("arangodb_intermediate_commits", 0, "Intermediate commits")) {} // ----------------------------------------------------------------------------- // --SECTION-- static public methods // ----------------------------------------------------------------------------- -ServerStatistics& ServerStatistics::statistics() { - //update the uptime for everyone reading the statistics. - serverStatisticsGlobal._uptime = StatisticsFeature::time() - serverStatisticsGlobal._startTime; - return serverStatisticsGlobal; -} - void ServerStatistics::initialize(double startTime) { - serverStatisticsGlobal._startTime = startTime; + _startTime = startTime; } diff --git a/arangod/Statistics/ServerStatistics.h b/arangod/Statistics/ServerStatistics.h index 8c6bf485f7..53875b5f35 100644 --- a/arangod/Statistics/ServerStatistics.h +++ b/arangod/Statistics/ServerStatistics.h @@ -26,15 +26,26 @@ #include #include +#include "RestServer/Metrics.h" + +namespace arangodb { +class MetricsFeature; +} struct TransactionStatistics { - TransactionStatistics() : _transactionsStarted(0), _transactionsAborted(0) - , _transactionsCommitted(0), _intermediateCommits(0) {} - std::atomic _transactionsStarted; - std::atomic _transactionsAborted; - std::atomic _transactionsCommitted; - std::atomic _intermediateCommits; + TransactionStatistics(); + TransactionStatistics(TransactionStatistics const&) = delete; + TransactionStatistics(TransactionStatistics &&) = delete; + TransactionStatistics& operator=(TransactionStatistics const&) = delete; + TransactionStatistics& operator=(TransactionStatistics &&) = delete; + + arangodb::MetricsFeature* _metrics; + + Counter& _transactionsStarted; + Counter& _transactionsAborted; + Counter& _transactionsCommitted; + Counter& _intermediateCommits; }; struct ServerStatistics { @@ -44,14 +55,15 @@ struct ServerStatistics { ServerStatistics& 
operator=(ServerStatistics const&) = delete; ServerStatistics& operator=(ServerStatistics &&) = delete; - static ServerStatistics& statistics(); - static void initialize(double); + ServerStatistics& statistics(); + void initialize(double); TransactionStatistics _transactionsStatistics; double _startTime; std::atomic _uptime; - explicit ServerStatistics(double start) : _transactionsStatistics(), _startTime(start), _uptime(0.0) {} + explicit ServerStatistics(double start) : + _transactionsStatistics(), _startTime(start), _uptime(0.0) {} }; #endif diff --git a/arangod/Statistics/StatisticsFeature.cpp b/arangod/Statistics/StatisticsFeature.cpp index 3f5057d4cb..7c36d3d02a 100644 --- a/arangod/Statistics/StatisticsFeature.cpp +++ b/arangod/Statistics/StatisticsFeature.cpp @@ -30,6 +30,7 @@ #include "Logger/LoggerStream.h" #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" +#include "RestServer/MetricsFeature.h" #include "RestServer/SystemDatabaseFeature.h" #include "RestServer/DatabaseFeature.h" #include "Statistics/ConnectionStatistics.h" @@ -186,7 +187,7 @@ void StatisticsFeature::prepare() { STATISTICS = this; - ServerStatistics::initialize(StatisticsFeature::time()); + MetricsFeature::metrics()->serverStatistics().initialize(StatisticsFeature::time()); ConnectionStatistics::initialize(); RequestStatistics::initialize(); } diff --git a/arangod/Statistics/StatisticsFeature.h b/arangod/Statistics/StatisticsFeature.h index 5b98ca2c92..c152e8392c 100644 --- a/arangod/Statistics/StatisticsFeature.h +++ b/arangod/Statistics/StatisticsFeature.h @@ -86,6 +86,9 @@ class StatisticsFeature final : public application_features::ApplicationFeature void prepare() override final; void start() override final; void stop() override final; + void toPrometheus(std::string& result, double const& now) { + _statisticsWorker->generateRawStatistics(result, now); + } static stats::Descriptions const* descriptions() { if (STATISTICS != nullptr) { diff --git 
a/arangod/Statistics/StatisticsWorker.cpp b/arangod/Statistics/StatisticsWorker.cpp index e5ba422351..9ab957e26e 100644 --- a/arangod/Statistics/StatisticsWorker.cpp +++ b/arangod/Statistics/StatisticsWorker.cpp @@ -37,6 +37,7 @@ #include "Logger/LogMacros.h" #include "Logger/Logger.h" #include "Logger/LoggerStream.h" +#include "RestServer/MetricsFeature.h" #include "RestServer/QueryRegistryFeature.h" #include "RestServer/TtlFeature.h" #include "Scheduler/Scheduler.h" @@ -799,8 +800,8 @@ void StatisticsWorker::avgPercentDistributon(VPackBuilder& builder, VPackSlice c builder.openObject(); builder.add("values", VPackValue(VPackValueType::Array)); - for (auto const& n : result) { - builder.add(VPackValue(n)); + for (auto const& i : result) { + builder.add(VPackValue(i)); } builder.close(); @@ -809,6 +810,257 @@ void StatisticsWorker::avgPercentDistributon(VPackBuilder& builder, VPackSlice c builder.close(); } +std::string const TYPE_("\n\n#TYPE "); +std::string const HELP_("\n#HELP "); + +// local_name: {"prometheus_name", "type", "help"} +std::map> statStrings{ + {"bytesReceived", + {"arangodb_client_connection_statistics_bytes_received_bucket ", "gauge", + "Bytes received for a request.\n"}}, + {"bytesReceivedCount", + {"arangodb_client_connection_statistics_bytes_received_count ", "gauge", + "Bytes received for a request.\n"}}, + {"bytesReceivedSum", + {"arangodb_client_connection_statistics_bytes_received_sum ", "gauge", + "Bytes received for a request.\n"}}, + {"bytesSent", + {"arangodb_client_connection_statistics_bytes_sent_bucket ", "gauge", + "Bytes sent for a request.\n"}}, + {"bytesSentCount", + {"arangodb_client_connection_statistics_bytes_sent_count ", "gauge", + "Bytes sent for a request.\n"}}, + {"bytesSentSum", + {"arangodb_client_connection_statistics_bytes_sent_sum ", "gauge", + "Bytes sent for a request.\n"}}, + {"minorPageFaults", + {"arangodb_process_statistics_minor_page_faults ", "gauge", + "The number of minor faults the process has made which 
have not required loading a memory page from disk. This figure is not reported on Windows.\n"}}, + {"majorPageFaults", + {"arangodb_process_statistics_major_page_faults ", "gauge", + "On Windows, this figure contains the total number of page faults. On other system, this figure contains the number of major faults the process has made which have required loading a memory page from disk.\n"}}, + {"bytesReceived", + {"arangodb_client_connection_statistics_bytes_received_bucket ", "gauge", + "Bytes received for a request"}}, + {"userTime", + {"arangodb_process_statistics_user_time ", "gauge", + "Amount of time that this process has been scheduled in user mode, measured in seconds.\n"}}, + {"systemTime", + {"arangodb_process_statistics_system_time ", "gauge", + "Amount of time that this process has been scheduled in kernel mode, measured in seconds.\n"}}, + {"numberOfThreads", + {"arangodb_process_statistics_number_of_threads ", "gauge", + "Number of threads in the arangod process.\n"}}, + {"residentSize", + {"arangodb_process_statistics_resident_set_size ", "gauge", "The total size of the number of pages the process has in real memory. This is just the pages which count toward text, data, or stack space. This does not include pages which have not been demand-loaded in, or which are swapped out. The resident set size is reported in bytes.\n"}}, + {"residentSizePercent", + {"arangodb_process_statistics_resident_set_size_percent ", "gauge", "The relative size of the number of pages the process has in real memory compared to system memory. This is just the pages which count toward text, data, or stack space. This does not include pages which have not been demand-loaded in, or which are swapped out. 
The value is a ratio between 0.00 and 1.00.\n"}}, + {"virtualSize", + {"arangodb_process_statistics_virtual_memory_size ", "gauge", "On Windows, this figure contains the total amount of memory that the memory manager has committed for the arangod process. On other systems, this figure contains The size of the virtual memory the process is using.\n"}}, + {"clientHttpConnections", + {"arangodb_client_connection_statistics_client_connections ", "gauge", + "The number of client connections that are currently open.\n"}}, + {"connectionTimeCounts", + {"arangodb_client_connection_statistics_connection_time_bucket", "gauge", + "Total connection time of a client.\n"}}, + {"connectionTimeCount", + {"arangodb_client_connection_statistics_connection_time_count ", "gauge", + "Total connection time of a client.\n"}}, + {"connectionTimeSum", + {"arangodb_client_connection_statistics_connection_time_sum ", "gauge", + "Total connection time of a client.\n"}} + /*{"", + {"", "", + ""}}*/ +}; + +void StatisticsWorker::generateRawStatistics(std::string& result, double const& now) { + ProcessInfo info = TRI_ProcessInfoSelf(); + uint64_t rss = static_cast(info._residentSize); + double rssp = 0; + + if (TRI_PhysicalMemory != 0) { + rssp = static_cast(rss) / static_cast(TRI_PhysicalMemory); + } + + StatisticsCounter httpConnections; + StatisticsCounter totalRequests; + std::array methodRequests; + StatisticsCounter asyncRequests; + StatisticsDistribution connectionTime; + + ConnectionStatistics::fill(httpConnections, totalRequests, methodRequests, + asyncRequests, connectionTime); + + StatisticsDistribution totalTime; + StatisticsDistribution requestTime; + StatisticsDistribution queueTime; + StatisticsDistribution ioTime; + StatisticsDistribution bytesSent; + StatisticsDistribution bytesReceived; + + RequestStatistics::fill(totalTime, requestTime, queueTime, ioTime, bytesSent, bytesReceived, stats::RequestStatisticsSource::ALL); + + // processStatistics() + result += + TYPE_ + 
statStrings.at("minorPageFaults")[0] + statStrings.at("minorPageFaults")[1] + + HELP_ + statStrings.at("minorPageFaults")[0] + statStrings.at("minorPageFaults")[2] + + statStrings.at("minorPageFaults")[0] + std::to_string(info._minorPageFaults); + result += + TYPE_ + statStrings.at("majorPageFaults")[0] + statStrings.at("majorPageFaults")[1] + + HELP_ + statStrings.at("majorPageFaults")[0] + statStrings.at("majorPageFaults")[2] + + statStrings.at("majorPageFaults")[0] + std::to_string(info._majorPageFaults); + + if (info._scClkTck != 0) { // prevent division by zero + result += + TYPE_ + statStrings.at("userTime")[0] + statStrings.at("userTime")[1] + + HELP_ + statStrings.at("userTime")[0] + statStrings.at("userTime")[2] + + statStrings.at("userTime")[0] + + std::to_string(static_cast(info._userTime) / static_cast(info._scClkTck)); + result += + TYPE_ + statStrings.at("systemTime")[0] + statStrings.at("systemTime")[1] + + HELP_ + statStrings.at("systemTime")[0] + statStrings.at("systemTime")[2] + + statStrings.at("systemTime")[0] + + std::to_string(static_cast(info._systemTime) / static_cast(info._scClkTck)); + } + + result += + TYPE_ + statStrings.at("numberOfThreads")[0] + statStrings.at("numberOfThreads")[1] + + HELP_ + statStrings.at("numberOfThreads")[0] + statStrings.at("numberOfThreads")[2] + + statStrings.at("numberOfThreads")[0] + std::to_string(info._numberThreads); + result += + TYPE_ + statStrings.at("residentSize")[0] + statStrings.at("residentSize")[1] + + HELP_ + statStrings.at("residentSize")[0] + statStrings.at("residentSize")[2] + + statStrings.at("residentSize")[0] + std::to_string(rss); + result += + TYPE_ + statStrings.at("residentSizePercent")[0] + statStrings.at("residentSizePercent")[1] + + HELP_ + statStrings.at("residentSizePercent")[0] + statStrings.at("residentSizePercent")[2] + + statStrings.at("residentSizePercent")[0] + std::to_string(rssp); + result += + TYPE_ + statStrings.at("virtualSize")[0] + statStrings.at("virtualSize")[1] + + 
HELP_ + statStrings.at("virtualSize")[0] + statStrings.at("virtualSize")[2] + + statStrings.at("virtualSize")[0] + std::to_string(info._virtualSize); + + + // _clientStatistics() + result += + TYPE_ + statStrings.at("clientHttpConnections")[0] + statStrings.at("clientHttpConnections")[1] + + HELP_ + statStrings.at("clientHttpConnections")[0] + statStrings.at("clientHttpConnections")[2] + + statStrings.at("clientHttpConnections")[0] + std::to_string(httpConnections._count); + + VPackBuilder tmp = fillDistribution(connectionTime); + VPackSlice slc = tmp.slice(); + + result += + TYPE_ + statStrings.at("connectionTimeCounts")[0] + " " + statStrings.at("connectionTimeCounts")[1] + + HELP_ + statStrings.at("connectionTimeCounts")[0] + " " + statStrings.at("connectionTimeCounts")[2] + + statStrings.at("connectionTimeCounts")[0] + "{le=\"0.1\"}" + " " + + std::to_string(slc.get("counts").at(0).getNumber()) + "\n" + + statStrings.at("connectionTimeCounts")[0] + "{le=\"1\"}" + " " + + std::to_string(slc.get("counts").at(1).getNumber()) + "\n" + + statStrings.at("connectionTimeCounts")[0] + "{le=\"60\"}" + " " + + std::to_string(slc.get("counts").at(2).getNumber()) + "\n" + + statStrings.at("connectionTimeCounts")[0] + "{le=\"+Inf\"}" + " " + + std::to_string(slc.get("counts").at(3).getNumber()); + + result += + TYPE_ + statStrings.at("connectionTimeCount")[0] + statStrings.at("connectionTimeCount")[1] + + HELP_ + statStrings.at("connectionTimeCount")[0] + statStrings.at("connectionTimeCount")[2] + + statStrings.at("connectionTimeCount")[0] + std::to_string(slc.get("count").template getNumber()); + + result += + TYPE_ + statStrings.at("connectionTimeSum")[0] + statStrings.at("connectionTimeSum")[1] + + HELP_ + statStrings.at("connectionTimeSum")[0] + statStrings.at("connectionTimeSum")[2] + + statStrings.at("connectionTimeSum")[0] + std::to_string(slc.get("sum").template getNumber()); + + result += "\n"; + + /* + + // _clientStatistics() + + tmp = fillDistribution(totalTime); + 
builder.add("totalTime", tmp.slice()); + + tmp = fillDistribution(requestTime); + builder.add("requestTime", tmp.slice()); + + tmp = fillDistribution(queueTime); + builder.add("queueTime", tmp.slice()); + + tmp = fillDistribution(ioTime); + builder.add("ioTime", tmp.slice()); + + tmp = fillDistribution(bytesSent); + builder.add("bytesSent", tmp.slice()); + + tmp = fillDistribution(bytesReceived); + builder.add("bytesReceived", tmp.slice()); + builder.close(); + + // _httpStatistics() + builder.add("http", VPackValue(VPackValueType::Object)); + builder.add("requestsTotal", VPackValue(totalRequests._count)); + builder.add("requestsAsync", VPackValue(asyncRequests._count)); + builder.add("requestsGet", + VPackValue(methodRequests[(int)rest::RequestType::GET]._count)); + builder.add("requestsHead", + VPackValue(methodRequests[(int)rest::RequestType::HEAD]._count)); + builder.add("requestsPost", + VPackValue(methodRequests[(int)rest::RequestType::POST]._count)); + builder.add("requestsPut", + VPackValue(methodRequests[(int)rest::RequestType::PUT]._count)); + builder.add("requestsPatch", + VPackValue(methodRequests[(int)rest::RequestType::PATCH]._count)); + builder.add("requestsDelete", + VPackValue(methodRequests[(int)rest::RequestType::DELETE_REQ]._count)); + builder.add("requestsOptions", + VPackValue(methodRequests[(int)rest::RequestType::OPTIONS]._count)); + builder.add("requestsOther", + VPackValue(methodRequests[(int)rest::RequestType::ILLEGAL]._count)); + builder.close(); + + // _serverStatistics() + builder.add("server", VPackValue(VPackValueType::Object)); + builder.add("physicalMemory", VPackValue(TRI_PhysicalMemory)); + builder.add("transactions", VPackValue(VPackValueType::Object)); + builder.close(); + + // export v8 statistics + builder.add("v8Context", VPackValue(VPackValueType::Object)); + V8DealerFeature::Statistics v8Counters{}; + // std::vector memoryStatistics; + // V8 may be turned off on a server + if (_server.hasFeature()) { + V8DealerFeature& 
dealer = _server.getFeature(); + if (dealer.isEnabled()) { + v8Counters = dealer.getCurrentContextNumbers(); + // see below: memoryStatistics = dealer.getCurrentMemoryNumbers(); + } + } + builder.add("available", VPackValue(v8Counters.available)); + builder.add("busy", VPackValue(v8Counters.busy)); + builder.add("dirty", VPackValue(v8Counters.dirty)); + builder.add("free", VPackValue(v8Counters.free)); + builder.add("max", VPackValue(v8Counters.max)); + builder.close(); + + // export threads statistics + builder.add("threads", VPackValue(VPackValueType::Object)); + SchedulerFeature::SCHEDULER->toVelocyPack(builder); + builder.close(); + + // export ttl statistics + TtlFeature& ttlFeature = _server.getFeature(); + builder.add(VPackValue("ttl")); + ttlFeature.statsToVelocyPack(builder); + + builder.close(); + + builder.close(); + */ +} + void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const& now) { ProcessInfo info = TRI_ProcessInfoSelf(); uint64_t rss = static_cast(info._residentSize); @@ -836,7 +1088,7 @@ void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const RequestStatistics::fill(totalTime, requestTime, queueTime, ioTime, bytesSent, bytesReceived, stats::RequestStatisticsSource::ALL); - ServerStatistics const& serverInfo = ServerStatistics::statistics(); + ServerStatistics const& serverInfo = MetricsFeature::metrics()->serverStatistics(); builder.openObject(); if (!_clusterId.empty()) { @@ -851,10 +1103,10 @@ void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const builder.add("majorPageFaults", VPackValue(info._majorPageFaults)); if (info._scClkTck != 0) { // prevent division by zero - builder.add("userTime", VPackValue(static_cast(info._userTime) / - static_cast(info._scClkTck))); - builder.add("systemTime", VPackValue(static_cast(info._systemTime) / - static_cast(info._scClkTck))); + builder.add("userTime", VPackValue( + static_cast(info._userTime) / static_cast(info._scClkTck))); + 
builder.add("systemTime", VPackValue( + static_cast(info._systemTime) / static_cast(info._scClkTck))); } builder.add("numberOfThreads", VPackValue(info._numberThreads)); builder.add("residentSize", VPackValue(rss)); @@ -915,10 +1167,10 @@ void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const builder.add("uptime", VPackValue(serverInfo._uptime)); builder.add("physicalMemory", VPackValue(TRI_PhysicalMemory)); builder.add("transactions", VPackValue(VPackValueType::Object)); - builder.add("started", VPackValue(serverInfo._transactionsStatistics._transactionsStarted)); - builder.add("aborted", VPackValue(serverInfo._transactionsStatistics._transactionsAborted)); - builder.add("committed", VPackValue(serverInfo._transactionsStatistics._transactionsCommitted)); - builder.add("intermediateCommits", VPackValue(serverInfo._transactionsStatistics._intermediateCommits)); + builder.add("started", VPackValue(serverInfo._transactionsStatistics._transactionsStarted.load())); + builder.add("aborted", VPackValue(serverInfo._transactionsStatistics._transactionsAborted.load())); + builder.add("committed", VPackValue(serverInfo._transactionsStatistics._transactionsCommitted.load())); + builder.add("intermediateCommits", VPackValue(serverInfo._transactionsStatistics._intermediateCommits.load())); builder.close(); // export v8 statistics diff --git a/arangod/Statistics/StatisticsWorker.h b/arangod/Statistics/StatisticsWorker.h index 1260d2a1c2..9b78578669 100644 --- a/arangod/Statistics/StatisticsWorker.h +++ b/arangod/Statistics/StatisticsWorker.h @@ -41,6 +41,7 @@ class StatisticsWorker final : public Thread { void run() override; void beginShutdown() override; + void generateRawStatistics(std::string& result, double const& now); private: // removes old statistics diff --git a/arangod/StorageEngine/StorageEngine.h b/arangod/StorageEngine/StorageEngine.h index 99bd543239..92093575b1 100644 --- a/arangod/StorageEngine/StorageEngine.h +++ 
b/arangod/StorageEngine/StorageEngine.h @@ -413,6 +413,8 @@ class StorageEngine : public application_features::ApplicationFeature { builder.close(); } + virtual void getStatistics(std::string& result) const {} + // management methods for synchronizing with external persistent stores virtual TRI_voc_tick_t currentTick() const = 0; virtual TRI_voc_tick_t releasedTick() const = 0; diff --git a/arangod/V8Server/v8-actions.cpp b/arangod/V8Server/v8-actions.cpp index da29242fdc..631b5750f9 100644 --- a/arangod/V8Server/v8-actions.cpp +++ b/arangod/V8Server/v8-actions.cpp @@ -445,6 +445,7 @@ v8::Handle TRI_RequestCppToV8(v8::Isolate* isolate, optionsWithUniquenessCheck.checkAttributeUniqueness = true; auto parsed = request->payload(&optionsWithUniquenessCheck); if (parsed.isObject() || parsed.isArray()) { + request->setDefaultContentType(); digesteable = true; } } catch ( ... ) {} diff --git a/arangod/V8Server/v8-statistics.cpp b/arangod/V8Server/v8-statistics.cpp index e87daa1d98..ed4c3dc916 100644 --- a/arangod/V8Server/v8-statistics.cpp +++ b/arangod/V8Server/v8-statistics.cpp @@ -28,6 +28,7 @@ #include "Basics/StringUtils.h" #include "Basics/process-utils.h" #include "Rest/GeneralRequest.h" +#include "RestServer/MetricsFeature.h" #include "Scheduler/Scheduler.h" #include "Scheduler/SchedulerFeature.h" #include "Statistics/ConnectionStatistics.h" @@ -98,7 +99,7 @@ static void JS_ServerStatistics(v8::FunctionCallbackInfo const& args) TRI_V8_TRY_CATCH_BEGIN(isolate) v8::HandleScope scope(isolate); - ServerStatistics const& info = ServerStatistics::statistics(); + ServerStatistics const& info = MetricsFeature::metrics()->serverStatistics(); v8::Handle result = v8::Object::New(isolate); @@ -111,13 +112,13 @@ static void JS_ServerStatistics(v8::FunctionCallbackInfo const& args) auto const& ts = info._transactionsStatistics; v8::Handle v8TransactionInfoObj = v8::Object::New(isolate); v8TransactionInfoObj->Set(TRI_V8_ASCII_STRING(isolate, "started"), - 
v8::Number::New(isolate, (double)ts._transactionsStarted)); + v8::Number::New(isolate, (double)ts._transactionsStarted.load())); v8TransactionInfoObj->Set(TRI_V8_ASCII_STRING(isolate, "aborted"), - v8::Number::New(isolate, (double)ts._transactionsAborted)); + v8::Number::New(isolate, (double)ts._transactionsAborted.load())); v8TransactionInfoObj->Set(TRI_V8_ASCII_STRING(isolate, "committed"), - v8::Number::New(isolate, (double)ts._transactionsCommitted)); + v8::Number::New(isolate, (double)ts._transactionsCommitted.load())); v8TransactionInfoObj->Set(TRI_V8_ASCII_STRING(isolate, "intermediateCommits"), - v8::Number::New(isolate, (double)ts._intermediateCommits)); + v8::Number::New(isolate, (double)ts._intermediateCommits.load())); result->Set(TRI_V8_ASCII_STRING(isolate, "transactions"), v8TransactionInfoObj); // v8 counters diff --git a/lib/ApplicationFeatures/GreetingsFeaturePhase.cpp b/lib/ApplicationFeatures/GreetingsFeaturePhase.cpp index 3015007496..e709c378f0 100644 --- a/lib/ApplicationFeatures/GreetingsFeaturePhase.cpp +++ b/lib/ApplicationFeatures/GreetingsFeaturePhase.cpp @@ -51,4 +51,4 @@ GreetingsFeaturePhase::GreetingsFeaturePhase(ApplicationServer& server, bool isC } } // namespace application_features -} // namespace arangodb \ No newline at end of file +} // namespace arangodb diff --git a/lib/Rest/GeneralRequest.h b/lib/Rest/GeneralRequest.h index e15ecec7f6..e99a2c50bb 100644 --- a/lib/Rest/GeneralRequest.h +++ b/lib/Rest/GeneralRequest.h @@ -202,6 +202,7 @@ class GeneralRequest { return std::make_shared(payload()); }; + virtual void setDefaultContentType() = 0; /// @brieg should reflect the Content-Type header ContentType contentType() const { return _contentType; } /// @brief should generally reflect the Accept header diff --git a/lib/Rest/HttpRequest.h b/lib/Rest/HttpRequest.h index d3d426b5cb..1c1cefcf7f 100644 --- a/lib/Rest/HttpRequest.h +++ b/lib/Rest/HttpRequest.h @@ -79,6 +79,9 @@ class HttpRequest final : public GeneralRequest { return 
_cookies; } + virtual void setDefaultContentType() override { + _contentType = rest::ContentType::JSON; + } /// @brief the body content length size_t contentLength() const override { return _contentLength; } // Payload diff --git a/lib/Rest/VstRequest.h b/lib/Rest/VstRequest.h index 8936c2d18e..29d5fc510a 100644 --- a/lib/Rest/VstRequest.h +++ b/lib/Rest/VstRequest.h @@ -60,6 +60,10 @@ class VstRequest final : public GeneralRequest { arangodb::velocypack::StringRef rawPayload() const override; velocypack::Slice payload(arangodb::velocypack::Options const*) override; + virtual void setDefaultContentType() override { + _contentType = rest::ContentType::VPACK; + } + arangodb::Endpoint::TransportType transportType() override { return arangodb::Endpoint::TransportType::VST; }; diff --git a/tests/Aql/AqlItemBlockTest.cpp b/tests/Aql/AqlItemBlockTest.cpp index 31bffce402..53c8af530b 100644 --- a/tests/Aql/AqlItemBlockTest.cpp +++ b/tests/Aql/AqlItemBlockTest.cpp @@ -351,6 +351,77 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_slices) { } } +TEST_F(AqlItemBlockTest, test_serialization_deserialization_with_ranges) { + SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 3, 2)}; + block->emplaceValue(0, 0, dummyData(4)); + block->emplaceValue(0, 1, dummyData(5)); + block->emplaceValue(1, 0, dummyData(0)); + block->emplaceValue(1, 1, dummyData(1)); + block->emplaceValue(2, 0, dummyData(2)); + block->emplaceValue(2, 1, dummyData(3)); + { + // test range 0->1 + VPackBuilder result; + result.openObject(); + block->toVelocyPack(0, 1, nullptr, result); + ASSERT_TRUE(result.isOpenObject()); + result.close(); + + SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice()); + + // Check exposed attributes + EXPECT_EQ(testee->size(), 1); + EXPECT_EQ(testee->getNrRegs(), block->getNrRegs()); + // check data + compareWithDummy(testee, 0, 0, 4); + compareWithDummy(testee, 0, 1, 5); + + assertShadowRowIndexes(testee, {}); + } + + { + // 
Test range 1->2 + VPackBuilder result; + result.openObject(); + block->toVelocyPack(1, 2, nullptr, result); + ASSERT_TRUE(result.isOpenObject()); + result.close(); + + SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice()); + + // Check exposed attributes + EXPECT_EQ(testee->size(), 1); + EXPECT_EQ(testee->getNrRegs(), block->getNrRegs()); + // check data + compareWithDummy(testee, 0, 0, 0); + compareWithDummy(testee, 0, 1, 1); + + assertShadowRowIndexes(testee, {}); + } + + { + // Test range 0->2 + VPackBuilder result; + result.openObject(); + block->toVelocyPack(0, 2, nullptr, result); + ASSERT_TRUE(result.isOpenObject()); + result.close(); + + SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice()); + + // Check exposed attributes + EXPECT_EQ(testee->size(), 2); + EXPECT_EQ(testee->getNrRegs(), block->getNrRegs()); + // check data + compareWithDummy(testee, 0, 0, 4); + compareWithDummy(testee, 0, 1, 5); + compareWithDummy(testee, 1, 0, 0); + compareWithDummy(testee, 1, 1, 1); + + assertShadowRowIndexes(testee, {}); + } +} + TEST_F(AqlItemBlockTest, test_serialization_deserialization_input_row) { SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 2, 2)}; block->emplaceValue(0, 0, dummyData(4)); diff --git a/tests/IResearch/RestHandlerMock.h b/tests/IResearch/RestHandlerMock.h index 77837c927d..8ad417b465 100644 --- a/tests/IResearch/RestHandlerMock.h +++ b/tests/IResearch/RestHandlerMock.h @@ -42,6 +42,9 @@ struct GeneralRequestMock: public arangodb::GeneralRequest { ~GeneralRequestMock(); using arangodb::GeneralRequest::addSuffix; virtual size_t contentLength() const override; + virtual void setDefaultContentType() override { + _contentType = arangodb::rest::ContentType::VPACK; + } virtual arangodb::velocypack::StringRef rawPayload() const override; virtual arangodb::velocypack::Slice payload(arangodb::velocypack::Options const* options = &arangodb::velocypack::Options::Defaults) override; 
virtual arangodb::Endpoint::TransportType transportType() override;