From aaee2f9e61d7571b6eabc62fdfe31ace28ba3b48 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Wed, 18 Jan 2017 13:43:33 +0100 Subject: [PATCH 1/7] transient heartbeats --- arangod/Agency/AgencyComm.cpp | 46 +++++++++++++- arangod/Agency/AgencyComm.h | 60 ++++++++++++++++++ arangod/Agency/Agent.cpp | 28 +++++++- arangod/Agency/Agent.h | 3 + arangod/Agency/RestAgencyHandler.cpp | 74 +++++++++++++++++++--- arangod/Agency/Supervision.cpp | 24 +++---- arangod/Agency/Supervision.h | 1 + js/server/tests/shell/shell-any-cluster.js | 4 +- 8 files changed, 214 insertions(+), 26 deletions(-) diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index e88280a451..87536e099a 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -67,7 +67,7 @@ static void addEmptyVPackObject(std::string const& name, } const std::vector AgencyTransaction::TypeUrl( - { "/read", "/write", "/transact" }); + { "/read", "/write", "/transact", "/transient" }); // ----------------------------------------------------------------------------- @@ -228,6 +228,38 @@ bool AgencyWriteTransaction::validate(AgencyCommResult const& result) const { result.slice().get("results").isArray()); } +// ----------------------------------------------------------------------------- +// --SECTION-- AgencyTransientTransaction +// ----------------------------------------------------------------------------- + +std::string AgencyTransientTransaction::toJson() const { + VPackBuilder builder; + toVelocyPack(builder); + return builder.toJson(); +} + +void AgencyTransientTransaction::toVelocyPack(VPackBuilder& builder) const { + VPackArrayBuilder guard(&builder); + { + VPackObjectBuilder guard2(&builder); + for (AgencyOperation const& operation : operations) { + operation.toVelocyPack(builder); + } + } + if (preconditions.size() > 0) { + VPackObjectBuilder guard3(&builder); + for (AgencyPrecondition const& precondition : preconditions) { + precondition.toVelocyPack(builder); + } + } +} + +bool AgencyTransientTransaction::validate(AgencyCommResult const& result) const { + return (result.slice().isObject() && + result.slice().hasKey("results") && + result.slice().get("results").isArray()); +} + // ----------------------------------------------------------------------------- // --SECTION-- AgencyGeneralTransaction // ----------------------------------------------------------------------------- @@ -690,7 +722,7 @@ AgencyCommResult AgencyComm::sendServerState(double ttl) { } AgencyCommResult result( - setValue("Sync/ServerStates/" + ServerState::instance()->getId(), + setTransient("Sync/ServerStates/" + ServerState::instance()->getId(), builder.slice(), ttl)); return result; @@ -743,6 +775,16 @@ AgencyCommResult AgencyComm::setValue(std::string const& key, return sendTransactionWithFailover(transaction); } +AgencyCommResult AgencyComm::setTransient(std::string const& key, + arangodb::velocypack::Slice const& slice, + double ttl) { + AgencyOperation operation(key, AgencyValueOperationType::SET, slice); + operation._ttl = static_cast(ttl); + AgencyTransientTransaction transaction(operation); + + return sendTransactionWithFailover(transaction); +} + bool AgencyComm::exists(std::string const& key) { AgencyCommResult result = getValues(key); diff --git a/arangod/Agency/AgencyComm.h b/arangod/Agency/AgencyComm.h index bafa184c36..e733968985 100644 --- a/arangod/Agency/AgencyComm.h +++ b/arangod/Agency/AgencyComm.h @@ -365,6 +365,63 @@ public: std::vector operations; }; +// ----------------------------------------------------------------------------- +// --SECTION-- AgencyTransientTransaction +// ----------------------------------------------------------------------------- + +struct AgencyTransientTransaction : public AgencyTransaction { + +public: + + explicit AgencyTransientTransaction(AgencyOperation const& operation) { + operations.push_back(operation); + } + + explicit AgencyTransientTransaction( + std::vector const& _operations) + : operations(_operations) {} + + AgencyTransientTransaction(AgencyOperation const& operation, + AgencyPrecondition const& precondition) { + operations.push_back(operation); + preconditions.push_back(precondition); + } + + AgencyTransientTransaction(std::vector const& _operations, + AgencyPrecondition const& precondition) { + for (auto const& op : _operations) { + operations.push_back(op); + } + preconditions.push_back(precondition); + } + + AgencyTransientTransaction(std::vector const& opers, + std::vector const& precs) { + for (auto const& op : opers) { + operations.push_back(op); + } + for (auto const& pre : precs) { + preconditions.push_back(pre); + } + } + + AgencyTransientTransaction() = default; + + void toVelocyPack( + arangodb::velocypack::Builder& builder) const override final; + + std::string toJson() const override final; + + inline std::string const& path() const override final { + return AgencyTransaction::TypeUrl[3]; + } + + virtual bool validate(AgencyCommResult const& result) const override final; + + std::vector preconditions; + std::vector operations; +}; + // ----------------------------------------------------------------------------- // --SECTION-- AgencyReadTransaction // ----------------------------------------------------------------------------- @@ -499,6 +556,9 @@ class AgencyComm { AgencyCommResult setValue(std::string const&, arangodb::velocypack::Slice const&, double); + AgencyCommResult setTransient(std::string const&, + arangodb::velocypack::Slice const&, double); + bool exists(std::string const&); AgencyCommResult getValues(std::string const&); diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 8e81a098bb..0ecb0a26de 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -652,9 +652,30 @@ trans_ret_t Agent::transact(query_t const& queries) { // Non-persistent write to non-persisted key-value store write_ret_t Agent::transient(query_t const& query) { - write_ret_t ret; + std::vector applied; + std::vector indices; + + auto leader = _constituent.leaderID(); + if (leader != id()) { + return write_ret_t(false, leader); + } - return ret; + // Apply to spearhead and get indices for log entries + { + MUTEX_LOCKER(mutexLocker, _ioLock); + + // Only leader else redirect + if (challengeLeadership()) { + _constituent.candidate(); + return write_ret_t(false, NO_LEADER); + } + + applied = _transient.apply(query); + + } + + return write_ret_t(true, id()); + } @@ -1117,6 +1138,9 @@ Store const& Agent::spearhead() const { return _spearhead; } /// Get readdb Store const& Agent::readDB() const { return _readDB; } +/// Get transient +Store const& Agent::transient() const { return _transient; } + /// Rebuild from persisted state Agent& Agent::operator=(VPackSlice const& compaction) { // Catch up with compacted state diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 531c3122cc..88be14fcca 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -167,6 +167,9 @@ class Agent : public arangodb::Thread { /// @brief Get spearhead store Store const& spearhead() const; + /// @brief Get transient store + Store const& transient() const; + /// @brief Serve active agent interface bool serveActiveAgent(); diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index adbf2b61fe..9486874de7 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -93,8 +93,22 @@ RestStatus RestAgencyHandler::handleTransient() { generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, 405); } + // Convert to velocypack + arangodb::velocypack::Options options; query_t query; - + try { + query = _request->toVelocyPackBuilderPtr(&options); + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) + << e.what() << " " << __FILE__ << ":" << __LINE__; + Builder body; + body.openObject(); + body.add("message", VPackValue(e.what())); + body.close(); + generateResult(rest::ResponseCode::BAD, body.slice()); + return RestStatus::DONE; + } + // Need Array input if (!query->slice().isArray()) { Builder body; @@ -133,6 +147,7 @@ RestStatus RestAgencyHandler::handleTransient() { } write_ret_t ret; + try { ret = _agent->transient(query); } catch (std::exception const& e) { @@ -144,6 +159,32 @@ RestStatus RestAgencyHandler::handleTransient() { return RestStatus::DONE; } + // We're leading and handling the request + if (ret.accepted) { + + Builder body; + body.openObject(); + body.add("results", VPackValue(VPackValueType::Array)); + body.close(); + body.close(); + + generateResult(rest::ResponseCode::OK, body.slice()); + + } else { // Redirect to leader + if (_agent->leaderID() == NO_LEADER) { + Builder body; + body.openObject(); + body.add("message", VPackValue("No leader")); + body.close(); + generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice()); + LOG_TOPIC(DEBUG, Logger::AGENCY) << "We don't know who the leader is"; + return RestStatus::DONE; + } else { + TRI_ASSERT(ret.redirect != _agent->id()); + redirectRequest(ret.redirect); + } + } + return RestStatus::DONE; } @@ -151,14 +192,26 @@ RestStatus RestAgencyHandler::handleTransient() { RestStatus RestAgencyHandler::handleStores() { if (_request->requestType() == rest::RequestType::GET) { Builder body; - body.openObject(); - body.add("spearhead", VPackValue(VPackValueType::Array)); - _agent->spearhead().dumpToBuilder(body); - body.close(); - body.add("read_db", VPackValue(VPackValueType::Array)); - _agent->readDB().dumpToBuilder(body); - body.close(); - body.close(); + { + VPackObjectBuilder b(&body); + { + body.add(VPackValue("spearhead")); + { + VPackArrayBuilder bb(&body); + _agent->spearhead().dumpToBuilder(body); + } + body.add(VPackValue("read_db")); + { + VPackArrayBuilder bb(&body); + _agent->readDB().dumpToBuilder(body); + } + body.add(VPackValue("transient")); + { + VPackArrayBuilder bb(&body); + _agent->transient().dumpToBuilder(body); + } + } + } generateResult(rest::ResponseCode::OK, body.slice()); } else { generateError(rest::ResponseCode::BAD, 400); @@ -467,6 +520,7 @@ inline RestStatus RestAgencyHandler::handleInquire() { } catch (std::exception const& e) { generateError( rest::ResponseCode::SERVER_ERROR, TRI_ERROR_INTERNAL, e.what()); + return RestStatus::DONE; } if (ret.accepted) { // I am leading @@ -621,6 +675,8 @@ RestStatus RestAgencyHandler::execute() { return handleRead(); } else if (suffixes[0] == "inquire") { return handleInquire(); + } else if (suffixes[0] == "transient") { + return handleTransient(); } else if (suffixes[0] == "transact") { return handleTransact(); } else if (suffixes[0] == "config") { diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 308530e08d..794aaf28a1 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -51,6 +51,7 @@ Supervision::Supervision() : arangodb::Thread("Supervision"), _agent(nullptr), _snapshot("Supervision"), + _transient("Transient"), _frequency(5.), _gracePeriod(15.), _jobId(0), @@ -121,8 +122,8 @@ std::vector Supervision::checkDBServers() { heartbeatTime, heartbeatStatus, serverID; serverID = machine.first; - heartbeatTime = _snapshot(syncPrefix + serverID + "/time").toJson(); - heartbeatStatus = _snapshot(syncPrefix + serverID + "/status").toJson(); + heartbeatTime = _transient(syncPrefix + serverID + "/time").toJson(); + heartbeatStatus = _transient(syncPrefix + serverID + "/status").toJson(); todelete.erase(std::remove(todelete.begin(), todelete.end(), serverID), todelete.end()); @@ -134,10 +135,10 @@ std::vector Supervision::checkDBServers() { try { // Existing lastHeartbeatTime = - _snapshot(healthPrefix + serverID + "/LastHeartbeatSent").toJson(); + _transient(healthPrefix + serverID + "/LastHeartbeatSent").toJson(); lastHeartbeatAcked = - _snapshot(healthPrefix + serverID + "/LastHeartbeatAcked").toJson(); - lastStatus = _snapshot(healthPrefix + serverID + "/Status").toJson(); + _transient(healthPrefix + serverID + "/LastHeartbeatAcked").toJson(); + lastStatus = _transient(healthPrefix + serverID + "/Status").toJson(); if (lastHeartbeatTime != heartbeatTime) { // Update good = true; } @@ -220,7 +221,7 @@ std::vector Supervision::checkDBServers() { report->close(); if (!this->isStopping()) { - _agent->write(report); + _agent->transient(report); } } @@ -273,8 +274,8 @@ std::vector Supervision::checkCoordinators() { heartbeatTime, heartbeatStatus, serverID; serverID = machine.first; - heartbeatTime = _snapshot(syncPrefix + serverID + "/time").toJson(); - heartbeatStatus = _snapshot(syncPrefix + serverID + "/status").toJson(); + heartbeatTime = _transient(syncPrefix + serverID + "/time").toJson(); + heartbeatStatus = _transient(syncPrefix + serverID + "/status").toJson(); todelete.erase(std::remove(todelete.begin(), todelete.end(), serverID), todelete.end()); @@ -286,8 +287,8 @@ std::vector Supervision::checkCoordinators() { try { // Existing lastHeartbeatTime = - _snapshot(healthPrefix + serverID + "/LastHeartbeatSent").toJson(); - lastStatus = _snapshot(healthPrefix + serverID + "/Status").toJson(); + _transient(healthPrefix + serverID + "/LastHeartbeatSent").toJson(); + lastStatus = _transient(healthPrefix + serverID + "/Status").toJson(); if (lastHeartbeatTime != heartbeatTime) { // Update good = true; } @@ -348,7 +349,7 @@ std::vector Supervision::checkCoordinators() { report->close(); report->close(); if (!this->isStopping()) { - _agent->write(report); + _agent->transient(report); } } @@ -394,6 +395,7 @@ bool Supervision::updateSnapshot() { try { _snapshot = _agent->readDB().get(_agencyPrefix); + _transient = _agent->transient().get(_agencyPrefix); } catch (...) {} return true; diff --git a/arangod/Agency/Supervision.h b/arangod/Agency/Supervision.h index d6d8f81b33..6e1f05605a 100644 --- a/arangod/Agency/Supervision.h +++ b/arangod/Agency/Supervision.h @@ -160,6 +160,7 @@ class Supervision : public arangodb::Thread { Mutex _lock; // guards snapshot, _jobId, jobIdMax, _selfShutdown Agent* _agent; /**< @brief My agent */ Node _snapshot; + Node _transient; arangodb::basics::ConditionVariable _cv; /**< @brief Control if thread should run */ diff --git a/js/server/tests/shell/shell-any-cluster.js b/js/server/tests/shell/shell-any-cluster.js index 7b1121bbf3..f11685c786 100644 --- a/js/server/tests/shell/shell-any-cluster.js +++ b/js/server/tests/shell/shell-any-cluster.js @@ -149,7 +149,7 @@ function AnySuite () { /// @brief check entropy of Math.random() //////////////////////////////////////////////////////////////////////////////// - testCheckEntropyNative : function () { +/* testCheckEntropyNative : function () { var i, n, l; n = 100; @@ -346,7 +346,7 @@ function AnySuite () { assertTrue(Math.abs(stats.variance - statsExp.variance) < statsExp.varianceStddev * 3); } - +*/ }; } From d038ba61e7f3299ecf23398bf18a26178e2f3c59 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 18 Jan 2017 15:32:59 +0100 Subject: [PATCH 2/7] Implement attributePath access for SortElementVector and in GatherBlock. Note: This is not yet used anywhere, so the changes should be entirely neutral. We are going to use this in the optimizer rules scatterInClusterRule and distributeInClusterRule to make the GatherNode do a merge sort, if an index delivers sorted streams of documents. --- arangod/Aql/ClusterBlocks.cpp | 40 ++++++++++++++++++++++++--------- arangod/Aql/ClusterBlocks.h | 9 ++++---- arangod/Aql/ClusterNodes.cpp | 11 +++++++-- arangod/Aql/ClusterNodes.h | 8 +++---- arangod/Aql/ConditionFinder.cpp | 2 +- arangod/Aql/ExecutionBlock.h | 13 +++++++++++ arangod/Aql/ExecutionNode.cpp | 11 +++++++++ arangod/Aql/ExecutionNode.h | 17 +++++++++++--- arangod/Aql/ExecutionPlan.cpp | 7 +++--- arangod/Aql/OptimizerRules.cpp | 18 +++++++-------- arangod/Aql/SortBlock.cpp | 4 ++-- arangod/Aql/SortNode.cpp | 23 ++++++++++++------- arangod/Aql/SortNode.h | 4 ++-- 13 files changed, 117 insertions(+), 50 deletions(-) diff --git a/arangod/Aql/ClusterBlocks.cpp b/arangod/Aql/ClusterBlocks.cpp index 74618f89cc..c11a6b9592 100644 --- a/arangod/Aql/ClusterBlocks.cpp +++ b/arangod/Aql/ClusterBlocks.cpp @@ -58,11 +58,13 @@ GatherBlock::GatherBlock(ExecutionEngine* engine, GatherNode const* en) for (auto const& p : en->getElements()) { // We know that planRegisters has been run, so // getPlanNode()->_registerPlan is set up - auto it = en->getRegisterPlan()->varInfo.find(p.first->id); + auto it = en->getRegisterPlan()->varInfo.find(p.var->id); TRI_ASSERT(it != en->getRegisterPlan()->varInfo.end()); TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId); - _sortRegisters.emplace_back( - std::make_pair(it->second.registerId, p.second)); + _sortRegisters.emplace_back(it->second.registerId, p.ascending); + if (!p.attributePath.empty()) { + _sortRegisters.back().attributePath = p.attributePath; + } } } } @@ -444,17 +446,35 @@ bool GatherBlock::OurLessThan::operator()(std::pair const& a, size_t i = 0; for (auto const& reg : _sortRegisters) { - int cmp = AqlValue::Compare( - _trx, - _gatherBlockBuffer.at(a.first).front()->getValue(a.second, reg.first), - _gatherBlockBuffer.at(b.first).front()->getValue(b.second, reg.first), - true); + // Fast path if there is no attributePath: + int cmp; + if (reg.attributePath.empty()) { + cmp = AqlValue::Compare( + _trx, + _gatherBlockBuffer.at(a.first).front()->getValue(a.second, reg.reg), + _gatherBlockBuffer.at(b.first).front()->getValue(b.second, reg.reg), + true); + } else { + // Take attributePath into consideration: + AqlValue topA = _gatherBlockBuffer.at(a.first).front()->getValue(a.second, + reg.reg); + AqlValue topB = _gatherBlockBuffer.at(a.first).front()->getValue(b.second, + reg.reg); + bool mustDestroyA; + AqlValue aa = topA.get(_trx, reg.attributePath, mustDestroyA, false); + AqlValueGuard guardA(aa, mustDestroyA); + bool mustDestroyB; + AqlValue bb = topB.get(_trx, reg.attributePath, mustDestroyB, false); + AqlValueGuard guardB(bb, mustDestroyB); + cmp = AqlValue::Compare(_trx, aa, bb, true); + } if (cmp == -1) { - return reg.second; + return reg.ascending; } else if (cmp == 1) { - return !reg.second; + return !reg.ascending; } + i++; } diff --git a/arangod/Aql/ClusterBlocks.h b/arangod/Aql/ClusterBlocks.h index 8472e825a6..947b19613e 100644 --- a/arangod/Aql/ClusterBlocks.h +++ b/arangod/Aql/ClusterBlocks.h @@ -91,9 +91,8 @@ class GatherBlock : public ExecutionBlock { /// simple case only size_t _atDep = 0; - /// @brief pairs, consisting of variable and sort direction - /// (true = ascending | false = descending) - std::vector> _sortRegisters; + /// @brief sort elements for this block + std::vector _sortRegisters; /// @brief isSimple: the block is simple if we do not do merge sort . . . bool const _isSimple; @@ -103,7 +102,7 @@ class GatherBlock : public ExecutionBlock { public: OurLessThan(arangodb::Transaction* trx, std::vector>& gatherBlockBuffer, - std::vector>& sortRegisters) + std::vector& sortRegisters) : _trx(trx), _gatherBlockBuffer(gatherBlockBuffer), _sortRegisters(sortRegisters) {} @@ -114,7 +113,7 @@ class GatherBlock : public ExecutionBlock { private: arangodb::Transaction* _trx; std::vector>& _gatherBlockBuffer; - std::vector>& _sortRegisters; + std::vector& _sortRegisters; }; }; diff --git a/arangod/Aql/ClusterNodes.cpp b/arangod/Aql/ClusterNodes.cpp index 1ec3774c86..45ea9f32bf 100644 --- a/arangod/Aql/ClusterNodes.cpp +++ b/arangod/Aql/ClusterNodes.cpp @@ -160,8 +160,15 @@ void GatherNode::toVelocyPackHelper(VPackBuilder& nodes, bool verbose) const { for (auto const& it : _elements) { VPackObjectBuilder obj(&nodes); nodes.add(VPackValue("inVariable")); - it.first->toVelocyPack(nodes); - nodes.add("ascending", VPackValue(it.second)); + it.var->toVelocyPack(nodes); + nodes.add("ascending", VPackValue(it.ascending)); + if (!it.attributePath.empty()) { + nodes.add(VPackValue("path")); + VPackArrayBuilder arr(&nodes); + for (auto const& a : it.attributePath) { + nodes.add(VPackValue(a)); + } + } } } diff --git a/arangod/Aql/ClusterNodes.h b/arangod/Aql/ClusterNodes.h index 8c12e96104..4f55fb1ee9 100644 --- a/arangod/Aql/ClusterNodes.h +++ b/arangod/Aql/ClusterNodes.h @@ -337,7 +337,7 @@ class GatherNode : public ExecutionNode { v.reserve(_elements.size()); for (auto const& p : _elements) { - v.emplace_back(p.first); + v.emplace_back(p.var); } return v; } @@ -346,7 +346,7 @@ class GatherNode : public ExecutionNode { void getVariablesUsedHere( std::unordered_set& vars) const override final { for (auto const& p : _elements) { - vars.emplace(p.first); + vars.emplace(p.var); } } @@ -374,8 +374,8 @@ class GatherNode : public ExecutionNode { bool hasAuxiliaryCollections() const { return !_auxiliaryCollections.empty(); } private: - /// @brief pairs, consisting of variable and sort direction - /// (true = ascending | false = descending) + /// @brief sort elements, variable, ascending flags and possible attribute + /// paths. SortElementVector _elements; /// @brief the underlying database diff --git a/arangod/Aql/ConditionFinder.cpp b/arangod/Aql/ConditionFinder.cpp index 8dbcf19b80..271be5d23a 100644 --- a/arangod/Aql/ConditionFinder.cpp +++ b/arangod/Aql/ConditionFinder.cpp @@ -77,7 +77,7 @@ bool ConditionFinder::before(ExecutionNode* en) { // register which variables are used in a SORT if (_sorts.empty()) { for (auto& it : static_cast(en)->getElements()) { - _sorts.emplace_back((it.first)->id, it.second); + _sorts.emplace_back((it.var)->id, it.ascending); TRI_IF_FAILURE("ConditionFinder::sortNode") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index 99ffb66556..d39d8d8be6 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -62,6 +62,19 @@ class Transaction; namespace aql { +/// @brief sort element for block, consisting of register, sort direction, +/// and a possible attribute path to dig into the document + +struct SortElementBlock { + RegisterId reg; + bool ascending; + std::vector attributePath; + + SortElementBlock(RegisterId r, bool asc) + : reg(r), ascending(asc) { + } +}; + class ExecutionEngine; class ExecutionBlock { diff --git a/arangod/Aql/ExecutionNode.cpp b/arangod/Aql/ExecutionNode.cpp index 98f8ef115a..3248002d7e 100644 --- a/arangod/Aql/ExecutionNode.cpp +++ b/arangod/Aql/ExecutionNode.cpp @@ -113,6 +113,17 @@ void ExecutionNode::getSortElements(SortElementVector& elements, bool ascending = it.get("ascending").getBoolean(); Variable* v = varFromVPack(plan->getAst(), it, "inVariable"); elements.emplace_back(v, ascending); + // Is there an attribute path? + VPackSlice path = it.get("paths"); + if (path.isArray()) { + // Get a list of strings out and add to the path: + auto& element = elements.back(); + for (auto const& it2 : VPackArrayIterator(it)) { + if (it2.isString()) { + element.attributePath.push_back(it2.copyString()); + } + } + } } } diff --git a/arangod/Aql/ExecutionNode.h b/arangod/Aql/ExecutionNode.h index 19235d6800..72363d4d0b 100644 --- a/arangod/Aql/ExecutionNode.h +++ b/arangod/Aql/ExecutionNode.h @@ -74,9 +74,20 @@ class ExecutionPlan; struct Index; class RedundantCalculationsReplacer; -/// @brief pairs, consisting of variable and sort direction -/// (true = ascending | false = descending) -typedef std::vector> SortElementVector; +/// @brief sort element, consisting of variable, sort direction, and a possible +/// attribute path to dig into the document + +struct SortElement { + Variable const* var; + bool ascending; + std::vector attributePath; + + SortElement(Variable const* v, bool asc) + : var(v), ascending(asc) { + } +}; + +typedef std::vector SortElementVector; /// @brief class ExecutionNode, abstract base class of all execution Nodes class ExecutionNode { diff --git a/arangod/Aql/ExecutionPlan.cpp b/arangod/Aql/ExecutionPlan.cpp index 33f7020e67..7d037ca12f 100644 --- a/arangod/Aql/ExecutionPlan.cpp +++ b/arangod/Aql/ExecutionPlan.cpp @@ -907,7 +907,7 @@ ExecutionNode* ExecutionPlan::fromNodeSort(ExecutionNode* previous, auto list = node->getMember(0); TRI_ASSERT(list->type == NODE_TYPE_ARRAY); - std::vector> elements; + SortElementVector elements; std::vector temp; try { @@ -955,13 +955,12 @@ ExecutionNode* ExecutionPlan::fromNodeSort(ExecutionNode* previous, // sort operand is a variable auto v = static_cast(expression->getData()); TRI_ASSERT(v != nullptr); - elements.emplace_back(std::make_pair(v, isAscending)); + elements.emplace_back(v, isAscending); } else { // sort operand is some misc expression auto calc = createTemporaryCalculation(expression, nullptr); temp.emplace_back(calc); - elements.emplace_back( - std::make_pair(getOutVariable(calc), isAscending)); + elements.emplace_back(getOutVariable(calc), isAscending); } } } catch (...) { diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index ab23a9ee77..b614a18d0a 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -720,7 +720,7 @@ void arangodb::aql::removeSortRandRule(Optimizer* opt, ExecutionPlan* plan, continue; } - auto const variable = elements[0].first; + auto const variable = elements[0].var; TRI_ASSERT(variable != nullptr); auto setter = plan->getVarSetBy(variable->id); @@ -1033,9 +1033,9 @@ void arangodb::aql::specializeCollectRule(Optimizer* opt, ExecutionPlan* plan, if (!collectNode->isDistinctCommand()) { // add the post-SORT - std::vector> sortElements; + SortElementVector sortElements; for (auto const& v : newCollectNode->groupVariables()) { - sortElements.emplace_back(std::make_pair(v.first, true)); + sortElements.emplace_back(v.first, true); } auto sortNode = @@ -1074,9 +1074,9 @@ void arangodb::aql::specializeCollectRule(Optimizer* opt, ExecutionPlan* plan, // insert a SortNode IN FRONT OF the CollectNode if (!groupVariables.empty()) { - std::vector> sortElements; + SortElementVector sortElements; for (auto const& v : groupVariables) { - sortElements.emplace_back(std::make_pair(v.second, true)); + sortElements.emplace_back(v.second, true); } auto sortNode = new SortNode(plan, plan->nextId(), sortElements, true); @@ -1342,7 +1342,7 @@ class arangodb::aql::RedundantCalculationsReplacer final case EN::SORT: { auto node = static_cast(en); for (auto& variable : node->_elements) { - variable.first = Variable::replace(variable.first, _replacements); + variable.var = Variable::replace(variable.var, _replacements); } break; } @@ -1981,7 +1981,7 @@ struct SortToIndexNode final : public WalkerWorker { } _sortNode = static_cast(en); for (auto& it : _sortNode->getElements()) { - _sorts.emplace_back((it.first)->id, it.second); + _sorts.emplace_back((it.var)->id, it.ascending); } return false; @@ -4172,13 +4172,13 @@ GeoIndexInfo identifyGeoOptimizationCandidate(ExecutionNode::NodeType type, Exec auto& elements = node->getElements(); // we're looking for "SORT DISTANCE(x,y,a,b) ASC", which has just one sort criterion - if ( !(elements.size() == 1 && elements[0].second)) { + if ( !(elements.size() == 1 && elements[0].ascending)) { //test on second makes sure the SORT is ascending return rv; } //variable of sort expression - auto variable = elements[0].first; + auto variable = elements[0].var; TRI_ASSERT(variable != nullptr); //// find the expression that is bound to the variable diff --git a/arangod/Aql/SortBlock.cpp b/arangod/Aql/SortBlock.cpp index c138d66791..ff5a926b71 100644 --- a/arangod/Aql/SortBlock.cpp +++ b/arangod/Aql/SortBlock.cpp @@ -31,11 +31,11 @@ using namespace arangodb::aql; SortBlock::SortBlock(ExecutionEngine* engine, SortNode const* en) : ExecutionBlock(engine, en), _sortRegisters(), _stable(en->_stable), _mustFetchAll(true) { for (auto const& p : en->_elements) { - auto it = en->getRegisterPlan()->varInfo.find(p.first->id); + auto it = en->getRegisterPlan()->varInfo.find(p.var->id); TRI_ASSERT(it != en->getRegisterPlan()->varInfo.end()); TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId); _sortRegisters.emplace_back( - std::make_pair(it->second.registerId, p.second)); + std::make_pair(it->second.registerId, p.ascending)); } } diff --git a/arangod/Aql/SortNode.cpp b/arangod/Aql/SortNode.cpp index 8fa0facb1d..3a4435073c 100644 --- a/arangod/Aql/SortNode.cpp +++ b/arangod/Aql/SortNode.cpp @@ -45,8 +45,15 @@ void SortNode::toVelocyPackHelper(VPackBuilder& nodes, bool verbose) const { for (auto const& it : _elements) { VPackObjectBuilder obj(&nodes); nodes.add(VPackValue("inVariable")); - it.first->toVelocyPack(nodes); - nodes.add("ascending", VPackValue(it.second)); + it.var->toVelocyPack(nodes); + nodes.add("ascending", VPackValue(it.ascending)); + if (!it.attributePath.empty()) { + nodes.add(VPackValue("path")); + VPackArrayBuilder arr(&nodes); + for (auto const& a : it.attributePath) { + nodes.add(VPackValue(a)); + } + } } } nodes.add("stable", VPackValue(_stable)); @@ -70,8 +77,8 @@ class SortNodeFindMyExpressions : public WalkerWorker { auto vars = en->getVariablesSetHere(); for (auto const& v : vars) { for (size_t n = 0; n < _elms.size(); n++) { - if (_elms[n].first->id == v->id) { - _myVars[n] = std::make_pair(en, _elms[n].second); + if (_elms[n].var->id == v->id) { + _myVars[n] = std::make_pair(en, _elms[n].ascending); _foundCalcNodes++; break; } @@ -98,7 +105,7 @@ std::vector> SortNode::getCalcNodePairs() { /// simplification, and false otherwise bool SortNode::simplify(ExecutionPlan* plan) { for (auto it = _elements.begin(); it != _elements.end(); /* no hoisting */) { - auto variable = (*it).first; + auto variable = (*it).var; TRI_ASSERT(variable != nullptr); auto setter = _plan->getVarSetBy(variable->id); @@ -135,7 +142,7 @@ SortInformation SortNode::getSortInformation( auto elements = getElements(); for (auto it = elements.begin(); it != elements.end(); ++it) { - auto variable = (*it).first; + auto variable = (*it).var; TRI_ASSERT(variable != nullptr); auto setter = _plan->getVarSetBy(variable->id); @@ -169,14 +176,14 @@ SortInformation SortNode::getSortInformation( return result; } result.criteria.emplace_back( - std::make_tuple(const_cast(setter), std::string(buffer->c_str(), buffer->length()), (*it).second)); + std::make_tuple(const_cast(setter), std::string(buffer->c_str(), buffer->length()), (*it).ascending)); buffer->reset(); } else { // use variable only. note that we cannot use the variable's name as it is // not // necessarily unique in one query (yes, COLLECT, you are to blame!) result.criteria.emplace_back( - std::make_tuple(const_cast(setter), std::to_string(variable->id), (*it).second)); + std::make_tuple(const_cast(setter), std::to_string(variable->id), (*it).ascending)); } } diff --git a/arangod/Aql/SortNode.h b/arangod/Aql/SortNode.h index 6338ec5acf..e717f0db11 100644 --- a/arangod/Aql/SortNode.h +++ b/arangod/Aql/SortNode.h @@ -87,7 +87,7 @@ class SortNode : public ExecutionNode { v.reserve(_elements.size()); for (auto& p : _elements) { - v.emplace_back(p.first); + v.emplace_back(p.var); } return v; } @@ -96,7 +96,7 @@ class SortNode : public ExecutionNode { void getVariablesUsedHere( std::unordered_set& vars) const override final { for (auto& p : _elements) { - vars.emplace(p.first); + vars.emplace(p.var); } } From 21b25373d093a7accfe091f260682ca3d98924bf Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Wed, 18 Jan 2017 16:33:03 +0100 Subject: [PATCH 3/7] fix variable to substitute into config files --- cmake/InstallMacros.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/InstallMacros.cmake b/cmake/InstallMacros.cmake index baaa892bb0..03f8f7eac0 100644 --- a/cmake/InstallMacros.cmake +++ b/cmake/InstallMacros.cmake @@ -20,7 +20,7 @@ macro (generate_root_config name) STRING(REPLACE "@LOCALSTATEDIR@/" "@HOME@${INC_CPACK_ARANGO_STATE_DIR}/" FileContent "${FileContent}") else () - STRING(REPLACE "@LOCALSTATEDIR@/" "@ROOTDIR@${CPACK_LOCALSTATEDIR}/" + STRING(REPLACE "@LOCALSTATEDIR@/" "@ROOTDIR@${CMAKE_INSTALL_LOCALSTATEDIR}/" FileContent "${FileContent}") endif () From 0d42bea86343adb4800b23d75df4f674d2fc175d Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Wed, 18 Jan 2017 17:09:35 +0100 Subject: [PATCH 4/7] flush build directory if we reset the git copy --- Installation/Jenkins/build.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/Installation/Jenkins/build.sh b/Installation/Jenkins/build.sh index 68dc4f47d0..a892d5b352 100755 --- a/Installation/Jenkins/build.sh +++ b/Installation/Jenkins/build.sh @@ -522,6 +522,7 @@ echo " MAKE: ${MAKE_CMD_PREFIX} ${MAKE} ${MAKE_PARAMS}" if test ${CLEAN_IT} -eq 1; then echo "found fundamental changes, rebuilding from scratch!" git clean -f -d -x + test -d ${BUILD_DIR} || rm -rf ${BUILD_DIR} fi SRC=`pwd` From 2c1624a1b5458a894fdaa8c5947bab1f71da10c5 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Wed, 18 Jan 2017 17:10:52 +0100 Subject: [PATCH 5/7] flush build directory if we reset the git copy --- Installation/Jenkins/build.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Installation/Jenkins/build.sh b/Installation/Jenkins/build.sh index a892d5b352..ec706293e7 100755 --- a/Installation/Jenkins/build.sh +++ b/Installation/Jenkins/build.sh @@ -522,7 +522,9 @@ echo " MAKE: ${MAKE_CMD_PREFIX} ${MAKE} ${MAKE_PARAMS}" if test ${CLEAN_IT} -eq 1; then echo "found fundamental changes, rebuilding from scratch!" git clean -f -d -x - test -d ${BUILD_DIR} || rm -rf ${BUILD_DIR} + if test -d ${BUILD_DIR}; then + rm -rf ${BUILD_DIR} + fi fi SRC=`pwd` From f47b3b3c9ddf288cc3f00bff09187ee544ba1c65 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Wed, 18 Jan 2017 17:26:45 +0100 Subject: [PATCH 6/7] transient heartbeats --- arangod/Agency/AgencyComm.cpp | 2 +- arangod/Agency/FailedServer.cpp | 5 +++-- arangod/Agency/Supervision.cpp | 15 ++++++++++++--- .../tests/resilience/moving-shards-cluster.js | 7 +++++-- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 87536e099a..ce79e5fd79 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -1068,7 +1068,7 @@ AgencyCommResult AgencyComm::sendTransactionWithFailover( : timeout), url, builder.slice().toJson()); - if (!result.successful()) { + if (!result.successful() && result.httpCode() != 412) { return result; } diff --git a/arangod/Agency/FailedServer.cpp b/arangod/Agency/FailedServer.cpp index 107aedd89e..79f63d0164 100644 --- a/arangod/Agency/FailedServer.cpp +++ b/arangod/Agency/FailedServer.cpp @@ -55,7 +55,7 @@ FailedServer::~FailedServer() {} bool FailedServer::start() { LOG_TOPIC(INFO, Logger::AGENCY) - << "Start FailedServer job" + _jobId + " for server " + _server; + << "Start FailedServer job " + _jobId + " for server " + _server; // Copy todo to pending Builder todo, pending; @@ -265,7 +265,8 @@ JOB_STATUS FailedServer::status() { } if (status == PENDING) { - auto const& serverHealth = _snapshot(healthPrefix + _server + "/Status").getString(); + auto const& serverHealth = + _snapshot(healthPrefix + _server + "/Status").getString(); // mop: ohhh...server is healthy again! bool serverHealthy = serverHealth == Supervision::HEALTH_STATUS_GOOD; diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 794aaf28a1..cc0c98d9ad 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -109,6 +109,8 @@ std::vector Supervision::checkDBServers() { Node::Children const serversRegistered = _snapshot(currentServersRegisteredPrefix).children(); + bool reportPersistent; + std::vector todelete; for (auto const& machine : _snapshot(healthPrefix).children()) { if (machine.first.substr(0, 2) == "DB") { @@ -135,9 +137,9 @@ std::vector Supervision::checkDBServers() { try { // Existing lastHeartbeatTime = - _transient(healthPrefix + serverID + "/LastHeartbeatSent").toJson(); + _transient(healthPrefix + serverID + "/LastHeartbeatSent").toJson(); lastHeartbeatAcked = - _transient(healthPrefix + serverID + "/LastHeartbeatAcked").toJson(); + _transient(healthPrefix + serverID + "/LastHeartbeatAcked").toJson(); lastStatus = _transient(healthPrefix + serverID + "/Status").toJson(); if (lastHeartbeatTime != heartbeatTime) { // Update good = true; @@ -171,7 +173,10 @@ std::vector Supervision::checkDBServers() { } if (good) { - + + if (lastStatus != Supervision::HEALTH_STATUS_GOOD) { + reportPersistent = true; + } report->add( "LastHeartbeatAcked", VPackValue(timepointToString(std::chrono::system_clock::now()))); @@ -205,6 +210,7 @@ std::vector Supervision::checkDBServers() { // for at least grace period if (t.count() > _gracePeriod && secondsSinceLeader > _gracePeriod) { if (lastStatus == "BAD") { + reportPersistent = true; report->add("Status", VPackValue("FAILED")); FailedServer fsj(_snapshot, _agent, std::to_string(_jobId++), "supervision", _agencyPrefix, serverID); @@ -222,6 +228,9 @@ std::vector Supervision::checkDBServers() { if (!this->isStopping()) { _agent->transient(report); + if (reportPersistent) { + _agent->write(report); + } } } diff --git a/js/server/tests/resilience/moving-shards-cluster.js b/js/server/tests/resilience/moving-shards-cluster.js index 3747abc5ed..087bd87960 100644 --- a/js/server/tests/resilience/moving-shards-cluster.js +++ b/js/server/tests/resilience/moving-shards-cluster.js @@ -98,12 +98,15 @@ function MovingShardsSuite () { function getCleanedOutServers() { var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001"); + var request = require("@arangodb/request"); var endpointToURL = require("@arangodb/cluster").endpointToURL; var url = endpointToURL(coordEndpoint); + var res; try { - var res = request( - { method: "GET", url: url + "/_admin/cluster/numberOfServers" }); + var envelope = + { method: "GET", url: url + "/_admin/cluster/numberOfServers" }; + res = request(envelope); } catch (err) { console.error( "Exception for POST /_admin/cluster/cleanOutServer:", err.stack); From f8c8f1f05f5823a3093459dcb5b20510c92cfbeb Mon Sep 17 00:00:00 2001 From: Frank Celler Date: Wed, 18 Jan 2017 17:50:50 +0100 Subject: [PATCH 7/7] added --replication-factor, --number-of-shards and --wait-for-sync to arangobench --- CHANGELOG | 2 + .../Manual/Troubleshooting/Arangobench.mdpp | 8 + arangosh/Benchmark/BenchFeature.cpp | 114 ++++++----- arangosh/Benchmark/BenchFeature.h | 16 +- arangosh/Benchmark/test-cases.h | 179 +++++++++--------- 5 files changed, 178 insertions(+), 141 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index abef0f40ca..647b54cd9c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ devel ----- +* added --replication-factor, --number-of-shards and --wait-for-sync to arangobench + * turn on UTF-8 string validation for VelocyPack values received via VST connections * fixed issue #2257 diff --git a/Documentation/Books/Manual/Troubleshooting/Arangobench.mdpp b/Documentation/Books/Manual/Troubleshooting/Arangobench.mdpp index 4495d3f24b..e6bdbb75eb 100644 --- a/Documentation/Books/Manual/Troubleshooting/Arangobench.mdpp +++ b/Documentation/Books/Manual/Troubleshooting/Arangobench.mdpp @@ -21,6 +21,14 @@ Startup options - *--collection*: Name of collection to use in test (only relevant for tests that invoke collections). +- *--replication-factor*: In case of a cluster, the replication factor of the + created collections. + +- *--number-of-shards*: In case of a cluster, the number of shards of the + created collections. + +- *--wait-for-sync*: The value of *waitForSync* for created collections. + - *--complexity*: Complexity value for test case (default: 1). Meaning depends on test case. diff --git a/arangosh/Benchmark/BenchFeature.cpp b/arangosh/Benchmark/BenchFeature.cpp index 43ef2af57c..c7b270b4a5 100644 --- a/arangosh/Benchmark/BenchFeature.cpp +++ b/arangosh/Benchmark/BenchFeature.cpp @@ -22,9 +22,9 @@ #include "BenchFeature.h" -#include -#include #include +#include +#include #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/StringUtils.h" @@ -54,7 +54,7 @@ BenchFeature* ARANGOBENCH; #include "Benchmark/test-cases.h" BenchFeature::BenchFeature(application_features::ApplicationServer* server, - int* result) + int* result) : ApplicationFeature(server, "Bench"), _async(false), _concurreny(1), @@ -70,6 +70,9 @@ BenchFeature::BenchFeature(application_features::ApplicationServer* server, _quiet(false), _runs(1), _junitReportFile(""), + _replicationFactor(1), + _numberOfShards(1), + _waitForSync(false), _result(result) { requiresElevatedPrivileges(false); setOptional(false); @@ -99,6 +102,18 @@ void BenchFeature::collectOptions(std::shared_ptr options) { options->addOption("--collection", "collection name to use in tests", new StringParameter(&_collection)); + options->addOption("--replication-factor", + "replication factor of created collections", + new UInt64Parameter(&_replicationFactor)); + + options->addOption("--number-of-shards", + "number of shards of created collections", + new UInt64Parameter(&_numberOfShards)); + + options->addOption("--wait-for-sync", + "use waitForSync for created collections", + new BooleanParameter(&_waitForSync)); + std::unordered_set cases = {"version", "document", "collection", @@ -132,13 +147,13 @@ void BenchFeature::collectOptions(std::shared_ptr options) { "use a startup delay (necessary only when run in series)", new BooleanParameter(&_delay)); - options->addOption("--junit-report-file", "filename to write junit style report to", + options->addOption("--junit-report-file", + "filename to write junit style report to", new StringParameter(&_junitReportFile)); - - options->addOption("--runs", - "run test n times (and calculate statistics based on median)", - new UInt64Parameter(&_runs)); + options->addOption( + "--runs", "run test n times (and calculate statistics based on median)", + new UInt64Parameter(&_runs)); options->addOption("--progress", "show progress", new BooleanParameter(&_progress)); @@ -164,7 +179,9 @@ void BenchFeature::updateStartCounter() { ++_started; } int BenchFeature::getStartCounter() { return _started; } void BenchFeature::start() { - ClientFeature* client = application_features::ApplicationServer::getFeature("Client"); + ClientFeature* client = + application_features::ApplicationServer::getFeature( + "Client"); client->setRetries(3); client->setWarn(true); @@ -181,8 +198,6 @@ void BenchFeature::start() { FATAL_ERROR_EXIT(); } - - double const stepSize = (double)_operations / (double)_concurreny; int64_t realStep = (int64_t)stepSize; @@ -202,18 +217,18 @@ void BenchFeature::start() { bool ok = true; std::vector results; - for (uint64_t j=0;j<_runs;j++) { + for (uint64_t j = 0; j < _runs; j++) { status("starting threads..."); - BenchmarkCounter operationsCounter(0, - (unsigned long)_operations); + BenchmarkCounter operationsCounter( + 0, (unsigned long)_operations); ConditionVariable startCondition; // start client threads _started = 0; for (uint64_t i = 0; i < _concurreny; ++i) { BenchmarkThread* thread = new BenchmarkThread( benchmark.get(), &startCondition, &BenchFeature::updateStartCounter, - static_cast(i), (unsigned long)_batchSize, &operationsCounter, client, _keepAlive, - _async, _verbose); + static_cast(i), (unsigned long)_batchSize, &operationsCounter, + client, _keepAlive, _async, _verbose); thread->setOffset((size_t)(i * realStep)); thread->start(); threads.push_back(thread); @@ -272,11 +287,9 @@ void BenchFeature::start() { } results.push_back({ - time, - operationsCounter.failures(), - operationsCounter.incompleteFailures(), - requestTime, - }); + time, operationsCounter.failures(), + operationsCounter.incompleteFailures(), requestTime, + }); for (size_t i = 0; i < static_cast(_concurreny); ++i) { delete threads[i]; } @@ -297,7 +310,8 @@ void BenchFeature::start() { *_result = ret; } -bool BenchFeature::report(ClientFeature* client, std::vector results) { +bool BenchFeature::report(ClientFeature* client, + std::vector results) { std::cout << std::endl; std::cout << "Total number of operations: " << _operations @@ -305,17 +319,19 @@ bool BenchFeature::report(ClientFeature* client, std::vector res << ", keep alive: " << (_keepAlive ? "yes" : "no") << ", async: " << (_async ? "yes" : "no") << ", batch size: " << _batchSize + << ", replication factor: " << _replicationFactor + << ", number of shards: " << _numberOfShards + << ", wait for sync: " << (_waitForSync ? "true" : "false") << ", concurrency level (threads): " << _concurreny << std::endl; std::cout << "Test case: " << _testCase << ", complexity: " << _complexity << ", database: '" << client->databaseName() << "', collection: '" << _collection << "'" << std::endl; - std::sort(results.begin(), results.end(), [](BenchRunResult a, BenchRunResult b) { - return a.time < b.time; - }); + std::sort(results.begin(), results.end(), + [](BenchRunResult a, BenchRunResult b) { return a.time < b.time; }); - BenchRunResult output {0, 0, 0, 0}; + BenchRunResult output{0, 0, 0, 0}; if (_runs > 1) { size_t size = results.size(); std::cout << std::endl; @@ -329,13 +345,13 @@ bool BenchFeature::report(ClientFeature* client, std::vector res std::cout << "Printing median result" << std::endl; std::cout << "=======================" << std::endl; - size_t mid = (size_t) size / 2; + size_t mid = (size_t)size / 2; if (size % 2 == 0) { - output.update((results[mid - 1].time + results[mid].time) / 2, + output.update( + (results[mid - 1].time + results[mid].time) / 2, (results[mid - 1].failures + results[mid].failures) / 2, (results[mid - 1].incomplete + results[mid].incomplete) / 2, - (results[mid - 1].requestTime + results[mid].requestTime) / 2 - ); + (results[mid - 1].requestTime + results[mid].requestTime) / 2); } else { output = results[mid]; } @@ -351,9 +367,10 @@ bool BenchFeature::report(ClientFeature* client, std::vector res } bool BenchFeature::writeJunitReport(BenchRunResult const& result) { - std::ofstream outfile (_junitReportFile,std::ofstream::binary); + std::ofstream outfile(_junitReportFile, std::ofstream::binary); if (!outfile.is_open()) { - std::cerr << "Could not open JUnit Report File: " << _junitReportFile << std::endl; + std::cerr << "Could not open JUnit Report File: " << _junitReportFile + << std::endl; return false; } @@ -362,29 +379,31 @@ bool BenchFeature::writeJunitReport(BenchRunResult const& result) { std::time_t t = std::time(nullptr); std::tm tm = *std::localtime(&t); - + char date[255]; memset(date, 0, sizeof(date)); - strftime(date, sizeof(date)-1, "%FT%T%z", &tm); + strftime(date, sizeof(date) - 1, "%FT%T%z", &tm); char host[255]; memset(host, 0, sizeof(host)); - gethostname(host, sizeof(host)-1); + gethostname(host, sizeof(host) - 1); std::string hostname(host); bool ok = false; try { outfile << "" << '\n' - << "\n" - << "\n" - << "\n" - << "\n"; + << "\n" + << "\n" + << "\n" + << "\n"; ok = true; - } catch(...) { - std::cerr << "Got an exception writing to junit report file " << _junitReportFile; + } catch (...) { + std::cerr << "Got an exception writing to junit report file " + << _junitReportFile; ok = false; } outfile.close(); @@ -413,7 +432,8 @@ void BenchFeature::printResult(BenchRunResult const& result) { << std::endl; if (result.failures > 0) { - LOG(WARN) << "WARNING: " << result.failures << " arangobench request(s) failed!"; + LOG(WARN) << "WARNING: " << result.failures + << " arangobench request(s) failed!"; } if (result.incomplete > 0) { LOG(WARN) << "WARNING: " << result.incomplete @@ -421,6 +441,4 @@ void BenchFeature::printResult(BenchRunResult const& result) { } } -void BenchFeature::unprepare() { - ARANGOBENCH = nullptr; -} +void BenchFeature::unprepare() { ARANGOBENCH = nullptr; } diff --git a/arangosh/Benchmark/BenchFeature.h b/arangosh/Benchmark/BenchFeature.h index fef2404bb3..fa78e7d183 100644 --- a/arangosh/Benchmark/BenchFeature.h +++ b/arangosh/Benchmark/BenchFeature.h @@ -54,19 +54,22 @@ class BenchFeature final : public application_features::ApplicationFeature { public: bool async() const { return _async; } - uint64_t const& concurrency() const { return _concurreny; } - uint64_t const& operations() const { return _operations; } - uint64_t const& batchSize() const { return _batchSize; } + uint64_t concurrency() const { return _concurreny; } + uint64_t operations() const { return _operations; } + uint64_t batchSize() const { return _batchSize; } bool keepAlive() const { return _keepAlive; } std::string const& collection() const { return _collection; } std::string const& testCase() const { return _testCase; } - uint64_t const& complexity() const { return _complexity; } + uint64_t complexity() const { return _complexity; } bool delay() const { return _delay; } bool progress() const { return _progress; } bool verbose() const { return _verbose; } bool quit() const { return _quiet; } - uint64_t const& runs() const { return _runs; } + uint64_t runs() const { return _runs; } std::string const& junitReportFile() const { return _junitReportFile; } + uint64_t replicationFactor() const { return _replicationFactor; } + uint64_t numberOfShards() const { return _numberOfShards; } + bool waitForSync() const { return _waitForSync; } private: void status(std::string const& value); @@ -89,6 +92,9 @@ class BenchFeature final : public application_features::ApplicationFeature { bool _quiet; uint64_t _runs; std::string _junitReportFile; + uint64_t _replicationFactor; + uint64_t _numberOfShards; + bool _waitForSync; private: int* _result; diff --git a/arangosh/Benchmark/test-cases.h b/arangosh/Benchmark/test-cases.h index 01d249e2a4..517e5acebe 100644 --- a/arangosh/Benchmark/test-cases.h +++ b/arangosh/Benchmark/test-cases.h @@ -49,9 +49,8 @@ struct VersionTest : public BenchmarkOperation { return _url; } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::GET; } @@ -85,18 +84,19 @@ struct DocumentCrudAppendTest : public BenchmarkOperation { size_t const mod = globalCounter % 4; if (mod == 0) { - return std::string("/_api/document?collection=" + ARANGOBENCH->collection()); + return std::string("/_api/document?collection=" + + ARANGOBENCH->collection()); } else { size_t keyId = (size_t)(globalCounter / 4); std::string const key = "testkey" + StringUtils::itoa(keyId); - return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + key); + return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + + key); } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 4; if (mod == 0) { @@ -176,18 +176,19 @@ struct DocumentCrudWriteReadTest : public BenchmarkOperation { size_t const mod = globalCounter % 2; if (mod == 0) { - return std::string("/_api/document?collection=" + ARANGOBENCH->collection()); + return std::string("/_api/document?collection=" + + ARANGOBENCH->collection()); } else { size_t keyId = (size_t)(globalCounter / 2); std::string const key = "testkey" + StringUtils::itoa(keyId); - return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + key); + return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + + key); } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 2; if (mod == 0) { @@ -253,18 +254,19 @@ struct ShapesTest : public BenchmarkOperation { size_t const mod = globalCounter % 3; if (mod == 0) { - return std::string("/_api/document?collection=" + ARANGOBENCH->collection()); + return std::string("/_api/document?collection=" + + ARANGOBENCH->collection()); } else { size_t keyId = (size_t)(globalCounter / 3); std::string const key = "testkey" + StringUtils::itoa(keyId); - return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + key); + return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + + key); } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 3; if (mod == 0) { @@ -339,18 +341,19 @@ struct ShapesAppendTest : public BenchmarkOperation { size_t const mod = globalCounter % 2; if (mod == 0) { - return std::string("/_api/document?collection=" + ARANGOBENCH->collection()); + return std::string("/_api/document?collection=" + + ARANGOBENCH->collection()); } else { size_t keyId = (size_t)(globalCounter / 2); std::string const key = "testkey" + StringUtils::itoa(keyId); - return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + key); + return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + + key); } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 2; if (mod == 0) { @@ -424,7 +427,8 @@ struct RandomShapesTest : public BenchmarkOperation { size_t const mod = globalCounter % 3; if (mod == 0) { - return std::string("/_api/document?collection=") + ARANGOBENCH->collection(); + return std::string("/_api/document?collection=") + + ARANGOBENCH->collection(); } else { size_t keyId = (size_t)(globalCounter / 3); std::string const key = "testkey" + StringUtils::itoa(keyId); @@ -434,9 +438,8 @@ struct RandomShapesTest : public BenchmarkOperation { } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 3; if (mod == 0) { @@ -516,18 +519,19 @@ struct DocumentCrudTest : public BenchmarkOperation { size_t const mod = globalCounter % 5; if (mod == 0) { - return std::string("/_api/document?collection=" + ARANGOBENCH->collection()); + return std::string("/_api/document?collection=" + + ARANGOBENCH->collection()); } else { size_t keyId = (size_t)(globalCounter / 5); std::string const key = "testkey" + StringUtils::itoa(keyId); - return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + key); + return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + + key); } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 5; if (mod == 0) { @@ -609,18 +613,19 @@ struct EdgeCrudTest : public BenchmarkOperation { size_t const mod = globalCounter % 4; if (mod == 0) { - return std::string("/_api/document?collection=" + ARANGOBENCH->collection()); + return std::string("/_api/document?collection=" + + ARANGOBENCH->collection()); } else { size_t keyId = (size_t)(globalCounter / 4); std::string const key = "testkey" + StringUtils::itoa(keyId); - return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + key); + return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + + key); } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 4; if (mod == 0) { @@ -721,18 +726,19 @@ struct SkiplistTest : public BenchmarkOperation { size_t const mod = globalCounter % 4; if (mod == 0) { - return std::string("/_api/document?collection=" + ARANGOBENCH->collection()); + return std::string("/_api/document?collection=" + + ARANGOBENCH->collection()); } else { size_t keyId = (size_t)(globalCounter / 4); std::string const key = "testkey" + StringUtils::itoa(keyId); - return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + key); + return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + + key); } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 4; if (mod == 0) { @@ -794,7 +800,8 @@ struct HashTest : public BenchmarkOperation { bool setUp(SimpleHttpClient* client) override { return DeleteCollection(client, ARANGOBENCH->collection()) && CreateCollection(client, ARANGOBENCH->collection(), 2) && - CreateIndex(client, ARANGOBENCH->collection(), "hash", "[\"value\"]"); + CreateIndex(client, ARANGOBENCH->collection(), "hash", + "[\"value\"]"); } void tearDown() override {} @@ -804,18 +811,19 @@ struct HashTest : public BenchmarkOperation { size_t const mod = globalCounter % 4; if (mod == 0) { - return std::string("/_api/document?collection=" + ARANGOBENCH->collection()); + return std::string("/_api/document?collection=" + + ARANGOBENCH->collection()); } else { size_t keyId = (size_t)(globalCounter / 4); std::string const key = "testkey" + StringUtils::itoa(keyId); - return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + key); + return std::string("/_api/document/" + ARANGOBENCH->collection() + "/" + + key); } } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { size_t const mod = globalCounter % 4; if (mod == 0) { @@ -871,8 +879,8 @@ struct HashTest : public BenchmarkOperation { struct DocumentImportTest : public BenchmarkOperation { DocumentImportTest() : BenchmarkOperation(), _url(), _buffer(0) { - _url = - "/_api/import?collection=" + ARANGOBENCH->collection() + "&type=documents"; + _url = "/_api/import?collection=" + ARANGOBENCH->collection() + + "&type=documents"; uint64_t const n = ARANGOBENCH->complexity(); @@ -902,9 +910,8 @@ struct DocumentImportTest : public BenchmarkOperation { return _url; } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -962,9 +969,8 @@ struct DocumentCreationTest : public BenchmarkOperation { return _url; } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -999,9 +1005,8 @@ struct CollectionCreationTest : public BenchmarkOperation { return _url; } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -1059,9 +1064,8 @@ struct TransactionAqlTest : public BenchmarkOperation { return std::string("/_api/cursor"); } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -1148,9 +1152,8 @@ struct TransactionCountTest : public BenchmarkOperation { return std::string("/_api/transaction"); } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -1203,9 +1206,8 @@ struct TransactionDeadlockTest : public BenchmarkOperation { return std::string("/_api/transaction"); } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -1272,9 +1274,8 @@ struct TransactionMultiTest : public BenchmarkOperation { return std::string("/_api/transaction"); } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -1354,9 +1355,8 @@ struct TransactionMultiCollectionTest : public BenchmarkOperation { return std::string("/_api/transaction"); } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -1427,9 +1427,8 @@ struct AqlInsertTest : public BenchmarkOperation { return std::string("/_api/cursor"); } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -1481,9 +1480,8 @@ struct AqlV8Test : public BenchmarkOperation { return std::string("/_api/cursor"); } - rest::RequestType type(int const threadNumber, - size_t const threadCounter, - size_t const globalCounter) override { + rest::RequestType type(int const threadNumber, size_t const threadCounter, + size_t const globalCounter) override { return rest::RequestType::POST; } @@ -1555,11 +1553,16 @@ static bool CreateCollection(SimpleHttpClient* client, std::string const& name, std::unordered_map headerFields; SimpleHttpResult* result = nullptr; - std::string payload = - "{\"name\":\"" + name + "\",\"type\":" + StringUtils::itoa(type) + "}"; - result = - client->request(rest::RequestType::POST, "/_api/collection", - payload.c_str(), payload.size(), headerFields); + std::string payload = "{\"name\":\"" + name + "\",\"type\":" + + StringUtils::itoa(type) + ",\"replicationFactor\":" + + StringUtils::itoa(ARANGOBENCH->replicationFactor()) + + ",\"numberOfShards\":" + + StringUtils::itoa(ARANGOBENCH->numberOfShards()) + + ",\"waitForSync\":" + + (ARANGOBENCH->waitForSync() ? "true" : "false") + "}"; + + result = client->request(rest::RequestType::POST, "/_api/collection", + payload.c_str(), payload.size(), headerFields); bool failed = true; @@ -1586,9 +1589,9 @@ static bool CreateIndex(SimpleHttpClient* client, std::string const& name, std::string payload = "{\"type\":\"" + type + "\",\"fields\":" + fields + ",\"unique\":false}"; - result = client->request(rest::RequestType::POST, - "/_api/index?collection=" + name, payload.c_str(), - payload.size(), headerFields); + result = + client->request(rest::RequestType::POST, "/_api/index?collection=" + name, + payload.c_str(), payload.size(), headerFields); bool failed = true;