1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into bug-fix/cppcheck

* 'devel' of https://github.com/arangodb/arangodb:
  Enabled modification nodes in spliced subqueries again (#10499)
  Set internal state to DONE when upstream is DONE (#10513)
  Put the write into the IoContext. (#10511)
  fix the default case, if we guess that we have structured post data, actually parse it. (#10508)
  metricsfeature cleanup in unprepare (#10514)
  Updated CHANGELOG (#10509)
  Bug fix/input row serialization (#10502)
  feature/metrics (#10423)
  fix compilation on mac (#10505)
This commit is contained in:
Jan Christoph Uhde 2019-11-25 06:54:28 +01:00
commit 7695403388
46 changed files with 2516 additions and 326 deletions

View File

@ -140,7 +140,7 @@ bool parse_json_config(const irs::string_ref& args,
stream_bytes_type = itr->second;
}
min = std::max(min, size_t(1));
min = std::max(min, decltype(min)(1));
max = std::max(max, min);
options.min_gram = min;

View File

@ -1,6 +1,14 @@
devel
-----
* REMOTE and GATHER no longer make subqueries unsuitable for the
`splice-subqueries` optimization.
* New internal counter and histogram support.
* Add a Prometheus endpoint for metrics, expose new metrics, old statistics
and RocksDB metrics.
* Fixed known issue #509: ArangoSearch index consolidation does not work during creation of a link
on existing collection which may lead to massive file descriptors consumption.

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2014-2019 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
@ -72,7 +72,22 @@ Agent::Agent(ApplicationServer& server, config_t const& config)
_agentNeedsWakeup(false),
_compactor(this),
_ready(false),
_preparing(0) {
_preparing(0),
_write_ok(
_server.getFeature<arangodb::MetricsFeature>().counter(
"agency_agent_write_ok", 0, "Agency write ok")),
_write_no_leader(
_server.getFeature<arangodb::MetricsFeature>().counter(
"agency_agent_write_no_leader", 0, "Agency write no leader")),
_read_ok(
_server.getFeature<arangodb::MetricsFeature>().counter(
"agency_agent_read_ok", 0, "Agency write ok")),
_read_no_leader(
_server.getFeature<arangodb::MetricsFeature>().counter(
"agency_agent_read_no_leader", 0, "Agency write no leader")),
_write_hist_msec(
_server.getFeature<arangodb::MetricsFeature>().histogram(
"agency_agent_write_hist", 10, 0., 20., "Agency write histogram [ms]")) {
_state.configure(this);
_constituent.configure(this);
if (size() > 1) {
@ -1124,6 +1139,8 @@ write_ret_t Agent::inquire(query_t const& query) {
/// Write new entries to replicated state and store
write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
using namespace std::chrono;
std::vector<apply_ret_t> applied;
std::vector<index_t> indices;
auto multihost = size() > 1;
@ -1134,6 +1151,7 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
// look at the leaderID.
auto leader = _constituent.leaderID();
if (multihost && leader != id()) {
++_write_no_leader;
return write_ret_t(false, leader);
}
@ -1165,9 +1183,11 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
// Check that we are actually still the leader:
if (!leading()) {
++_write_no_leader;
return write_ret_t(false, NO_LEADER);
}
auto const start = high_resolution_clock::now();
// Apply to spearhead and get indices for log entries
// Avoid keeping lock indefinitely
for (size_t i = 0, l = 0; i < npacks; ++i) {
@ -1182,11 +1202,13 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
// Only leader else redirect
if (multihost && challengeLeadership()) {
resign();
++_write_no_leader;
return write_ret_t(false, NO_LEADER);
}
// Check that we are actually still the leader:
if (!leading()) {
++_write_no_leader;
return write_ret_t(false, NO_LEADER);
}
@ -1197,6 +1219,8 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
auto tmp = _state.logLeaderMulti(chunk, applied, currentTerm);
indices.insert(indices.end(), tmp.begin(), tmp.end());
}
_write_hist_msec.count(
duration<double,std::milli>(high_resolution_clock::now()-start).count());
}
// Maximum log index
@ -1212,6 +1236,7 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
advanceCommitIndex();
}
++_write_ok;
return write_ret_t(true, id(), applied, indices);
}
@ -1223,6 +1248,7 @@ read_ret_t Agent::read(query_t const& query) {
// look at the leaderID.
auto leader = _constituent.leaderID();
if (leader != id()) {
++_read_no_leader;
return read_ret_t(false, leader);
}
@ -1236,10 +1262,12 @@ read_ret_t Agent::read(query_t const& query) {
// Only leader else redirect
if (challengeLeadership()) {
resign();
++_read_no_leader;
return read_ret_t(false, NO_LEADER);
}
leader = _constituent.leaderID();
auto result = std::make_shared<arangodb::velocypack::Builder>();
READ_LOCKER(oLocker, _outputLock);
@ -1247,6 +1275,7 @@ read_ret_t Agent::read(query_t const& query) {
// Retrieve data from readDB
std::vector<bool> success = _readDB.read(query, result);
++_read_no_leader;
return read_ret_t(true, leader, std::move(success), std::move(result));
}
@ -1254,6 +1283,7 @@ read_ret_t Agent::read(query_t const& query) {
void Agent::run() {
// Only run in case we are in multi-host mode
while (!this->isStopping() && size() > 1) {
{
// We set the variable to false here, if any change happens during
// or after the calls in this loop, this will be set to true to

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2014-2019 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
@ -36,6 +36,7 @@
#include "Agency/Supervision.h"
#include "Basics/ConditionLocker.h"
#include "Basics/ReadWriteLock.h"
#include "RestServer/MetricsFeature.h"
struct TRI_vocbase_t;
@ -475,8 +476,16 @@ class Agent final : public arangodb::Thread, public AgentInterface {
// lock for _ongoingTrxs
arangodb::Mutex _trxsLock;
Counter& _write_ok;
Counter& _write_no_leader;
Counter& _read_ok;
Counter& _read_no_leader;
Histogram<double>& _write_hist_msec;
};
} // namespace consensus
} // namespace arangodb
#endif

View File

@ -453,7 +453,7 @@ SharedAqlItemBlockPtr AqlItemBlock::slice(size_t from, size_t to) const {
AqlValue const& a(_data[getAddress(row, col)]);
::CopyValueOver(cache, a, row - from, col, res);
}
copySubQueryDepthToOtherBlock(res, row, row - from);
res->copySubQueryDepthFromOtherBlock(row - from, *this, row);
}
return res;
@ -476,7 +476,7 @@ SharedAqlItemBlockPtr AqlItemBlock::slice(size_t row,
AqlValue const& a(_data[getAddress(row, col)]);
::CopyValueOver(cache, a, 0, col, res);
}
copySubQueryDepthToOtherBlock(res, row, 0);
res->copySubQueryDepthFromOtherBlock(0, *this, row);
return res;
}
@ -492,12 +492,14 @@ SharedAqlItemBlockPtr AqlItemBlock::slice(std::vector<size_t> const& chosen,
SharedAqlItemBlockPtr res{_manager.requestBlock(to - from, _nrRegs)};
for (size_t row = from; row < to; row++) {
size_t resultRowIdx = 0;
for (size_t chosenIdx = from; chosenIdx < to; ++chosenIdx, ++resultRowIdx) {
size_t const rowIdx = chosen[chosenIdx];
for (RegisterId col = 0; col < _nrRegs; col++) {
AqlValue const& a(_data[getAddress(chosen[row], col)]);
::CopyValueOver(cache, a, row - from, col, res);
AqlValue const& a = _data[getAddress(rowIdx, col)];
::CopyValueOver(cache, a, resultRowIdx, col, res);
}
copySubQueryDepthToOtherBlock(res, row, row - from);
res->copySubQueryDepthFromOtherBlock(resultRowIdx, *this, rowIdx);
}
return res;
@ -535,6 +537,13 @@ SharedAqlItemBlockPtr AqlItemBlock::steal(std::vector<size_t> const& chosen,
return res;
}
/// @brief toJson, transfer all rows of this AqlItemBlock to Json, the result
/// can be used to recreate the AqlItemBlock via the Json constructor
void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions,
VPackBuilder& result) const {
return toVelocyPack(0, size(), trxOptions, result);
}
/// @brief toJson, transfer a whole AqlItemBlock to Json, the result can
/// be used to recreate the AqlItemBlock via the Json constructor
/// Here is a description of the data format: The resulting Json has
@ -568,7 +577,15 @@ SharedAqlItemBlockPtr AqlItemBlock::steal(std::vector<size_t> const& chosen,
/// corresponding position
/// "raw": List of actual values, positions 0 and 1 are always null
/// such that actual indices start at 2
void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPackBuilder& result) const {
void AqlItemBlock::toVelocyPack(size_t from, size_t to,
velocypack::Options const* const trxOptions,
VPackBuilder& result) const {
// Can only have positive slice size
TRI_ASSERT(from < to);
// We cannot slice over the upper bound.
// The lower bound (0) is protected by unsigned number type
TRI_ASSERT(to <= _nrItems);
TRI_ASSERT(result.isOpenObject());
VPackOptions options(VPackOptions::Defaults);
options.buildUnindexedArrays = true;
@ -580,7 +597,7 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa
raw.add(VPackValue(VPackValueType::Null));
raw.add(VPackValue(VPackValueType::Null));
result.add("nrItems", VPackValue(_nrItems));
result.add("nrItems", VPackValue(to - from));
result.add("nrRegs", VPackValue(_nrRegs));
result.add(StaticStrings::Error, VPackValue(false));
@ -639,7 +656,7 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa
startRegister = 1;
}
for (RegisterId column = startRegister; column < internalNrRegs(); column++) {
for (size_t i = 0; i < _nrItems; i++) {
for (size_t i = from; i < to; i++) {
AqlValue const& a(_data[i * internalNrRegs() + column]);
// determine current state
@ -701,7 +718,8 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa
result.add("raw", raw.slice());
}
void AqlItemBlock::rowToSimpleVPack(size_t const row, velocypack::Options const* options, arangodb::velocypack::Builder& builder) const {
void AqlItemBlock::rowToSimpleVPack(size_t const row, velocypack::Options const* options,
arangodb::velocypack::Builder& builder) const {
VPackArrayBuilder rowBuilder{&builder};
if (isShadowRow(row)) {
@ -732,13 +750,14 @@ ResourceMonitor& AqlItemBlock::resourceMonitor() noexcept {
return *_manager.resourceMonitor();
}
void AqlItemBlock::copySubQueryDepthToOtherBlock(SharedAqlItemBlockPtr& target,
size_t sourceRow, size_t targetRow) const {
if (isShadowRow(sourceRow)) {
AqlValue const& d = getShadowRowDepth(sourceRow);
void AqlItemBlock::copySubQueryDepthFromOtherBlock(size_t const targetRow,
AqlItemBlock const& source,
size_t const sourceRow) {
if (source.isShadowRow(sourceRow)) {
AqlValue const& d = source.getShadowRowDepth(sourceRow);
// Value set, copy it over
TRI_ASSERT(!d.requiresDestruction());
target->setShadowRowDepth(targetRow, d);
setShadowRowDepth(targetRow, d);
}
}
@ -965,3 +984,27 @@ void AqlItemBlock::copySubqueryDepth(size_t currentRow, size_t fromRow) {
_data[currentAddress] = _data[fromAddress];
}
}
size_t AqlItemBlock::moveOtherBlockHere(size_t const targetRow, AqlItemBlock& source) {
TRI_ASSERT(targetRow + source.size() <= this->size());
TRI_ASSERT(getNrRegs() == source.getNrRegs());
auto const n = source.size();
auto const nrRegs = getNrRegs();
size_t thisRow = targetRow;
for (size_t sourceRow = 0; sourceRow < n; ++sourceRow, ++thisRow) {
for (RegisterId col = 0; col < nrRegs; ++col) {
// copy over value
AqlValue const& a = source.getValueReference(sourceRow, col);
if (!a.isEmpty()) {
setValue(thisRow, col, a);
}
}
copySubQueryDepthFromOtherBlock(thisRow, source, sourceRow);
}
source.eraseAll();
TRI_ASSERT(thisRow == targetRow + n);
return targetRow + n;
}

View File

@ -206,10 +206,18 @@ class AqlItemBlock {
/// to which our AqlValues point will vanish.
SharedAqlItemBlockPtr steal(std::vector<size_t> const& chosen, size_t from, size_t to);
/// @brief toJson, transfer a whole AqlItemBlock to Json, the result can
/// be used to recreate the AqlItemBlock via the Json constructor
/// @brief toJson, transfer all rows of this AqlItemBlock to Json, the result
/// can be used to recreate the AqlItemBlock via the Json constructor
void toVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const;
/// @brief toJson, transfer a slice of this AqlItemBlock to Json, the result can
/// be used to recreate the AqlItemBlock via the Json constructor
/// The slice will be starting at line `from` (including) and end at line `to` (excluding).
/// Only calls with 0 <= from < to <= this.size() are allowed.
/// If you want to transfer the full block, use from == 0, to == this.size()
void toVelocyPack(size_t from, size_t to, velocypack::Options const*,
arangodb::velocypack::Builder&) const;
/// @brief Creates a human-readable velocypack of the block. Adds an object
/// `{nrItems, nrRegs, matrix}` to the builder.
///
@ -250,6 +258,16 @@ class AqlItemBlock {
/// @brief Quick test if we have any ShadowRows within this block;
bool hasShadowRows() const noexcept;
/// @brief Moves all values *from* source *to* this block.
/// Returns the row index of the last written row plus one (may equal size()).
/// Expects size() - targetRow >= source->size(); and, of course, an equal
/// number of registers.
/// The source block will be cleared after this.
size_t moveOtherBlockHere(size_t targetRow, AqlItemBlock& source);
void copySubQueryDepthFromOtherBlock(size_t targetRow, AqlItemBlock const& source,
size_t sourceRow);
protected:
AqlItemBlockManager& aqlItemBlockManager() noexcept;
size_t getRefCount() const noexcept;
@ -267,9 +285,6 @@ class AqlItemBlock {
void copySubqueryDepth(size_t currentRow, size_t fromRow);
void copySubQueryDepthToOtherBlock(SharedAqlItemBlockPtr& target,
size_t sourceRow, size_t targetRow) const;
private:
/// @brief _data, the actual data as a single vector of dimensions _nrItems
/// times _nrRegs

View File

@ -51,23 +51,12 @@ SharedAqlItemBlockPtr itemBlock::concatenate(AqlItemBlockManager& manager,
TRI_ASSERT(totalSize > 0);
TRI_ASSERT(nrRegs > 0);
auto res = manager.requestBlock(totalSize, nrRegs);
auto resultBlock = manager.requestBlock(totalSize, nrRegs);
size_t pos = 0;
for (auto& it : blocks) {
size_t const n = it->size();
for (size_t row = 0; row < n; ++row) {
for (RegisterId col = 0; col < nrRegs; ++col) {
// copy over value
AqlValue const& a = it->getValueReference(row, col);
if (!a.isEmpty()) {
res->setValue(pos + row, col, a);
}
}
}
it->eraseAll();
pos += n;
size_t nextFreeRow = 0;
for (auto& inputBlock : blocks) {
nextFreeRow = resultBlock->moveOtherBlockHere(nextFreeRow, *inputBlock);
}
return res;
return resultBlock;
}

View File

@ -166,6 +166,7 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::g
}
if (newBlock == nullptr) {
TRI_ASSERT(state == ExecutionState::DONE);
_state = InternalState::DONE;
// _rowFetcher must be DONE now already
return {state, nullptr};
}

View File

@ -119,135 +119,17 @@ SharedAqlItemBlockPtr InputAqlItemRow::cloneToBlock(AqlItemBlockManager& manager
/// corresponding position
/// "raw": List of actual values, positions 0 and 1 are always null
/// such that actual indices start at 2
void InputAqlItemRow::toVelocyPack(velocypack::Options const* const trxOptions, VPackBuilder& result) const {
void InputAqlItemRow::toVelocyPack(velocypack::Options const* const trxOptions,
VPackBuilder& result) const {
TRI_ASSERT(isInitialized());
TRI_ASSERT(result.isOpenObject());
VPackOptions options(VPackOptions::Defaults);
options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true;
_block->toVelocyPack(_baseIndex, _baseIndex + 1, trxOptions, result);
}
VPackBuilder raw(&options);
raw.openArray();
// Two nulls in the beginning such that indices start with 2
raw.add(VPackValue(VPackValueType::Null));
raw.add(VPackValue(VPackValueType::Null));
result.add("nrItems", VPackValue(1));
result.add("nrRegs", VPackValue(getNrRegisters()));
result.add(StaticStrings::Error, VPackValue(false));
enum State {
Empty, // saw an empty value
Range, // saw a range value
Next, // saw a previously unknown value
Positional, // saw a value previously encountered
};
std::unordered_map<AqlValue, size_t> table; // remember duplicates
size_t lastTablePos = 0;
State lastState = Positional;
State currentState = Positional;
size_t runLength = 0;
size_t tablePos = 0;
result.add("data", VPackValue(VPackValueType::Array));
// write out data buffered for repeated "empty" or "next" values
auto writeBuffered = [](State lastState, size_t lastTablePos,
VPackBuilder& result, size_t runLength) {
if (lastState == Range) {
return;
}
if (lastState == Positional) {
if (lastTablePos >= 2) {
if (runLength == 1) {
result.add(VPackValue(lastTablePos));
} else {
result.add(VPackValue(-4));
result.add(VPackValue(runLength));
result.add(VPackValue(lastTablePos));
}
}
} else {
TRI_ASSERT(lastState == Empty || lastState == Next);
if (runLength == 1) {
// saw exactly one value
result.add(VPackValue(lastState == Empty ? 0 : 1));
} else {
// saw multiple values
result.add(VPackValue(lastState == Empty ? -1 : -3));
result.add(VPackValue(runLength));
}
}
};
size_t pos = 2; // write position in raw
// We use column == 0 to simulate a shadowRow
// this is only relevant if all participants can use shadow rows
RegisterId startRegister = 0;
if (block().getFormatType() == SerializationFormat::CLASSIC) {
// Skip over the shadowRows
startRegister = 1;
}
for (RegisterId column = startRegister; column < getNrRegisters() + 1; column++) {
AqlValue const& a = column == 0 ? AqlValue{} : getValue(column - 1);
// determine current state
if (a.isEmpty()) {
currentState = Empty;
} else if (a.isRange()) {
currentState = Range;
} else {
auto it = table.find(a);
if (it == table.end()) {
currentState = Next;
a.toVelocyPack(trxOptions, raw, false);
table.try_emplace(a, pos++);
} else {
currentState = Positional;
tablePos = it->second;
TRI_ASSERT(tablePos >= 2);
if (lastState != Positional) {
lastTablePos = tablePos;
}
}
}
// handle state change
if (currentState != lastState || (currentState == Positional && tablePos != lastTablePos)) {
// write out remaining buffered data in case of a state change
writeBuffered(lastState, lastTablePos, result, runLength);
lastTablePos = 0;
lastState = currentState;
runLength = 0;
}
switch (currentState) {
case Empty:
case Next:
case Positional:
++runLength;
lastTablePos = tablePos;
break;
case Range:
result.add(VPackValue(-2));
result.add(VPackValue(a.range()->_low));
result.add(VPackValue(a.range()->_high));
break;
}
}
// write out any remaining buffered data
writeBuffered(lastState, lastTablePos, result, runLength);
result.close(); // closes "data"
raw.close();
result.add("raw", raw.slice());
void InputAqlItemRow::toSimpleVelocyPack(velocypack::Options const* trxOpts,
arangodb::velocypack::Builder& result) const {
TRI_ASSERT(_block != nullptr);
_block->rowToSimpleVPack(_baseIndex, trxOpts, result);
}
InputAqlItemRow::InputAqlItemRow(SharedAqlItemBlockPtr const& block, size_t baseIndex)
@ -280,7 +162,9 @@ AqlValue InputAqlItemRow::stealValue(RegisterId registerId) {
return a;
}
RegisterCount InputAqlItemRow::getNrRegisters() const noexcept { return block().getNrRegs(); }
RegisterCount InputAqlItemRow::getNrRegisters() const noexcept {
return block().getNrRegs();
}
bool InputAqlItemRow::operator==(InputAqlItemRow const& other) const noexcept {
return this->_block == other._block && this->_baseIndex == other._baseIndex;

View File

@ -36,7 +36,7 @@ namespace arangodb {
namespace velocypack {
class Builder;
struct Options;
}
} // namespace velocypack
namespace aql {
class AqlItemBlock;
@ -133,6 +133,8 @@ class InputAqlItemRow {
/// Uses the same API as an AqlItemBlock with only a single row
void toVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const;
void toSimpleVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const;
private:
AqlItemBlock& block() noexcept;

View File

@ -7282,11 +7282,6 @@ void arangodb::aql::parallelizeGatherRule(Optimizer* opt, std::unique_ptr<Execut
namespace {
bool nodeMakesThisQueryLevelUnsuitableForSubquerySplicing(ExecutionNode const* const node) {
// TODO Enable modification nodes again, as soon as the corresponding branch
// is merged. Fix them in the switch statement below, too!
if (node->isModificationNode()) {
return true;
}
switch (node->getType()) {
case ExecutionNode::CALCULATION:
case ExecutionNode::SUBQUERY:
@ -7306,6 +7301,11 @@ bool nodeMakesThisQueryLevelUnsuitableForSubquerySplicing(ExecutionNode const* c
case ExecutionNode::GATHER:
case ExecutionNode::REMOTE:
case ExecutionNode::REMOTESINGLE:
case ExecutionNode::INSERT:
case ExecutionNode::REMOVE:
case ExecutionNode::REPLACE:
case ExecutionNode::UPDATE:
case ExecutionNode::UPSERT:
case ExecutionNode::MATERIALIZE:
case ExecutionNode::DISTRIBUTE_CONSUMER:
case ExecutionNode::SUBQUERY_START:
@ -7323,16 +7323,6 @@ bool nodeMakesThisQueryLevelUnsuitableForSubquerySplicing(ExecutionNode const* c
// Collect nodes skip iff using the COUNT method.
return collectNode->aggregationMethod() == CollectOptions::CollectMethod::COUNT;
}
// TODO Enable modification nodes again, as soon as the corresponding branch
// is merged.
case ExecutionNode::INSERT:
case ExecutionNode::REMOVE:
case ExecutionNode::REPLACE:
case ExecutionNode::UPDATE:
case ExecutionNode::UPSERT:
// These should already have been handled
TRI_ASSERT(false);
return true;
case ExecutionNode::MAX_NODE_TYPE_VALUE:
break;
}

View File

@ -83,7 +83,6 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
}
std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecutor>::getSomeWithoutTrace(size_t atMost) {
// silence tests -- we need to introduce new failure tests for fetchers
TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome1") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
@ -167,7 +166,6 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSome(s
}
std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWithoutTrace(size_t atMost) {
std::unique_lock<std::mutex> guard(_communicationMutex);
if (_requestInFlight) {
@ -297,6 +295,8 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
VPackBuilder builder(buffer, &options);
builder.openObject(/*unindexed*/ true);
// Required for 3.5.* and earlier, dropped in 3.6.0
builder.add("exhausted", VPackValue(false));
// Used in 3.4.0 onwards
builder.add("done", VPackValue(false));
builder.add(StaticStrings::Code, VPackValue(TRI_ERROR_NO_ERROR));
@ -306,7 +306,8 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
builder.add("pos", VPackValue(0));
builder.add(VPackValue("items"));
builder.openObject(/*unindexed*/ true);
input.toVelocyPack(_engine->getQuery()->trx()->transactionContextPtr()->getVPackOptions(), builder);
input.toVelocyPack(_engine->getQuery()->trx()->transactionContextPtr()->getVPackOptions(),
builder);
builder.close();
builder.close();
@ -324,7 +325,6 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
/// @brief shutdown, will be called exactly once for the whole query
std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(int errorCode) {
// this should make the whole thing idempotent
if (!_isResponsibleForInitializeCursor) {
// do nothing...
@ -362,7 +362,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
}
if (_lastError.fail()) {
// _didReceiveShutdownRequest = true;
// _didReceiveShutdownRequest = true;
TRI_ASSERT(_lastResponse == nullptr);
Result res = std::move(_lastError);
@ -455,7 +455,8 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
res = VelocyPackHelper::getNumericValue(slice, StaticStrings::ErrorNum, res);
VPackStringRef ref =
VelocyPackHelper::getStringRef(slice, StaticStrings::ErrorMessage,
VPackStringRef("(no valid error in response)"));
VPackStringRef(
"(no valid error in response)"));
msg.append(ref.data(), ref.size());
}
}
@ -468,7 +469,6 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb type,
std::string const& urlPart,
VPackBuffer<uint8_t>&& body) {
NetworkFeature const& nf =
_engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
@ -485,7 +485,7 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
TRI_ASSERT(!spec.endpoint.empty());
auto req = fuerte::createRequest(type, fuerte::ContentType::VPack);
req->header.database =_query.vocbase().name();
req->header.database = _query.vocbase().name();
req->header.path = urlPart + _queryId;
req->addVPack(std::move(body));
@ -499,12 +499,9 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
_requestInFlight = true;
auto ticket = generateRequestTicket();
conn->sendRequest(std::move(req),
[this, ticket, spec,
sqs = _query.sharedState()](fuerte::Error err,
std::unique_ptr<fuerte::Request> req,
conn->sendRequest(std::move(req), [this, ticket, spec, sqs = _query.sharedState()](
fuerte::Error err, std::unique_ptr<fuerte::Request> req,
std::unique_ptr<fuerte::Response> res) {
// `this` is only valid as long as sharedState is valid.
// So we must execute this under sharedState's mutex.
sqs->executeAndWakeup([&] {
@ -559,6 +556,10 @@ void ExecutionBlockImpl<RemoteExecutor>::traceRequest(char const* const rpc,
LOG_TOPIC("92c71", INFO, Logger::QUERIES)
<< "[query#" << queryId << "] remote request sent: " << rpc
<< (args.empty() ? "" : " ") << args << " registryId=" << remoteQueryId;
if (_profile >= PROFILE_LEVEL_TRACE_2) {
LOG_TOPIC("e0ae6", INFO, Logger::QUERIES)
<< "[query#" << queryId << "] data: " << slice.toJson();
}
}
}

View File

@ -460,7 +460,13 @@ set(LIB_ARANGO_REPLICATION_SOURCES
Replication/TailingSyncer.cpp
Replication/common-defines.cpp
Replication/utilities.cpp
)
)
set (LIB_ARANGO_METRICS_SOURCES
RestServer/Metrics.cpp
RestServer/MetricsFeature.cpp
RestHandler/RestMetricsHandler.cpp
)
set(LIB_ARANGO_AGENCY_SOURCES
Agency/AgentConfiguration.cpp
@ -763,6 +769,10 @@ add_library(arango_replication STATIC
${LIB_ARANGO_REPLICATION_SOURCES}
)
add_library(arango_metrics STATIC
${LIB_ARANGO_METRICS_SOURCES}
)
add_library(arango_agency STATIC
${LIB_ARANGO_AGENCY_SOURCES}
)
@ -820,6 +830,7 @@ target_include_directories(llhttp PUBLIC "${PROJECT_SOURCE_DIR}/3rdParty/llhttp/
target_link_libraries(arango_agency arango)
target_link_libraries(arango_agency arango_iresearch)
target_link_libraries(arango_agency arango_metrics)
target_link_libraries(arango_aql arango_geo)
target_link_libraries(arango_aql arango_graph)
@ -849,6 +860,8 @@ target_link_libraries(arango_indexes boost_boost)
target_link_libraries(arango_iresearch arango_indexes)
target_link_libraries(arango_iresearch arango_cluster_engine)
target_link_libraries(arango_metrics arango)
target_link_libraries(arango_mmfiles arango_geo)
target_link_libraries(arango_mmfiles arango_indexes)
target_link_libraries(arango_mmfiles arango_storage_engine_common)
@ -910,6 +923,7 @@ target_link_libraries(arangoserver arango_geo)
target_link_libraries(arangoserver arango_graph)
target_link_libraries(arangoserver arango_indexes)
target_link_libraries(arangoserver arango_iresearch)
target_link_libraries(arangoserver arango_metrics)
target_link_libraries(arangoserver arango_network)
target_link_libraries(arangoserver arango_pregel)
target_link_libraries(arangoserver arango_replication)
@ -990,6 +1004,7 @@ foreach(TARGET
arango_common_rest_handler
arango_graph
arango_indexes
arango_metrics
arango_mmfiles
arango_pregel
arango_replication

View File

@ -29,7 +29,7 @@
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "Statistics/ServerStatistics.h"
#include "RestServer/MetricsFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/TransactionCollection.h"
#include "Transaction/Manager.h"
@ -61,7 +61,7 @@ Result ClusterTransactionState::beginTransaction(transaction::Hints hints) {
auto cleanup = scopeGuard([&] {
if (nestingLevel() == 0) {
updateStatus(transaction::Status::ABORTED);
ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++;
}
// free what we have got so far
unuseCollections(nestingLevel());
@ -75,7 +75,7 @@ Result ClusterTransactionState::beginTransaction(transaction::Hints hints) {
// all valid
if (nestingLevel() == 0) {
updateStatus(transaction::Status::RUNNING);
ServerStatistics::statistics()._transactionsStatistics._transactionsStarted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsStarted++;
transaction::ManagerFeature::manager()->registerTransaction(id(), nullptr, isReadOnlyTransaction());
setRegistered();
@ -122,7 +122,7 @@ Result ClusterTransactionState::commitTransaction(transaction::Methods* activeTr
arangodb::Result res;
if (nestingLevel() == 0) {
updateStatus(transaction::Status::COMMITTED);
ServerStatistics::statistics()._transactionsStatistics._transactionsCommitted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsCommitted++;
}
unuseCollections(nestingLevel());
@ -136,7 +136,7 @@ Result ClusterTransactionState::abortTransaction(transaction::Methods* activeTrx
Result res;
if (nestingLevel() == 0) {
updateStatus(transaction::Status::ABORTED);
ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++;
}
unuseCollections(nestingLevel());

View File

@ -74,6 +74,7 @@
#include "RestHandler/RestImportHandler.h"
#include "RestHandler/RestIndexHandler.h"
#include "RestHandler/RestJobHandler.h"
#include "RestHandler/RestMetricsHandler.h"
#include "RestHandler/RestPleaseUpgradeHandler.h"
#include "RestHandler/RestPregelHandler.h"
#include "RestHandler/RestQueryCacheHandler.h"
@ -437,6 +438,7 @@ void GeneralServerFeature::defineHandlers() {
agency.agent());
}
if (cluster.isEnabled()) {
// add "/agency-callbacks" handler
_handlerFactory->addPrefixHandler(
@ -532,6 +534,9 @@ void GeneralServerFeature::defineHandlers() {
_handlerFactory->addHandler("/_admin/statistics",
RestHandlerCreator<arangodb::RestAdminStatisticsHandler>::createNoData);
_handlerFactory->addHandler("/_admin/metrics",
RestHandlerCreator<arangodb::RestMetricsHandler>::createNoData);
_handlerFactory->addHandler("/_admin/statistics-description",
RestHandlerCreator<arangodb::RestAdminStatisticsHandler>::createNoData);

View File

@ -669,7 +669,7 @@ void HttpCommTask<T>::sendResponse(std::unique_ptr<GeneralResponse> baseRes,
}
// TODO lease buffers
auto header = std::make_unique<VPackBuffer<uint8_t>>();
auto header = std::make_shared<VPackBuffer<uint8_t>>();
header->reserve(220);
header->append(TRI_CHAR_LENGTH_PAIR("HTTP/1.1 "));
@ -786,7 +786,7 @@ void HttpCommTask<T>::sendResponse(std::unique_ptr<GeneralResponse> baseRes,
header->append(std::to_string(len));
header->append("\r\n\r\n", 4);
std::unique_ptr<basics::StringBuffer> body = response.stealBody();
std::shared_ptr<basics::StringBuffer> body = response.stealBody();
// append write buffer and statistics
double const totalTime = RequestStatistics::ELAPSED_SINCE_READ_START(stat);
@ -796,17 +796,19 @@ void HttpCommTask<T>::sendResponse(std::unique_ptr<GeneralResponse> baseRes,
<< "\",\"" << GeneralRequest::translateMethod(::llhttpToRequestType(&_parser)) << "\",\""
<< static_cast<int>(response.responseCode()) << "\"," << Logger::FIXED(totalTime, 6);
RequestStatistics::SET_WRITE_START(stat);
this->_protocol->context.io_context.post([this, self = this->shared_from_this(), header = std::move(header), body = std::move(body), stat] () mutable {
std::array<asio_ns::const_buffer, 2> buffers;
buffers[0] = asio_ns::buffer(header->data(), header->size());
if (HTTP_HEAD != _parser.method) {
buffers[1] = asio_ns::buffer(body->data(), body->size());
TRI_ASSERT(len == body->size());
}
RequestStatistics::SET_WRITE_START(stat);
// FIXME measure performance w/o sync write
asio_ns::async_write(this->_protocol->socket, buffers,
[self = CommTask::shared_from_this(),
[self = std::move(self),
h = std::move(header),
b = std::move(body),
stat](asio_ns::error_code ec, size_t nwrite) {
@ -829,6 +831,7 @@ void HttpCommTask<T>::sendResponse(std::unique_ptr<GeneralResponse> baseRes,
stat->release();
}
});
});
}
template <SocketType T>

View File

@ -323,7 +323,7 @@ bool parse_ngram_vpack_config(const irs::string_ref& args, irs::analysis::ngram_
options.stream_bytes_type = itr->second;
}
min = std::max(min, size_t(1));
min = std::max(min, decltype(min)(1));
max = std::max(max, min);
options.min_gram = min;

View File

@ -33,6 +33,7 @@
#include "MMFiles/MMFilesLogfileManager.h"
#include "MMFiles/MMFilesPersistentIndexFeature.h"
#include "MMFiles/MMFilesTransactionCollection.h"
#include "RestServer/MetricsFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionCollection.h"
@ -126,14 +127,14 @@ Result MMFilesTransactionState::beginTransaction(transaction::Hints hints) {
// all valid
if (nestingLevel() == 0) {
updateStatus(transaction::Status::RUNNING);
ServerStatistics::statistics()._transactionsStatistics._transactionsStarted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsStarted++;
// defer writing of the begin marker until necessary!
}
} else {
// something is wrong
if (nestingLevel() == 0) {
updateStatus(transaction::Status::ABORTED);
ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++;
}
// free what we have got so far
@ -174,7 +175,7 @@ Result MMFilesTransactionState::commitTransaction(transaction::Methods* activeTr
}
updateStatus(transaction::Status::COMMITTED);
ServerStatistics::statistics()._transactionsStatistics._transactionsCommitted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsCommitted++;
// if a write query, clear the query cache for the participating collections
if (AccessMode::isWriteOrExclusive(_type) && !_collections.empty() &&
@ -203,7 +204,7 @@ Result MMFilesTransactionState::abortTransaction(transaction::Methods* activeTrx
result.reset(res);
updateStatus(transaction::Status::ABORTED);
ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++;
if (_hasOperations) {
// must clean up the query cache because the transaction

View File

@ -0,0 +1,68 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "RestMetricsHandler.h"
#include "Agency/AgencyComm.h"
#include "Agency/AgencyFeature.h"
#include "Agency/Agent.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Cluster/ServerState.h"
#include "GeneralServer/ServerSecurityFeature.h"
#include "Rest/Version.h"
#include "RestServer/ServerFeature.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::rest;
////////////////////////////////////////////////////////////////////////////////
/// @brief ArangoDB server
////////////////////////////////////////////////////////////////////////////////
RestMetricsHandler::RestMetricsHandler(
application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response)
: RestBaseHandler(server, request, response) {}
RestStatus RestMetricsHandler::execute() {
auto& server = application_features::ApplicationServer::server();
ServerSecurityFeature& security = server.getFeature<ServerSecurityFeature>();
if (!security.canAccessHardenedApi()) {
// dont leak information about server internals here
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN);
return RestStatus::DONE;
}
MetricsFeature& metrics = server.getFeature<MetricsFeature>();
std::string result;
metrics.toPrometheus(result);
_response->setResponseCode(rest::ResponseCode::OK);
_response->setContentType(rest::ContentType::TEXT);
_response->addRawPayload(VPackStringRef(result));
return RestStatus::DONE;
}

View File

@ -0,0 +1,45 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_REST_HANDLER_REST_METRICS_HANDLER_H
#define ARANGOD_REST_HANDLER_REST_METRICS_HANDLER_H 1
#include "RestHandler/RestBaseHandler.h"
#include "RestServer/MetricsFeature.h"
namespace arangodb {
class RestMetricsHandler : public arangodb::RestBaseHandler {
public:
RestMetricsHandler(application_features::ApplicationServer&, GeneralRequest*,
GeneralResponse*);
char const* name() const override final { return "RestMetricsHandler"; }
RequestLane lane() const override final { return RequestLane::CLIENT_SLOW; }
RestStatus execute() override;
};
} // namespace arangodb
#endif

View File

@ -0,0 +1,98 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "Metrics.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Basics/debugging.h"
#include <type_traits>
using namespace arangodb;
std::ostream& operator<< (std::ostream& o, Metrics::counter_type const& s) {
o << s.load();
return o;
}
std::ostream& operator<< (std::ostream& o, Counter const& s) {
o << s.load();
return o;
}
std::ostream& operator<< (std::ostream& o, Metrics::hist_type const& v) {
o << "[";
for (size_t i = 0; i < v.size(); ++i) {
if (i > 0) { o << ", "; }
o << v.load(i);
}
o << "]";
return o;
}
Metric::Metric(std::string const& name, std::string const& help)
: _name(name), _help(help) {};
Metric::~Metric() {}
std::string const& Metric::help() const { return _help; }
std::string const& Metric::name() const { return _name; }
Counter& Counter::operator++() {
count();
return *this;
}
Counter& Counter::operator++(int n) {
count(1);
return *this;
}
void Counter::count() {
++_b;
}
void Counter::count(uint64_t n) {
_b += n;
}
std::ostream& Counter::print(std::ostream& o) const {
o << _c;
return o;
}
uint64_t Counter::load() const {
_b.push();
return _c.load();
}
void Counter::toPrometheus(std::string& result) const {
_b.push();
result += "#TYPE " + name() + " counter\n";
result += "#HELP " + name() + " " + help() + "\n";
result += name() + " " + std::to_string(load()) + "\n";
}
Counter::Counter(uint64_t const& val, std::string const& name, std::string const& help) :
Metric(name, help), _c(val), _b(_c) {}
Counter::~Counter() { _b.push(); }

View File

@ -0,0 +1,234 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_REST_SERVER_METRICS_H
#define ARANGODB_REST_SERVER_METRICS_H 1
#include <atomic>
#include <map>
#include <iostream>
#include <string>
#include <memory>
#include <variant>
#include <vector>
#include <unordered_map>
#include <cassert>
#include <cmath>
#include <limits>
#include "Basics/VelocyPackHelper.h"
#include "Logger/LogMacros.h"
#include "counter.h"
class Counter;
template<typename T> class Histogram;
class Metric {
public:
Metric(std::string const& name, std::string const& help);
virtual ~Metric();
std::string const& help() const;
std::string const& name() const;
virtual void toPrometheus(std::string& result) const = 0;
void header(std::string& result) const;
protected:
std::string const _name;
std::string const _help;
};
struct Metrics {
enum Type {COUNTER, HISTOGRAM};
using counter_type = gcl::counter::simplex<uint64_t, gcl::counter::atomicity::full>;
using hist_type = gcl::counter::simplex_array<uint64_t, gcl::counter::atomicity::full>;
using buffer_type = gcl::counter::buffer<uint64_t>;
};
/**
* @brief Counter functionality
*/
class Counter : public Metric {
public:
Counter(uint64_t const& val, std::string const& name, std::string const& help);
Counter(Counter const&) = delete;
~Counter();
std::ostream& print (std::ostream&) const;
Counter& operator++();
Counter& operator++(int);
void count();
void count(uint64_t);
uint64_t load() const;
void push();
virtual void toPrometheus(std::string&) const override;
private:
mutable Metrics::counter_type _c;
mutable Metrics::buffer_type _b;
};
template<typename T> class Gauge : public Metric {
public:
Gauge() = delete;
Gauge(uint64_t const& val, std::string const& name, std::string const& help)
: Metric(name, help) {
_g.store(val);
}
Gauge(Gauge const&) = delete;
virtual ~Gauge();
std::ostream& print (std::ostream&) const;
Gauge<T>& operator+=(T const& t) {
_g.store(_g + t);
return *this;
}
Gauge<T>& operator-=(T const& t) {
_g.store(_g - t);
return *this;
}
Gauge<T>& operator*=(T const& t) {
_g.store(_g * t);
return *this;
}
Gauge<T>& operator/=(T const& t) {
_g.store(_g / t);
return *this;
}
Gauge<T>& operator=(T const& t) {
_g.store(t);
return *this;
}
T load() const {
return _g.load();
};
virtual void toPrometheus(std::string&) const override {};
private:
std::atomic<T> _g;
};
std::ostream& operator<< (std::ostream&, Metrics::hist_type const&);
/**
* @brief Histogram functionality
*/
template<typename T> class Histogram : public Metric {
public:
Histogram() = delete;
Histogram (size_t const& buckets, T const& low, T const& high, std::string const& name, std::string const& help = "")
: Metric(name, help), _c(Metrics::hist_type(buckets)), _low(low), _high(high),
_lowr(std::numeric_limits<T>::max()), _highr(std::numeric_limits<T>::min()) {
TRI_ASSERT(_c.size() > 0);
_n = _c.size() - 1;
_div = std::floor((double)(high - low) / (double)_c.size());
TRI_ASSERT(_div != 0);
}
~Histogram() {}
void records(T const& t) {
if(t < _lowr) {
_lowr = t;
} else if (t > _highr) {
_highr = t;
}
}
void count(T const& t) {
if (t < _low) {
++_c[0];
} else if (t >= _high) {
++_c[_n];
} else {
++_c[static_cast<size_t>(std::floor(t / _div))];
}
records(t);
}
void count(T const& t, uint64_t n) {
if (t < _low) {
_c[0] += n;
} else if (t >= _high) {
_c[_n] += n;
} else {
_c[static_cast<size_t>(std::floor(t / _div))] += n;
}
records(t);
}
T const& low() const { return _low; }
T const& high() const { return _high; }
Metrics::hist_type::value_type& operator[](size_t n) {
return _c[n];
}
std::vector<uint64_t> load() const {
std::vector<uint64_t> v(size());
for (size_t i = 0; i < size(); ++i) {
v[i] = load(i);
}
return v;
}
uint64_t load(size_t i) const { return _c.load(i); };
size_t size() const { return _c.size(); }
virtual void toPrometheus(std::string& result) const override {
result += "#TYPE " + name() + " histogram\n";
result += "#HELP " + name() + " " + help() + "\n";
T le = _low;
T sum = T(0);
for (size_t i = 0; i < size(); ++i) {
uint64_t n = load(i);
sum += n;
result += name() + "_bucket{le=\"" + std::to_string(le) + "\"} " +
std::to_string(n) + "\n";
le += _div;
}
result += name() + "_count " + std::to_string(sum) + "\n";
}
std::ostream& print(std::ostream& o) const {
o << "_div: " << _div << ", _c: " << _c << ", _r: [" << _lowr << ", " << _highr << "] " << name();
return o;
}
Metrics::hist_type _c;
T _low, _high, _div, _lowr, _highr;
size_t _n;
};
std::ostream& operator<< (std::ostream&, Metrics::counter_type const&);
template<typename T>
std::ostream& operator<<(std::ostream& o, Histogram<T> const& h) {
return h.print(o);
}
#endif

View File

@ -0,0 +1,149 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "MetricsFeature.h"
#include "ApplicationFeatures/GreetingsFeaturePhase.h"
#include "Basics/application-exit.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "RestServer/Metrics.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "MMFiles/MMFilesEngine.h"
#include "Statistics/StatisticsFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/StorageEngineFeature.h"
#include <chrono>
#include <thread>
using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::basics;
using namespace arangodb::options;
// -----------------------------------------------------------------------------
// --SECTION-- global variables
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- MetricsFeature
// -----------------------------------------------------------------------------
MetricsFeature* MetricsFeature::METRICS = nullptr;
#include <iostream>
MetricsFeature::MetricsFeature(application_features::ApplicationServer& server)
: ApplicationFeature(server, "Metrics"),
_enabled(true) {
METRICS = this;
_serverStatistics = new
ServerStatistics(std::chrono::duration<double>(
std::chrono::system_clock::now().time_since_epoch()).count());
setOptional(false);
startsAfter<LoggerFeature>();
startsBefore<GreetingsFeaturePhase>();
}
void MetricsFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {}
void MetricsFeature::validateOptions(std::shared_ptr<ProgramOptions>) {}
void MetricsFeature::unprepare() {
METRICS = nullptr;
}
void MetricsFeature::prepare() {}
double time() {
return std::chrono::duration<double>( // time since epoch in seconds
std::chrono::system_clock::now().time_since_epoch())
.count();
}
void MetricsFeature::toPrometheus(std::string& result) const {
{
std::lock_guard<std::mutex> guard(_lock);
for (auto const& i : _registry) {
i.second->toPrometheus(result);
}
}
// StatisticsFeature
auto& sf = server().getFeature<StatisticsFeature>();
if (sf.enabled()) {
sf.toPrometheus(result, std::chrono::duration<double,std::milli>(
std::chrono::system_clock::now().time_since_epoch()).count());
}
// RocksDBEngine
auto es = EngineSelectorFeature::ENGINE;
if (es != nullptr) {
std::string const& engineName = es->typeName();
if (engineName == RocksDBEngine::EngineName) {
es->getStatistics(result);
}
}
}
Counter& MetricsFeature::counter (
std::string const& name, uint64_t const& val, std::string const& help) {
std::lock_guard<std::mutex> guard(_lock);
auto const it = _registry.find(name);
if (it != _registry.end()) {
LOG_TOPIC("8523d", ERR, Logger::STATISTICS) << "Failed to retrieve histogram " << name;
TRI_ASSERT(false);
}
auto c = std::make_shared<Counter>(val, name, help);
_registry.emplace(name, std::dynamic_pointer_cast<Metric>(c));
return *c;
};
Counter& MetricsFeature::counter (std::string const& name) {
std::lock_guard<std::mutex> guard(_lock);
auto it = _registry.find(name);
if (it == _registry.end()) {
LOG_TOPIC("32d58", ERR, Logger::STATISTICS)
<< "Failed to retrieve counter " << name;
TRI_ASSERT(false);
throw std::exception();
}
std::shared_ptr<Counter> h;
try {
h = std::dynamic_pointer_cast<Counter>(it->second);
} catch (std::exception const& e) {
LOG_TOPIC("853d2", ERR, Logger::STATISTICS)
<< "Failed to retrieve counter " << name;
TRI_ASSERT(false);
throw(e);
}
return *h;
};
ServerStatistics& MetricsFeature::serverStatistics() {
return *_serverStatistics;
}

View File

@ -0,0 +1,154 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_REST_SERVER_METRICS_FEATURE_H
#define ARANGODB_REST_SERVER_METRICS_FEATURE_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Logger/LoggerFeature.h"
#include "Logger/LogMacros.h"
#include "ProgramOptions/ProgramOptions.h"
#include "RestServer/Metrics.h"
#include "Statistics/ServerStatistics.h"
namespace arangodb {
class MetricsFeature final : public application_features::ApplicationFeature {
public:
bool enabled() const {
return _enabled;
}
static double time();
explicit MetricsFeature(application_features::ApplicationServer& server);
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void unprepare() override final;
template<typename T> Histogram<T>&
histogram (std::string const& name, size_t const& buckets, T const& low,
T const& high, std::string const& help = std::string()) {
std::lock_guard<std::mutex> guard(_lock);
auto const it = _registry.find(name);
if (it != _registry.end()) {
LOG_TOPIC("32e85", ERR, Logger::STATISTICS) << "histogram " << name << " alredy exists";
TRI_ASSERT(false);
}
auto h = std::make_shared<Histogram<T>>(buckets, low, high, name, help);
_registry.emplace(name, std::dynamic_pointer_cast<Metric>(h));
return *h;
};
template<typename T> Histogram<T>& histogram (std::string const& name) {
std::lock_guard<std::mutex> guard(_lock);
auto const it = _registry.find(name);
if (it == _registry.end()) {
LOG_TOPIC("32d85", ERR, Logger::STATISTICS) << "No histogram booked as " << name;
TRI_ASSERT(false);
throw std::exception();
}
std::shared_ptr<Histogram<T>> h = nullptr;
try {
h = std::dynamic_pointer_cast<Histogram<T>>(*it->second);
if (h == nullptr) {
LOG_TOPIC("d2358", ERR, Logger::STATISTICS) << "Failed to retrieve histogram " << name;
}
} catch (std::exception const& e) {
LOG_TOPIC("32d75", ERR, Logger::STATISTICS)
<< "Failed to retrieve histogram " << name << ": " << e.what();
}
if (h == nullptr) {
TRI_ASSERT(false);
}
return *h;
};
Counter& counter(std::string const& name, uint64_t const& val, std::string const& help);
Counter& counter(std::string const& name);
template<typename T>
Gauge<T>& gauge(std::string const& name, T const& t, std::string const& help) {
std::lock_guard<std::mutex> guard(_lock);
auto it = _registry.find(name);
if (it != _registry.end()) {
LOG_TOPIC("c7b37", ERR, Logger::STATISTICS) << "gauge " << name << " alredy exists";
TRI_ASSERT(false);
throw(std::exception());
}
auto g = std::make_shared<Gauge>(t, name, help);
_registry.emplace(name, std::dynamic_pointer_cast<Metric>(g));
return *g;
}
template<typename T> Gauge<T>& gauge(std::string const& name) {
std::lock_guard<std::mutex> guard(_lock);
auto it = _registry.find(name);
if (it == _registry.end()) {
LOG_TOPIC("5832d", ERR, Logger::STATISTICS) << "No metric booked as " << name;
TRI_ASSERT(false);
throw std::exception();
}
std::shared_ptr<Gauge<T>> g = nullptr;
try {
g = std::dynamic_pointer_cast<Gauge<T>>(it->second);
if (g == nullptr) {
LOG_TOPIC("d4368", ERR, Logger::STATISTICS) << "Failed to retrieve gauge metric " << name;
TRI_ASSERT(false);
throw std::exception();
}
} catch (std::exception const& e) {
LOG_TOPIC("c2348", ERR, Logger::STATISTICS)
<< "Failed to retrieve gauge metric " << name << ": " << e.what();
TRI_ASSERT(false);
throw(e);
}
}
void toPrometheus(std::string& result) const;
static MetricsFeature* metrics() {
return METRICS;
}
ServerStatistics& serverStatistics();
private:
static MetricsFeature* METRICS;
bool _enabled;
std::unordered_map<std::string, std::shared_ptr<Metric>> _registry;
mutable std::mutex _lock;
ServerStatistics* _serverStatistics;
};
} // namespace arangodb
#endif

View File

@ -89,6 +89,7 @@
#include "RestServer/InitDatabaseFeature.h"
#include "RestServer/LanguageCheckFeature.h"
#include "RestServer/LockfileFeature.h"
#include "RestServer/MetricsFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/ScriptFeature.h"
#include "RestServer/ServerFeature.h"
@ -155,6 +156,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) {
std::type_index(typeid(GreetingsFeature)),
std::type_index(typeid(HttpEndpointProvider)),
std::type_index(typeid(LoggerBufferFeature)),
std::type_index(typeid(MetricsFeature)),
std::type_index(typeid(pregel::PregelFeature)),
std::type_index(typeid(ServerFeature)),
std::type_index(typeid(SslServerFeature)),
@ -228,6 +230,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) {
std::vector<std::type_index>{std::type_index(typeid(ScriptFeature))});
server.addFeature<SslFeature>();
server.addFeature<StatisticsFeature>();
server.addFeature<MetricsFeature>();
server.addFeature<StorageEngineFeature>();
server.addFeature<SystemDatabaseFeature>();
server.addFeature<TempFeature>(name);

View File

@ -0,0 +1,949 @@
// Copyright 2009 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "dynarray.h"
#include <unordered_set>
#include <atomic>
#include <mutex>
namespace gcl {
/*
INTRODUCTION
This file implements highly concurrent distributed counters.
The intent is to minimize the cost of incrementing the counter,
accepting increased costs to obtain the count.
That is, these counters are appropriate to code with
very frequent counter increments but relatively rare counter reads.
These counters are parameterized by the base integer type
that maintains the count.
Avoid situations that overflow the integer,
as that may have undefined behavior.
This constraint implies that counters must be sized to their use.
GENERAL METHODS
The general counter methods are as follows.
<constructor>( integer ):
The parameter is the initial counter value.
<constructor>():
Equivalent to an initial value of zero.
void operator +=( integer ):
void operator -=( integer ):
Add/subtract a value to/front the counter.
There is no default value.
void operator ++(): // prefix
void operator ++(int): // postfix
void operator --(): // prefix
void operator --(int): // postfix
Increment or decrement the counter.
integer load():
Returns the value of the counter.
integer exchange( integer ):
Replaces the existing count by the count in the parameter
and returns the previous count,
which enables safe concurrent count extraction.
There are no copy or assignment operations.
SIMPLEX COUNTERS
The simplex counters provide low-latency counting.
They implement all the general operations.
counter::simplex<int> red_count;
void count_red( Bag bag ) {
for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ )
if ( is_red( *i ) )
++red_count;
}
Note that this code may have significant cost
because of the repeated global atomic increments.
Using a temporary int to maintain the count within the loop
runs the risk of loosing some counts in the event of an exception.
COUNTER BUFFERS
The cost of incrementing the counter is reduced
by using a buffer as a proxy for the counter.
The counter is a reference parameter to the buffer.
This buffer is typically accessed by a single thread,
and so its cost can be substantially lower.
counter::simplex<int> red_count;
void count_red( Bag bag ) {
counter::buffer<int> local_red( red_count );
for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ )
if ( is_red( *i ) )
++local_red;
}
The buffer will automatically transfer its count
to the main counter on destruction.
If this latency is too great,
use the push method to transfer the count early.
void count_red( Bag bag1, Bag bag2 ) {
counter::buffer<int> local_red( red_count );
for ( Bag::iterator i = bag1.begin(); i != bag1.end(); i++ )
if ( is_red( *i ) )
++local_red;
local_red.push();
for ( Bag::iterator i = bag2.begin(); i != bag2.end(); i++ )
if ( is_red( *i ) )
++local_red;
}
Any increments on buffers since the last push
will not be reflected in the value reported by a load of the counter.
The destructor does an implicit push.
The lifetime of the counter must be strictly larger than
the lifetimes of any buffers attached to it.
DUPLEX COUNTERS
The push model of buffers sometimes yields an unacceptable lag
in the observed value of the count.
To avoid this lag,
there are duplex counters.
A duplex counter is paired with a broker,
the counter can query the broker for counts,
thus maintaining a somewhat current view of the count.
counter::strong_duplex<int> red_count;
void count_red( Bag bag )
counter::strong_broker broker( red_count );
for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ )
if ( is_red( *i ) )
++broker;
}
Another thread may call red_count.load() and get the current count.
That operation will poll each broker for its count and return the sum.
Naturally, any increments done to a broker after it is polled will be missed,
but no counts will be lost.
The primary use case for duplex counters
is to enable fast thread-local increments
while still maintaining a decent global count.
weak_duplex<int> red_count;
thread_local weak_broker<int> thread_red( red_count );
void count_red( Bag bag )
for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ )
if ( is_red( *i ) )
++thread_red;
}
WEAK DUPLEX COUNTERS
The exchange operation works
by atomically transfering broker counts to the main counter,
and then exchanging out of the main counter.
Consequently,
every count will be extracted by one and only one exchange operation.
However, that exchange operation can be expensive because
it and the broker increment operations
require write atomicity to the same broker object.
To reduce that concurrency cost,
the weak_duplex counter and its weak_broker
do not provide the exchange operation.
This difference
means that the polling is a read-only operation
and requires less synchronization.
Use this counter when you do not intend to exchange values.
BUFFERING BROKERS
Thread-local increments, while cheaper than shared global increments,
are still more expensive than function-local increments.
To mitigate that cost, buffers work with brokers as well.
weak_duplex<int> red_count;
thread_local weak_broker<int> thread_red( red_count );
void count_red( Bag bag )
buffer<int> local_red( thread_red );
for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ )
if ( is_red( *i ) )
++local_red;
}
As with buffers in general,
the count transfers only on destruction a push operation.
Consequently, red_count.load() will not reflect any counts in buffers.
Those counts will not be lost, though.
COUNTER ARRAYS
Counter arrays provide a means to handle many counters with one name.
The size of the counter arrays is fixed at construction time.
Counter arrays have the same structure as single-value counters,
with the following exceptions.
Increment a counter by indexing its array
and then applying one of the operations.
E.g. ++ctr_ary[i].
The load and exchange operations take an additional index parameter.
Do we want to initialize a counter array with an initializer list?
Do we want to return a dynarray for the load operation?
Do we want to pass and return a dynarray for the exchange operation?
ATOMICITY
In the course of program evolution, debugging and tuning,
a counter may desire implementation with weaker concurrency requirements.
That is accomplished by explicitly specifing the atomicity.
For example, suppose it is discovered that a counter
counter::simplex<int> red_count;
is only ever read and written from a single thread.
We can avoid the cost of atomic operations by making the counter serial.
counter::simplex<int, counter::atomicity::none> red_count;
This approach preserves the programming interface.
The counters support three levels of atomicity.
These are specified as a (usually defaulted) template parameter.
counter::atomicity::none // only one thread may access the counter
counter::atomicity::semi // multiple readers, but only one writer
counter::atomicity::full // multiple readers and writers
Buffers have two template parameters for atomicity,
one for the atomicity of the counter it is modifying,
and one for atomicity of the buffer itself.
By exlicitly specifying this atomicity,
one can build unusual configurations of buffers for unusual situations.
For example, suppose increments of red_count
tend to cluster in tasks with high processor affinity.
By separating those counts with a global intermediate buffer,
counting can exploit processor locality
and avoid substantial inter-processor communication.
For example,
counter::simplex<int> red_count;
counter::buffer<int, counter::atomicity::full, counter::atomicity::full>
red_squares( red_count );
counter::buffer<int, counter::atomicity::full, counter::atomicity::full>
red_circles( red_count );
void count_red_squares( Bag bag ) {
counter::buffer<int> local_red( red_squares );
for ( Bag::iterator i = bag.begin(); i != bag.end(); i++ )
if ( is_red( *i ) )
++local_red;
}
The red_squares variable is global,
and will automatically transfer its count to red_count
only on global variable destruction
This transfer is likely to be too late for many purposes.
One solution is explicit programming to call red_squares.push()
at appropriate times.
Another solution is to use duplex counters.
GUIDELINES FOR USE
Use a simplex counter
when you have a low rate of updates or a high read rate
or little tolerance for latency.
Use a strong duplex counter and broker
when your update rate is significantly higher than the load rate,
you can tolerate latency in counting,
and you need the exchange operation.
Use a weak duplex counter and broker
when your update rate is significantly higher than the load rate,
you can tolerate latency in counting,
but you do not need the exchange operation.
Use buffers to collect short-term bursts of counts.
The operations of the counters, brokers, and buffers
have the following rough costs.
strong weak
simplex duplex duplex
update atomic rmw atomic rmw atomic rmw
load atomic read mutex + n * mutex + n *
atomic read atomic read
exchange atomic rmw mutex + n * n/a
atomic rmw
construction trivial std::set std::set
destruction trivial std::set std::set
== == ==
strong weak
buffer broker broker
update serial atomic rmw atomic
read & write read & write
construction pointer mutex + mutex +
assign set.insert set.insert
destruction pointer mutex + mutex +
assign set.remove set.remove
IMPLEMENTATION
The lowest implementation layer level of the organization is a bumper.
Bumpers provide the interface of simplex counter,
but only the increment and decrement interface is public.
The rest are protected.
Buffer constructors require a reference to a bumper.
Simplex counters, buffers, duplex counters, and buffers
are all derived from a bumper,
which enables buffers to connect to all of them.
*/
/* Implementation .... */
/*
The operation defined out-of-line
break cyclic dependences in the class definitions.
They are expected to be rarely called, and efficiency is less of a concern.
*/
namespace counter {
enum class atomicity
{
none, // permits access to only one thread
semi, // allows multiple readers, but only one writer
full // allows multiple readers and writers
};
/*
The bumper classes provide the minimal increment and decrement interface.
They serve as base classes for the public types.
*/
template< typename Integral, atomicity Atomicity >
class bumper;
template< typename Integral >
class bumper< Integral, atomicity::none >
{
bumper( const bumper& ) = delete;
bumper& operator=( const bumper& ) = delete;
public:
void operator +=( Integral by ) { value_ += by; }
void operator -=( Integral by ) { value_ -= by; }
void operator ++() { *this += 1; }
void operator ++(int) { *this += 1; }
void operator --() { *this -= 1; }
void operator --(int) { *this -= 1; }
protected:
constexpr bumper( Integral in ) : value_( in ) {}
constexpr bumper() : value_( 0 ) {}
Integral load() const { return value_; }
Integral exchange( Integral to )
{ Integral tmp = value_; value_ = to; return tmp; }
Integral value_;
template< typename, atomicity >
friend class bumper_array;
template< typename, atomicity, atomicity >
friend class buffer_array;
friend struct std::dynarray< bumper >;
};
template< typename Integral >
class bumper< Integral, atomicity::semi >
{
bumper( const bumper& ) = delete;
bumper& operator=( const bumper& ) = delete;
public:
void operator +=( Integral by )
{ value_.store( value_.load( std::memory_order_relaxed ) + by,
std::memory_order_relaxed ); }
void operator -=( Integral by )
{ value_.store( value_.load( std::memory_order_relaxed ) - by,
std::memory_order_relaxed ); }
void operator ++() { *this += 1; }
void operator ++(int) { *this += 1; }
void operator --() { *this -= 1; }
void operator --(int) { *this -= 1; }
protected:
constexpr bumper( Integral in ) : value_( in ) {}
constexpr bumper() : value_( 0 ) {}
Integral load() const { return value_.load( std::memory_order_relaxed ); }
Integral exchange( Integral to )
{ Integral tmp = value_.load( std::memory_order_relaxed );
value_.store( to, std::memory_order_relaxed ); return tmp; }
std::atomic< Integral > value_;
template< typename, atomicity >
friend class bumper_array;
template< typename, atomicity, atomicity >
friend class buffer_array;
friend struct std::dynarray< bumper >;
};
template< typename Integral >
class bumper< Integral, atomicity::full >
{
bumper( const bumper& ) = delete;
bumper& operator=( const bumper& ) = delete;
public:
void operator +=( Integral by )
{ value_.fetch_add( by, std::memory_order_relaxed ); }
void operator -=( Integral by )
{ value_.fetch_sub( by, std::memory_order_relaxed ); }
void operator ++() { *this += 1; }
void operator ++(int) { *this += 1; }
void operator --() { *this -= 1; }
void operator --(int) { *this -= 1; }
protected:
constexpr bumper( Integral in ) : value_( in ) {}
constexpr bumper() : value_( 0 ) {}
Integral load() const { return value_.load( std::memory_order_relaxed ); }
Integral exchange( Integral to )
{ return value_.exchange( to, std::memory_order_relaxed ); }
std::atomic< Integral > value_;
template< typename, atomicity >
friend class bumper_array;
template< typename, atomicity, atomicity >
friend class buffer_array;
friend struct std::dynarray< bumper >;
};
/*
The simplex counters.
*/
template< typename Integral,
atomicity Atomicity = atomicity::full >
class simplex
: public bumper< Integral, Atomicity >
{
typedef bumper< Integral, Atomicity > base_type;
public:
constexpr simplex() : base_type( 0 ) {}
constexpr simplex( Integral in ) : base_type( in ) {}
simplex( const simplex& ) = delete;
simplex& operator=( const simplex& ) = delete;
Integral load() const { return base_type::load(); }
Integral exchange( Integral to ) { return base_type::exchange( to ); }
};
/*
Buffers reduce contention on counters.
They require template parameters to specify the atomicity
of the prime counter and of the buffer itself.
The lifetime of the prime must cover the lifetime of the buffer.
Any increments in the buffer since the last buffer::push call
are not reflected in the queries on the prime.
*/
template< typename Integral,
atomicity PrimeAtomicity = atomicity::full,
atomicity BufferAtomicity = atomicity::none >
class buffer
: public bumper< Integral, BufferAtomicity >
{
typedef bumper< Integral, PrimeAtomicity > prime_type;
typedef bumper< Integral, BufferAtomicity > base_type;
public:
buffer() = delete;
buffer( prime_type& p ) : base_type( 0 ), prime_( p ) {}
buffer( const buffer& ) = delete;
buffer& operator=( const buffer& ) = delete;
void push()
{ Integral value = base_type::exchange( 0 );
if ( value != 0 ) prime_ += value; }
~buffer() { push(); }
private:
prime_type& prime_;
};
/*
Duplex counters enable a "pull" model of counting.
Each counter, the prime, may have one or more brokers.
The lifetime of the prime must cover the lifetime of the brokers.
The duplex counter queries may fail to detect any counting done concurrently.
The query operations are expected to be rare relative to the counting.
The weak duplex counter does not support the exchange operation,
which means that you cannot extract counts early.
*/
template< typename Integral > class strong_broker;
template< typename Integral > class strong_duplex
: public bumper< Integral, atomicity::full >
{
typedef bumper< Integral, atomicity::full > base_type;
typedef strong_broker< Integral > broker_type;
friend class strong_broker< Integral >;
public:
strong_duplex() : base_type( 0 ) {}
strong_duplex( Integral in ) : base_type( in ) {}
Integral load() const;
Integral exchange( Integral to );
~strong_duplex();
private:
void insert( broker_type* child );
void erase( broker_type* child, Integral by );
std::mutex serializer_;
typedef std::unordered_set< broker_type* > set_type;
set_type children_;
};
template< typename Integral > class strong_broker
: public bumper< Integral, atomicity::full >
{
typedef bumper< Integral, atomicity::full > base_type;
typedef strong_duplex< Integral > duplex_type;
friend class strong_duplex< Integral >;
public:
strong_broker( duplex_type& p );
strong_broker() = delete;
strong_broker( const strong_broker& ) = delete;
strong_broker& operator=( const strong_broker& ) = delete;
~strong_broker();
private:
Integral poll() const { return base_type::load(); }
Integral drain() { return base_type::exchange( 0 ); }
duplex_type& prime_;
};
template< typename Integral >
void strong_duplex< Integral >::insert( broker_type* child )
{
std::lock_guard< std::mutex > _( serializer_ );
assert( children_.insert( child ).second );
}
template< typename Integral >
void strong_duplex< Integral >::erase( broker_type* child, Integral by )
{
this->operator +=( by );
std::lock_guard< std::mutex > _( serializer_ );
assert( children_.erase( child ) == 1 );
}
template< typename Integral >
Integral strong_duplex< Integral >::load() const
{
typedef typename set_type::iterator iterator;
Integral tmp = 0;
{
std::lock_guard< std::mutex > _( serializer_ );
iterator rollcall = children_.begin();
for ( ; rollcall != children_.end(); rollcall++ )
tmp += (*rollcall)->poll();
}
return tmp + base_type::load();
}
template< typename Integral >
Integral strong_duplex< Integral >::exchange( Integral to )
{
typedef typename set_type::iterator iterator;
Integral tmp = 0;
{
std::lock_guard< std::mutex > _( serializer_ );
iterator rollcall = children_.begin();
for ( ; rollcall != children_.end(); rollcall++ )
tmp += (*rollcall)->drain();
}
return tmp + base_type::exchange( to );
}
template< typename Integral >
strong_duplex< Integral >::~strong_duplex()
{
std::lock_guard< std::mutex > _( serializer_ );
assert( children_.size() == 0 );
}
template< typename Integral >
strong_broker< Integral >::strong_broker( duplex_type& p )
:
base_type( 0 ),
prime_( p )
{
prime_.insert( this );
}
template< typename Integral >
strong_broker< Integral >::~strong_broker()
{
prime_.erase( this, base_type::load() );
}
template< typename Integral > class weak_broker;
template< typename Integral > class weak_duplex
: public bumper< Integral, atomicity::full >
{
typedef bumper< Integral, atomicity::full > base_type;
typedef weak_broker< Integral > broker_type;
friend class weak_broker< Integral >;
public:
weak_duplex() : base_type( 0 ) {}
weak_duplex( Integral in ) : base_type( in ) {}
weak_duplex( const weak_duplex& ) = delete;
weak_duplex& operator=( const weak_duplex& ) = delete;
Integral load() const;
~weak_duplex();
private:
void insert( broker_type* child );
void erase( broker_type* child, Integral by );
std::mutex serializer_;
typedef std::unordered_set< broker_type* > set_type;
set_type children_;
};
template< typename Integral > class weak_broker
: public bumper< Integral, atomicity::semi >
{
typedef bumper< Integral, atomicity::semi > base_type;
typedef weak_duplex< Integral > duplex_type;
friend class weak_duplex< Integral >;
public:
weak_broker( duplex_type& p );
weak_broker() = delete;
weak_broker( const weak_broker& ) = delete;
weak_broker& operator=( const weak_broker& ) = delete;
~weak_broker();
private:
Integral poll() const { return base_type::load(); }
duplex_type& prime_;
};
template< typename Integral >
void weak_duplex< Integral >::insert( broker_type* child )
{
std::lock_guard< std::mutex > _( serializer_ );
assert( children_.insert( child ).second );
}
template< typename Integral >
void weak_duplex< Integral >::erase( broker_type* child, Integral by )
{
std::lock_guard< std::mutex > _( serializer_ );
this->operator +=( by );
assert( children_.erase( child ) == 1 );
}
template< typename Integral >
Integral weak_duplex< Integral >::load() const
{
typedef typename set_type::iterator iterator;
Integral tmp = 0;
{
std::lock_guard< std::mutex > _( serializer_ );
iterator rollcall = children_.begin();
for ( ; rollcall != children_.end(); rollcall++ )
tmp += (*rollcall)->poll();
tmp += base_type::load();
}
return tmp;
}
template< typename Integral >
weak_duplex< Integral >::~weak_duplex()
{
std::lock_guard< std::mutex > _( serializer_ );
assert( children_.size() == 0 );
}
template< typename Integral >
weak_broker< Integral >::weak_broker( duplex_type& p )
:
base_type( 0 ),
prime_( p )
{
prime_.insert( this );
}
template< typename Integral >
weak_broker< Integral >::~weak_broker()
{
prime_.erase( this, base_type::load() );
}
// Counter arrays.
template< typename Integral,
atomicity Atomicity = atomicity::full >
class bumper_array
{
public:
typedef bumper< Integral, Atomicity > value_type;
private:
typedef std::dynarray< value_type > storage_type;
public:
typedef typename storage_type::size_type size_type;
bumper_array() = delete;
bumper_array( size_type size ) : storage( size ) {}
bumper_array( const bumper_array& ) = delete;
bumper_array& operator=( const bumper_array& ) = delete;
value_type& operator[]( size_type idx ) { return storage[ idx ]; }
size_type size() const { return storage.size(); }
protected:
Integral load( size_type idx ) const { return storage[ idx ].load(); }
Integral exchange( size_type idx, Integral value )
{ return storage[ idx ].exchange( value ); }
private:
storage_type storage;
};
template< typename Integral,
atomicity Atomicity = atomicity::full >
class simplex_array
: public bumper_array< Integral, Atomicity >
{
typedef bumper_array< Integral, Atomicity > base_type;
public:
typedef typename base_type::value_type value_type;
typedef typename base_type::size_type size_type;
simplex_array() = delete;
constexpr simplex_array( size_type size ) : base_type( size ) {}
simplex_array( const simplex_array& ) = delete;
simplex_array& operator=( const simplex_array& ) = delete;
Integral load( size_type idx ) const{ return base_type::load( idx ); }
Integral exchange( size_type idx, Integral value )
{ return base_type::exchange( idx, value ); }
value_type& operator[]( size_type idx ) { return base_type::operator[]( idx ); }
size_type size() const { return base_type::size(); }
};
template< typename Integral,
atomicity PrimeAtomicity = atomicity::full,
atomicity BufferAtomicity = atomicity::full >
class buffer_array
: public bumper_array< Integral, BufferAtomicity >
{
typedef bumper_array< Integral, BufferAtomicity > base_type;
typedef bumper_array< Integral, PrimeAtomicity > prime_type;
public:
typedef typename base_type::value_type value_type;
typedef typename base_type::size_type size_type;
buffer_array() = delete;
buffer_array( prime_type& p ) : base_type( p.size() ), prime_( p ) {}
buffer_array( const buffer_array& ) = delete;
buffer_array& operator=( const buffer_array& ) = delete;
void push( size_type idx )
{ Integral value = base_type::operator[]( idx ).exchange( 0 );
if ( value != 0 ) prime_[ idx ] += value; }
void push();
size_type size() const { return base_type::size(); }
~buffer_array() { push(); }
private:
prime_type& prime_;
};
template< typename Integral,
atomicity BufferAtomicity, atomicity PrimeAtomicity >
void
buffer_array< Integral, BufferAtomicity, PrimeAtomicity >::push()
{
int size = base_type::size();
for ( int i = 0; i < size; ++i )
push( i );
}
// Duplex arrays
template< typename Integral > class strong_broker_array;
template< typename Integral > class strong_duplex_array
: public bumper_array< Integral, atomicity::full >
{
typedef bumper_array< Integral, atomicity::full > base_type;
typedef strong_broker_array< Integral > broker_type;
friend class strong_broker_array< Integral >;
public:
typedef typename base_type::value_type value_type;
typedef typename base_type::size_type size_type;
strong_duplex_array() = delete;
constexpr strong_duplex_array( size_type size )
: base_type( size ) {}
strong_duplex_array( const strong_duplex_array& ) = delete;
strong_duplex_array& operator=( const strong_duplex_array& ) = delete;
value_type& operator[]( size_type idx ) { return base_type::operator[]( idx ); }
size_type size() const { return base_type::size(); }
~strong_duplex_array();
private:
void insert( broker_type* child );
void erase( broker_type* child, Integral by );
std::mutex serializer_;
typedef std::unordered_set< broker_type* > set_type;
set_type children_;
};
template< typename Integral > class strong_broker_array
: public bumper_array< Integral, atomicity::semi >
{
typedef bumper_array< Integral, atomicity::semi > base_type;
typedef strong_duplex_array< Integral > duplex_type;
friend class strong_duplex_array< Integral >;
public:
typedef typename base_type::value_type value_type;
typedef typename base_type::size_type size_type;
strong_broker_array() = delete;
strong_broker_array( duplex_type& p );
strong_broker_array( const strong_broker_array& ) = delete;
strong_broker_array& operator=( const strong_broker_array& ) = delete;
size_type size() const { return base_type::size(); }
~strong_broker_array();
private:
Integral poll( size_type idx )
{ return base_type::operator[]( idx ).load(); }
Integral drain( size_type idx )
{ return base_type::operator[]( idx ).exchange( 0 ); }
duplex_type& prime_;
};
template< typename Integral >
strong_duplex_array< Integral >::~strong_duplex_array()
{
std::lock_guard< std::mutex > _( serializer_ );
assert( children_.size() == 0 );
}
template< typename Integral >
strong_broker_array< Integral >::strong_broker_array( duplex_type& p )
:
base_type( p.size() ),
prime_( p )
{
prime_.insert( this );
}
template< typename Integral >
strong_broker_array< Integral >::~strong_broker_array()
{
prime_.erase( this, base_type::load() );
}
template< typename Integral > class weak_broker_array;
template< typename Integral > class weak_duplex_array
: public bumper_array< Integral, atomicity::full >
{
typedef bumper_array< Integral, atomicity::full > base_type;
typedef weak_broker_array< Integral > broker_type;
friend class weak_broker_array< Integral >;
public:
typedef typename base_type::value_type value_type;
typedef typename base_type::size_type size_type;
weak_duplex_array() = delete;
constexpr weak_duplex_array( size_type size )
: base_type( size ) {}
weak_duplex_array( const weak_duplex_array& ) = delete;
weak_duplex_array& operator=( const weak_duplex_array& ) = delete;
value_type& operator[]( size_type idx ) { return base_type::operator[]( idx ); }
size_type size() const { return base_type::size(); }
~weak_duplex_array();
private:
void insert( broker_type* child );
void erase( broker_type* child, Integral by );
std::mutex serializer_;
typedef std::unordered_set< broker_type* > set_type;
set_type children_;
};
template< typename Integral > class weak_broker_array
: public bumper_array< Integral, atomicity::semi >
{
typedef bumper_array< Integral, atomicity::semi > base_type;
typedef weak_duplex_array< Integral > duplex_type;
friend class weak_duplex_array< Integral >;
public:
typedef typename base_type::value_type value_type;
typedef typename base_type::size_type size_type;
weak_broker_array() = delete;
weak_broker_array( duplex_type& p );
weak_broker_array( const weak_broker_array& ) = delete;
weak_broker_array& operator=( const weak_broker_array& ) = delete;
size_type size() const { return base_type::size(); }
~weak_broker_array();
private:
Integral poll( size_type idx )
{ return base_type::operator[]( idx ).load(); }
duplex_type& prime_;
};
template< typename Integral >
weak_duplex_array< Integral >::~weak_duplex_array()
{
std::lock_guard< std::mutex > _( serializer_ );
assert( children_.size() == 0 );
}
template< typename Integral >
weak_broker_array< Integral >::weak_broker_array( duplex_type& p )
:
base_type( p.size() ),
prime_( p )
{
prime_.insert( this );
}
template< typename Integral >
weak_broker_array< Integral >::~weak_broker_array()
{
prime_.erase( this, base_type::load() );
}
} // namespace counter
} // namespace gcl

View File

@ -0,0 +1,116 @@
// Copyright 2009 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iterator>
#include <stdexcept>
#include <limits>
namespace std {
struct bad_array_length_ { };
template< class T >
struct dynarray
{
// types:
typedef T value_type;
typedef T& reference;
typedef const T& const_reference;
typedef T* iterator;
typedef const T* const_iterator;
typedef std::reverse_iterator<iterator> reverse_iterator;
typedef std::reverse_iterator<const_iterator> const_reverse_iterator;
typedef size_t size_type;
typedef ptrdiff_t difference_type;
// fields:
private:
T* store;
size_type count;
// helper functions:
void check(size_type n)
{ if ( n >= count ) throw out_of_range("dynarray"); }
T* alloc(size_type n)
{ if ( n > std::numeric_limits<size_type>::max()/sizeof(T) )
throw std::bad_array_length_();
return reinterpret_cast<T*>( new char[ n*sizeof(T) ] ); }
public:
// construct and destruct:
dynarray() = delete;
const dynarray operator=(const dynarray&) = delete;
explicit dynarray(size_type c)
: store( alloc( c ) ), count( c )
{ size_type i;
try {
for ( size_type i = 0; i < count; ++i )
new (store+i) T;
} catch ( ... ) {
for ( ; i > 0; --i )
(store+(i-1))->~T();
throw;
} }
dynarray(const dynarray& d)
: store( alloc( d.count ) ), count( d.count )
{ try { uninitialized_copy( d.begin(), d.end(), begin() ); }
catch ( ... ) { delete store; throw; } }
~dynarray()
{ for ( size_type i = 0; i < count; ++i )
(store+i)->~T();
delete[] store; }
// iterators:
iterator begin() { return store; }
const_iterator begin() const { return store; }
const_iterator cbegin() const { return store; }
iterator end() { return store + count; }
const_iterator end() const { return store + count; }
const_iterator cend() const { return store + count; }
reverse_iterator rbegin()
{ return reverse_iterator(end()); }
const_reverse_iterator rbegin() const
{ return reverse_iterator(end()); }
reverse_iterator rend()
{ return reverse_iterator(begin()); }
const_reverse_iterator rend() const
{ return reverse_iterator(begin()); }
// capacity:
size_type size() const { return count; }
size_type max_size() const { return count; }
bool empty() const { return count == 0; }
// element access:
reference operator[](size_type n) { return store[n]; }
const_reference operator[](size_type n) const { return store[n]; }
reference front() { return store[0]; }
const_reference front() const { return store[0]; }
reference back() { return store[count-1]; }
const_reference back() const { return store[count-1]; }
const_reference at(size_type n) const { check(n); return store[n]; }
reference at(size_type n) { check(n); return store[n]; }
// data access:
T* data() { return store; }
const T* data() const { return store; }
};
} // namespace std

View File

@ -2134,6 +2134,26 @@ std::unique_ptr<TRI_vocbase_t> RocksDBEngine::openExistingDatabase(
}
}
void RocksDBEngine::getStatistics(std::string& result) const {
VPackBuilder stats;
getStatistics(stats);
VPackSlice sslice = stats.slice();
TRI_ASSERT(sslice.isObject());
for (auto const& a : VPackObjectIterator(sslice)) {
if (a.value.isNumber()) {
std::string name = a.key.copyString();
std::replace(name.begin(), name.end(), '.', '_');
std::replace(name.begin(), name.end(), '-', '_');
if (name.front() != 'r') {
name = EngineName + "_" + name ;
}
result += "#TYPE " + name +
" counter\n" + "#HELP " + name + " " + name + "\n" +
name + " " + std::to_string(a.value.getNumber<uint64_t>()) + "\n";
}
}
}
void RocksDBEngine::getStatistics(VPackBuilder& builder) const {
// add int properties
auto addInt = [&](std::string const& s) {

View File

@ -160,6 +160,7 @@ class RocksDBEngine final : public StorageEngine {
LogicalCollection& collection, velocypack::Slice const& info) override;
void getStatistics(velocypack::Builder& builder) const override;
void getStatistics(std::string& result) const override;
// inventory functionality
// -----------------------

View File

@ -32,6 +32,7 @@
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "RestServer/MetricsFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBEngine.h"
@ -106,7 +107,7 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
// register with manager
transaction::ManagerFeature::manager()->registerTransaction(id(), nullptr, isReadOnlyTransaction());
updateStatus(transaction::Status::RUNNING);
ServerStatistics::statistics()._transactionsStatistics._transactionsStarted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsStarted++;
setRegistered();
@ -397,7 +398,7 @@ Result RocksDBTransactionState::commitTransaction(transaction::Methods* activeTr
if (res.ok()) {
updateStatus(transaction::Status::COMMITTED);
cleanupTransaction(); // deletes trx
ServerStatistics::statistics()._transactionsStatistics._transactionsCommitted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsCommitted++;
} else {
abortTransaction(activeTrx); // deletes trx
}
@ -429,7 +430,7 @@ Result RocksDBTransactionState::abortTransaction(transaction::Methods* activeTrx
TRI_ASSERT(!_rocksTransaction && !_cacheTx && !_readSnapshot);
}
ServerStatistics::statistics()._transactionsStatistics._transactionsAborted++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._transactionsAborted++;
unuseCollections(nestingLevel());
return result;
}
@ -601,7 +602,7 @@ Result RocksDBTransactionState::triggerIntermediateCommit(bool& hasPerformedInte
}
hasPerformedIntermediateCommit = true;
ServerStatistics::statistics()._transactionsStatistics._intermediateCommits++;
MetricsFeature::metrics()->serverStatistics()._transactionsStatistics._intermediateCommits++;
TRI_IF_FAILURE("FailAfterIntermediateCommit") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);

View File

@ -22,6 +22,7 @@
#include "Descriptions.h"
#include "Basics/process-utils.h"
#include "RestServer/MetricsFeature.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Statistics/ConnectionStatistics.h"
@ -410,15 +411,15 @@ stats::Descriptions::Descriptions()
}
void stats::Descriptions::serverStatistics(velocypack::Builder& b) const {
ServerStatistics const& info = ServerStatistics::statistics();
ServerStatistics const& info = MetricsFeature::metrics()->serverStatistics();
b.add("uptime", VPackValue(info._uptime));
b.add("physicalMemory", VPackValue(TRI_PhysicalMemory));
b.add("transactions", VPackValue(VPackValueType::Object));
b.add("started", VPackValue(info._transactionsStatistics._transactionsStarted));
b.add("aborted", VPackValue(info._transactionsStatistics._transactionsAborted));
b.add("committed", VPackValue(info._transactionsStatistics._transactionsCommitted));
b.add("intermediateCommits", VPackValue(info._transactionsStatistics._intermediateCommits));
b.add("started", VPackValue(info._transactionsStatistics._transactionsStarted.load()));
b.add("aborted", VPackValue(info._transactionsStatistics._transactionsAborted.load()));
b.add("committed", VPackValue(info._transactionsStatistics._transactionsCommitted.load()));
b.add("intermediateCommits", VPackValue(info._transactionsStatistics._intermediateCommits.load()));
b.close();
auto& server = application_features::ApplicationServer::server();

View File

@ -23,25 +23,25 @@
#include "ServerStatistics.h"
#include "Statistics/StatisticsFeature.h"
#include "RestServer/MetricsFeature.h"
using namespace arangodb;
// -----------------------------------------------------------------------------
// --SECTION-- static members
// -----------------------------------------------------------------------------
ServerStatistics serverStatisticsGlobal(0);
TransactionStatistics::TransactionStatistics() :
_metrics(arangodb::MetricsFeature::metrics()),
_transactionsStarted(
_metrics->counter("arangodb_transactions_started", 0, "Transactions started")),
_transactionsAborted(
_metrics->counter("arangodb_transactions_aborted", 0, "Transactions aborted")),
_transactionsCommitted(
_metrics->counter("arangodb_transactions_committed", 0, "Transactions committed")),
_intermediateCommits(
_metrics->counter("arangodb_intermediate_commits", 0, "Intermediate commits")) {}
// -----------------------------------------------------------------------------
// --SECTION-- static public methods
// -----------------------------------------------------------------------------
ServerStatistics& ServerStatistics::statistics() {
//update the uptime for everyone reading the statistics.
serverStatisticsGlobal._uptime = StatisticsFeature::time() - serverStatisticsGlobal._startTime;
return serverStatisticsGlobal;
}
void ServerStatistics::initialize(double startTime) {
serverStatisticsGlobal._startTime = startTime;
_startTime = startTime;
}

View File

@ -26,15 +26,26 @@
#include <atomic>
#include <cstdint>
#include "RestServer/Metrics.h"
namespace arangodb {
class MetricsFeature;
}
struct TransactionStatistics {
TransactionStatistics() : _transactionsStarted(0), _transactionsAborted(0)
, _transactionsCommitted(0), _intermediateCommits(0) {}
std::atomic<std::uint64_t> _transactionsStarted;
std::atomic<std::uint64_t> _transactionsAborted;
std::atomic<std::uint64_t> _transactionsCommitted;
std::atomic<std::uint64_t> _intermediateCommits;
TransactionStatistics();
TransactionStatistics(TransactionStatistics const&) = delete;
TransactionStatistics(TransactionStatistics &&) = delete;
TransactionStatistics& operator=(TransactionStatistics const&) = delete;
TransactionStatistics& operator=(TransactionStatistics &&) = delete;
arangodb::MetricsFeature* _metrics;
Counter& _transactionsStarted;
Counter& _transactionsAborted;
Counter& _transactionsCommitted;
Counter& _intermediateCommits;
};
struct ServerStatistics {
@ -44,14 +55,15 @@ struct ServerStatistics {
ServerStatistics& operator=(ServerStatistics const&) = delete;
ServerStatistics& operator=(ServerStatistics &&) = delete;
static ServerStatistics& statistics();
static void initialize(double);
ServerStatistics& statistics();
void initialize(double);
TransactionStatistics _transactionsStatistics;
double _startTime;
std::atomic<double> _uptime;
explicit ServerStatistics(double start) : _transactionsStatistics(), _startTime(start), _uptime(0.0) {}
explicit ServerStatistics(double start) :
_transactionsStatistics(), _startTime(start), _uptime(0.0) {}
};
#endif

View File

@ -30,6 +30,7 @@
#include "Logger/LoggerStream.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "RestServer/MetricsFeature.h"
#include "RestServer/SystemDatabaseFeature.h"
#include "RestServer/DatabaseFeature.h"
#include "Statistics/ConnectionStatistics.h"
@ -186,7 +187,7 @@ void StatisticsFeature::prepare() {
STATISTICS = this;
ServerStatistics::initialize(StatisticsFeature::time());
MetricsFeature::metrics()->serverStatistics().initialize(StatisticsFeature::time());
ConnectionStatistics::initialize();
RequestStatistics::initialize();
}

View File

@ -86,6 +86,9 @@ class StatisticsFeature final : public application_features::ApplicationFeature
void prepare() override final;
void start() override final;
void stop() override final;
void toPrometheus(std::string& result, double const& now) {
_statisticsWorker->generateRawStatistics(result, now);
}
static stats::Descriptions const* descriptions() {
if (STATISTICS != nullptr) {

View File

@ -37,6 +37,7 @@
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "RestServer/MetricsFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/TtlFeature.h"
#include "Scheduler/Scheduler.h"
@ -799,8 +800,8 @@ void StatisticsWorker::avgPercentDistributon(VPackBuilder& builder, VPackSlice c
builder.openObject();
builder.add("values", VPackValue(VPackValueType::Array));
for (auto const& n : result) {
builder.add(VPackValue(n));
for (auto const& i : result) {
builder.add(VPackValue(i));
}
builder.close();
@ -809,6 +810,257 @@ void StatisticsWorker::avgPercentDistributon(VPackBuilder& builder, VPackSlice c
builder.close();
}
std::string const TYPE_("\n\n#TYPE ");
std::string const HELP_("\n#HELP ");
// local_name: {"prometheus_name", "type", "help"}
std::map<std::string, std::vector<std::string>> statStrings{
{"bytesReceived",
{"arangodb_client_connection_statistics_bytes_received_bucket ", "gauge",
"Bytes received for a request.\n"}},
{"bytesReceivedCount",
{"arangodb_client_connection_statistics_bytes_received_count ", "gauge",
"Bytes received for a request.\n"}},
{"bytesReceivedSum",
{"arangodb_client_connection_statistics_bytes_received_sum ", "gauge",
"Bytes received for a request.\n"}},
{"bytesSent",
{"arangodb_client_connection_statistics_bytes_sent_bucket ", "gauge",
"Bytes sent for a request.\n"}},
{"bytesSentCount",
{"arangodb_client_connection_statistics_bytes_sent_count ", "gauge",
"Bytes sent for a request.\n"}},
{"bytesSentSum",
{"arangodb_client_connection_statistics_bytes_sent_sum ", "gauge",
"Bytes sent for a request.\n"}},
{"minorPageFaults",
{"arangodb_process_statistics_minor_page_faults ", "gauge",
"The number of minor faults the process has made which have not required loading a memory page from disk. This figure is not reported on Windows.\n"}},
{"majorPageFaults",
{"arangodb_process_statistics_major_page_faults ", "gauge",
"On Windows, this figure contains the total number of page faults. On other system, this figure contains the number of major faults the process has made which have required loading a memory page from disk.\n"}},
{"bytesReceived",
{"arangodb_client_connection_statistics_bytes_received_bucket ", "gauge",
"Bytes received for a request"}},
{"userTime",
{"arangodb_process_statistics_user_time ", "gauge",
"On Windows, this figure contains the total amount of memory that the memory manager has committed for the arangod process. On other systems, this figure contains The size of the virtual memory the process is using.\n"}},
{"systemTime",
{"arangodb_process_statistics_system_time ", "gauge",
"Amount of time that this process has been scheduled in kernel mode, measured in seconds.\n"}},
{"numberOfThreads",
{"arangodb_process_statistics_number_of_threads ", "gauge",
"Number of threads in the arangod process.\n"}},
{"residentSize",
{"arangodb_process_statistics_resident_set_size ", "gauge", "The total size of the number of pages the process has in real memory. This is just the pages which count toward text, data, or stack space. This does not include pages which have not been demand-loaded in, or which are swapped out. The resident set size is reported in bytes.\n"}},
{"residentSizePercent",
{"arangodb_process_statistics_resident_set_size_percent ", "gauge", "The relative size of the number of pages the process has in real memory compared to system memory. This is just the pages which count toward text, data, or stack space. This does not include pages which have not been demand-loaded in, or which are swapped out. The value is a ratio between 0.00 and 1.00.\n"}},
{"virtualSize",
{"arangodb_process_statistics_virtual_memory_size ", "gauge", "On Windows, this figure contains the total amount of memory that the memory manager has committed for the arangod process. On other systems, this figure contains The size of the virtual memory the process is using.\n"}},
{"clientHttpConnections",
{"arangodb_client_connection_statistics_client_connections ", "guage",
"The number of client connections that are currently open.\n"}},
{"connectionTimeCounts",
{"arangodb_client_connection_statistics_connection_time_bucket", "gauge",
"Total connection time of a client.\n"}},
{"connectionTimeCount",
{"arangodb_client_connection_statistics_connection_time_count ", "gauge",
"Total connection time of a client.\n"}},
{"connectionTimeSum",
{"arangodb_client_connection_statistics_connection_time_sum ", "gauge",
"Total connection time of a client.\n"}}
/*{"",
{"", "",
""}}*/
};
void StatisticsWorker::generateRawStatistics(std::string& result, double const& now) {
ProcessInfo info = TRI_ProcessInfoSelf();
uint64_t rss = static_cast<uint64_t>(info._residentSize);
double rssp = 0;
if (TRI_PhysicalMemory != 0) {
rssp = static_cast<double>(rss) / static_cast<double>(TRI_PhysicalMemory);
}
StatisticsCounter httpConnections;
StatisticsCounter totalRequests;
std::array<StatisticsCounter, MethodRequestsStatisticsSize> methodRequests;
StatisticsCounter asyncRequests;
StatisticsDistribution connectionTime;
ConnectionStatistics::fill(httpConnections, totalRequests, methodRequests,
asyncRequests, connectionTime);
StatisticsDistribution totalTime;
StatisticsDistribution requestTime;
StatisticsDistribution queueTime;
StatisticsDistribution ioTime;
StatisticsDistribution bytesSent;
StatisticsDistribution bytesReceived;
RequestStatistics::fill(totalTime, requestTime, queueTime, ioTime, bytesSent, bytesReceived, stats::RequestStatisticsSource::ALL);
// processStatistics()
result +=
TYPE_ + statStrings.at("minorPageFaults")[0] + statStrings.at("minorPageFaults")[1] +
HELP_ + statStrings.at("minorPageFaults")[0] + statStrings.at("minorPageFaults")[2] +
statStrings.at("minorPageFaults")[0] + std::to_string(info._minorPageFaults);
result +=
TYPE_ + statStrings.at("majorPageFaults")[0] + statStrings.at("majorPageFaults")[1] +
HELP_ + statStrings.at("majorPageFaults")[0] + statStrings.at("majorPageFaults")[2] +
statStrings.at("majorPageFaults")[0] + std::to_string(info._majorPageFaults);
if (info._scClkTck != 0) { // prevent division by zero
result +=
TYPE_ + statStrings.at("userTime")[0] + statStrings.at("userTime")[1] +
HELP_ + statStrings.at("userTime")[0] + statStrings.at("userTime")[2] +
statStrings.at("userTime")[0] +
std::to_string(static_cast<double>(info._userTime) / static_cast<double>(info._scClkTck));
result +=
TYPE_ + statStrings.at("systemTime")[0] + statStrings.at("systemTime")[1] +
HELP_ + statStrings.at("systemTime")[0] + statStrings.at("systemTime")[2] +
statStrings.at("systemTime")[0] +
std::to_string(static_cast<double>(info._systemTime) / static_cast<double>(info._scClkTck));
}
result +=
TYPE_ + statStrings.at("numberOfThreads")[0] + statStrings.at("numberOfThreads")[1] +
HELP_ + statStrings.at("numberOfThreads")[0] + statStrings.at("numberOfThreads")[2] +
statStrings.at("numberOfThreads")[0] + std::to_string(info._numberThreads);
result +=
TYPE_ + statStrings.at("residentSize")[0] + statStrings.at("residentSize")[1] +
HELP_ + statStrings.at("residentSize")[0] + statStrings.at("residentSize")[2] +
statStrings.at("residentSize")[0] + std::to_string(rss);
result +=
TYPE_ + statStrings.at("residentSizePercent")[0] + statStrings.at("residentSizePercent")[1] +
HELP_ + statStrings.at("residentSizePercent")[0] + statStrings.at("residentSizePercent")[2] +
statStrings.at("residentSizePercent")[0] + std::to_string(rssp);
result +=
TYPE_ + statStrings.at("virtualSize")[0] + statStrings.at("virtualSize")[1] +
HELP_ + statStrings.at("virtualSize")[0] + statStrings.at("virtualSize")[2] +
statStrings.at("virtualSize")[0] + std::to_string(info._virtualSize);
// _clientStatistics()
result +=
TYPE_ + statStrings.at("clientHttpConnections")[0] + statStrings.at("clientHttpConnections")[1] +
HELP_ + statStrings.at("clientHttpConnections")[0] + statStrings.at("clientHttpConnections")[2] +
statStrings.at("clientHttpConnections")[0] + std::to_string(httpConnections._count);
VPackBuilder tmp = fillDistribution(connectionTime);
VPackSlice slc = tmp.slice();
result +=
TYPE_ + statStrings.at("connectionTimeCounts")[0] + statStrings.at("connectionTimeCounts")[1] +
HELP_ + statStrings.at("connectionTimeCounts")[0] + statStrings.at("connectionTimeCounts")[2] +
statStrings.at("connectionTimeCounts")[0] + "{le=\"0.1\"}" + " " +
std::to_string(slc.get("counts").at(0).getNumber<uint64_t>()) + "\n" +
statStrings.at("connectionTimeCounts")[0] + "{le=\"1\"}" + " " +
std::to_string(slc.get("counts").at(1).getNumber<uint64_t>()) + "\n" +
statStrings.at("connectionTimeCounts")[0] + "{le=\"60\"}" + " " +
std::to_string(slc.get("counts").at(1).getNumber<uint64_t>()) + "\n" +
statStrings.at("connectionTimeCounts")[0] + "{le=\"+Inf\"}" + " " +
std::to_string(slc.get("counts").at(3).getNumber<uint64_t>());
result +=
TYPE_ + statStrings.at("connectionTimeCount")[0] + statStrings.at("connectionTimeCount")[1] +
HELP_ + statStrings.at("connectionTimeCount")[0] + statStrings.at("connectionTimeCount")[2] +
statStrings.at("connectionTimeCount")[0] + std::to_string(slc.get("count").template getNumber<uint64_t>());
result +=
TYPE_ + statStrings.at("connectionTimeSum")[0] + statStrings.at("connectionTimeSum")[1] +
HELP_ + statStrings.at("connectionTimeSum")[0] + statStrings.at("connectionTimeSum")[2] +
statStrings.at("connectionTimeSum")[0] + std::to_string(slc.get("sum").template getNumber<uint64_t>());
result += "\n";
/*
// _clientStatistics()
tmp = fillDistribution(totalTime);
builder.add("totalTime", tmp.slice());
tmp = fillDistribution(requestTime);
builder.add("requestTime", tmp.slice());
tmp = fillDistribution(queueTime);
builder.add("queueTime", tmp.slice());
tmp = fillDistribution(ioTime);
builder.add("ioTime", tmp.slice());
tmp = fillDistribution(bytesSent);
builder.add("bytesSent", tmp.slice());
tmp = fillDistribution(bytesReceived);
builder.add("bytesReceived", tmp.slice());
builder.close();
// _httpStatistics()
builder.add("http", VPackValue(VPackValueType::Object));
builder.add("requestsTotal", VPackValue(totalRequests._count));
builder.add("requestsAsync", VPackValue(asyncRequests._count));
builder.add("requestsGet",
VPackValue(methodRequests[(int)rest::RequestType::GET]._count));
builder.add("requestsHead",
VPackValue(methodRequests[(int)rest::RequestType::HEAD]._count));
builder.add("requestsPost",
VPackValue(methodRequests[(int)rest::RequestType::POST]._count));
builder.add("requestsPut",
VPackValue(methodRequests[(int)rest::RequestType::PUT]._count));
builder.add("requestsPatch",
VPackValue(methodRequests[(int)rest::RequestType::PATCH]._count));
builder.add("requestsDelete",
VPackValue(methodRequests[(int)rest::RequestType::DELETE_REQ]._count));
builder.add("requestsOptions",
VPackValue(methodRequests[(int)rest::RequestType::OPTIONS]._count));
builder.add("requestsOther",
VPackValue(methodRequests[(int)rest::RequestType::ILLEGAL]._count));
builder.close();
// _serverStatistics()
builder.add("server", VPackValue(VPackValueType::Object));
builder.add("physicalMemory", VPackValue(TRI_PhysicalMemory));
builder.add("transactions", VPackValue(VPackValueType::Object));
builder.close();
// export v8 statistics
builder.add("v8Context", VPackValue(VPackValueType::Object));
V8DealerFeature::Statistics v8Counters{};
// std::vector<V8DealerFeature::MemoryStatistics> memoryStatistics;
// V8 may be turned off on a server
if (_server.hasFeature<V8DealerFeature>()) {
V8DealerFeature& dealer = _server.getFeature<V8DealerFeature>();
if (dealer.isEnabled()) {
v8Counters = dealer.getCurrentContextNumbers();
// see below: memoryStatistics = dealer.getCurrentMemoryNumbers();
}
}
builder.add("available", VPackValue(v8Counters.available));
builder.add("busy", VPackValue(v8Counters.busy));
builder.add("dirty", VPackValue(v8Counters.dirty));
builder.add("free", VPackValue(v8Counters.free));
builder.add("max", VPackValue(v8Counters.max));
builder.close();
// export threads statistics
builder.add("threads", VPackValue(VPackValueType::Object));
SchedulerFeature::SCHEDULER->toVelocyPack(builder);
builder.close();
// export ttl statistics
TtlFeature& ttlFeature = _server.getFeature<TtlFeature>();
builder.add(VPackValue("ttl"));
ttlFeature.statsToVelocyPack(builder);
builder.close();
builder.close();
*/
}
void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const& now) {
ProcessInfo info = TRI_ProcessInfoSelf();
uint64_t rss = static_cast<uint64_t>(info._residentSize);
@ -836,7 +1088,7 @@ void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const
RequestStatistics::fill(totalTime, requestTime, queueTime, ioTime, bytesSent, bytesReceived, stats::RequestStatisticsSource::ALL);
ServerStatistics const& serverInfo = ServerStatistics::statistics();
ServerStatistics const& serverInfo = MetricsFeature::metrics()->serverStatistics();
builder.openObject();
if (!_clusterId.empty()) {
@ -851,10 +1103,10 @@ void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const
builder.add("majorPageFaults", VPackValue(info._majorPageFaults));
if (info._scClkTck != 0) {
// prevent division by zero
builder.add("userTime", VPackValue(static_cast<double>(info._userTime) /
static_cast<double>(info._scClkTck)));
builder.add("systemTime", VPackValue(static_cast<double>(info._systemTime) /
static_cast<double>(info._scClkTck)));
builder.add("userTime", VPackValue(
static_cast<double>(info._userTime) / static_cast<double>(info._scClkTck)));
builder.add("systemTime", VPackValue(
static_cast<double>(info._systemTime) / static_cast<double>(info._scClkTck)));
}
builder.add("numberOfThreads", VPackValue(info._numberThreads));
builder.add("residentSize", VPackValue(rss));
@ -915,10 +1167,10 @@ void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const
builder.add("uptime", VPackValue(serverInfo._uptime));
builder.add("physicalMemory", VPackValue(TRI_PhysicalMemory));
builder.add("transactions", VPackValue(VPackValueType::Object));
builder.add("started", VPackValue(serverInfo._transactionsStatistics._transactionsStarted));
builder.add("aborted", VPackValue(serverInfo._transactionsStatistics._transactionsAborted));
builder.add("committed", VPackValue(serverInfo._transactionsStatistics._transactionsCommitted));
builder.add("intermediateCommits", VPackValue(serverInfo._transactionsStatistics._intermediateCommits));
builder.add("started", VPackValue(serverInfo._transactionsStatistics._transactionsStarted.load()));
builder.add("aborted", VPackValue(serverInfo._transactionsStatistics._transactionsAborted.load()));
builder.add("committed", VPackValue(serverInfo._transactionsStatistics._transactionsCommitted.load()));
builder.add("intermediateCommits", VPackValue(serverInfo._transactionsStatistics._intermediateCommits.load()));
builder.close();
// export v8 statistics

View File

@ -41,6 +41,7 @@ class StatisticsWorker final : public Thread {
void run() override;
void beginShutdown() override;
void generateRawStatistics(std::string& result, double const& now);
private:
// removes old statistics

View File

@ -413,6 +413,8 @@ class StorageEngine : public application_features::ApplicationFeature {
builder.close();
}
virtual void getStatistics(std::string& result) const {}
// management methods for synchronizing with external persistent stores
virtual TRI_voc_tick_t currentTick() const = 0;
virtual TRI_voc_tick_t releasedTick() const = 0;

View File

@ -445,6 +445,7 @@ v8::Handle<v8::Object> TRI_RequestCppToV8(v8::Isolate* isolate,
optionsWithUniquenessCheck.checkAttributeUniqueness = true;
auto parsed = request->payload(&optionsWithUniquenessCheck);
if (parsed.isObject() || parsed.isArray()) {
request->setDefaultContentType();
digesteable = true;
}
} catch ( ... ) {}

View File

@ -28,6 +28,7 @@
#include "Basics/StringUtils.h"
#include "Basics/process-utils.h"
#include "Rest/GeneralRequest.h"
#include "RestServer/MetricsFeature.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Statistics/ConnectionStatistics.h"
@ -98,7 +99,7 @@ static void JS_ServerStatistics(v8::FunctionCallbackInfo<v8::Value> const& args)
TRI_V8_TRY_CATCH_BEGIN(isolate)
v8::HandleScope scope(isolate);
ServerStatistics const& info = ServerStatistics::statistics();
ServerStatistics const& info = MetricsFeature::metrics()->serverStatistics();
v8::Handle<v8::Object> result = v8::Object::New(isolate);
@ -111,13 +112,13 @@ static void JS_ServerStatistics(v8::FunctionCallbackInfo<v8::Value> const& args)
auto const& ts = info._transactionsStatistics;
v8::Handle<v8::Object> v8TransactionInfoObj = v8::Object::New(isolate);
v8TransactionInfoObj->Set(TRI_V8_ASCII_STRING(isolate, "started"),
v8::Number::New(isolate, (double)ts._transactionsStarted));
v8::Number::New(isolate, (double)ts._transactionsStarted.load()));
v8TransactionInfoObj->Set(TRI_V8_ASCII_STRING(isolate, "aborted"),
v8::Number::New(isolate, (double)ts._transactionsAborted));
v8::Number::New(isolate, (double)ts._transactionsAborted.load()));
v8TransactionInfoObj->Set(TRI_V8_ASCII_STRING(isolate, "committed"),
v8::Number::New(isolate, (double)ts._transactionsCommitted));
v8::Number::New(isolate, (double)ts._transactionsCommitted.load()));
v8TransactionInfoObj->Set(TRI_V8_ASCII_STRING(isolate, "intermediateCommits"),
v8::Number::New(isolate, (double)ts._intermediateCommits));
v8::Number::New(isolate, (double)ts._intermediateCommits.load()));
result->Set(TRI_V8_ASCII_STRING(isolate, "transactions"), v8TransactionInfoObj);
// v8 counters

View File

@ -202,6 +202,7 @@ class GeneralRequest {
return std::make_shared<velocypack::Builder>(payload());
};
virtual void setDefaultContentType() = 0;
/// @brieg should reflect the Content-Type header
ContentType contentType() const { return _contentType; }
/// @brief should generally reflect the Accept header

View File

@ -79,6 +79,9 @@ class HttpRequest final : public GeneralRequest {
return _cookies;
}
virtual void setDefaultContentType() override {
_contentType = rest::ContentType::JSON;
}
/// @brief the body content length
size_t contentLength() const override { return _contentLength; }
// Payload

View File

@ -60,6 +60,10 @@ class VstRequest final : public GeneralRequest {
arangodb::velocypack::StringRef rawPayload() const override;
velocypack::Slice payload(arangodb::velocypack::Options const*) override;
virtual void setDefaultContentType() override {
_contentType = rest::ContentType::VPACK;
}
arangodb::Endpoint::TransportType transportType() override {
return arangodb::Endpoint::TransportType::VST;
};

View File

@ -351,6 +351,77 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_slices) {
}
}
TEST_F(AqlItemBlockTest, test_serialization_deserialization_with_ranges) {
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 3, 2)};
block->emplaceValue(0, 0, dummyData(4));
block->emplaceValue(0, 1, dummyData(5));
block->emplaceValue(1, 0, dummyData(0));
block->emplaceValue(1, 1, dummyData(1));
block->emplaceValue(2, 0, dummyData(2));
block->emplaceValue(2, 1, dummyData(3));
{
// test range 0->1
VPackBuilder result;
result.openObject();
block->toVelocyPack(0, 1, nullptr, result);
ASSERT_TRUE(result.isOpenObject());
result.close();
SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice());
// Check exposed attributes
EXPECT_EQ(testee->size(), 1);
EXPECT_EQ(testee->getNrRegs(), block->getNrRegs());
// check data
compareWithDummy(testee, 0, 0, 4);
compareWithDummy(testee, 0, 1, 5);
assertShadowRowIndexes(testee, {});
}
{
// Test range 1->2
VPackBuilder result;
result.openObject();
block->toVelocyPack(1, 2, nullptr, result);
ASSERT_TRUE(result.isOpenObject());
result.close();
SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice());
// Check exposed attributes
EXPECT_EQ(testee->size(), 1);
EXPECT_EQ(testee->getNrRegs(), block->getNrRegs());
// check data
compareWithDummy(testee, 0, 0, 0);
compareWithDummy(testee, 0, 1, 1);
assertShadowRowIndexes(testee, {});
}
{
// Test range 0->2
VPackBuilder result;
result.openObject();
block->toVelocyPack(0, 2, nullptr, result);
ASSERT_TRUE(result.isOpenObject());
result.close();
SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice());
// Check exposed attributes
EXPECT_EQ(testee->size(), 2);
EXPECT_EQ(testee->getNrRegs(), block->getNrRegs());
// check data
compareWithDummy(testee, 0, 0, 4);
compareWithDummy(testee, 0, 1, 5);
compareWithDummy(testee, 1, 0, 0);
compareWithDummy(testee, 1, 1, 1);
assertShadowRowIndexes(testee, {});
}
}
TEST_F(AqlItemBlockTest, test_serialization_deserialization_input_row) {
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 2, 2)};
block->emplaceValue(0, 0, dummyData(4));

View File

@ -42,6 +42,9 @@ struct GeneralRequestMock: public arangodb::GeneralRequest {
~GeneralRequestMock();
using arangodb::GeneralRequest::addSuffix;
virtual size_t contentLength() const override;
virtual void setDefaultContentType() override {
_contentType = arangodb::rest::ContentType::VPACK;
}
virtual arangodb::velocypack::StringRef rawPayload() const override;
virtual arangodb::velocypack::Slice payload(arangodb::velocypack::Options const* options = &arangodb::velocypack::Options::Defaults) override;
virtual arangodb::Endpoint::TransportType transportType() override;