diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 5c5d28bf91..c31779c3c3 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -1181,41 +1181,22 @@ AgencyCommResult AgencyComm::sendTransactionWithFailover(AgencyTransaction const bool AgencyComm::ensureStructureInitialized() { LOG_TOPIC("748e2", TRACE, Logger::AGENCYCOMM) << "checking if agency is initialized"; - while (true) { - while (shouldInitializeStructure()) { - LOG_TOPIC("17e16", TRACE, Logger::AGENCYCOMM) - << "Agency is fresh. Needs initial structure."; - // mop: we are the chosen one .. great success + while (!application_features::ApplicationServer::isStopping() && + shouldInitializeStructure()) { - if (tryInitializeStructure()) { - LOG_TOPIC("4c5aa", TRACE, Logger::AGENCYCOMM) - << "Successfully initialized agency"; - break; - } - - LOG_TOPIC("e05d1", WARN, Logger::AGENCYCOMM) - << "Initializing agency failed. We'll try again soon"; - // We should really have exclusive access, here, this is strange! - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - AgencyCommResult result = getValues("InitDone"); - - if (result.successful()) { - VPackSlice value = result.slice()[0].get( - std::vector({AgencyCommManager::path(), "InitDone"})); - if (value.isBoolean() && value.getBoolean()) { - // expecting a value of "true" - LOG_TOPIC("e8450", TRACE, Logger::AGENCYCOMM) << "Found an initialized agency"; - break; - } - } else { - if (result.httpCode() == 401) { - // unauthorized - LOG_TOPIC("e0376", FATAL, Logger::STARTUP) << "Unauthorized. Wrong credentials."; - FATAL_ERROR_EXIT(); - } + LOG_TOPIC("17e16", TRACE, Logger::AGENCYCOMM) + << "Agency is fresh. Needs initial structure."; + + if (tryInitializeStructure()) { + LOG_TOPIC("4c5aa", TRACE, Logger::AGENCYCOMM) + << "Successfully initialized agency"; + break; } + + LOG_TOPIC("63f7b", WARN, Logger::AGENCYCOMM) + << "Initializing agency failed. We'll try again soon"; + // We should really have exclusive access, here, this is strange! + std::this_thread::sleep_for(std::chrono::seconds(1)); LOG_TOPIC("9d265", TRACE, Logger::AGENCYCOMM) << "Waiting for agency to get initialized"; @@ -1346,10 +1327,7 @@ AgencyCommResult AgencyComm::sendWithFailover(arangodb::rest::RequestType method auto waitSomeTime = [&waitInterval, &result]() -> bool { // Returning true means timeout because of shutdown: - auto serverFeature = application_features::ApplicationServer::getFeature( - "Server"); - if (serverFeature->isStopping() || - !application_features::ApplicationServer::isRetryOK()) { + if (!application_features::ApplicationServer::isRetryOK()) { LOG_TOPIC("53e58", INFO, Logger::AGENCYCOMM) << "Unsuccessful AgencyComm: Timeout because of shutdown " << "errorCode: " << result.errorCode() @@ -1404,8 +1382,6 @@ AgencyCommResult AgencyComm::sendWithFailover(arangodb::rest::RequestType method // Some reporting: if (tries > 20) { auto serverState = application_features::ApplicationServer::server->state(); - application_features::ApplicationServer::getFeature( - "Server"); std::string serverStateStr; switch (serverState) { case arangodb::application_features::ServerState::UNINITIALIZED: @@ -1532,8 +1508,7 @@ AgencyCommResult AgencyComm::sendWithFailover(arangodb::rest::RequestType method continue; } } else { - // How odd, we are supposed to get at least {results=[...]}, let's - // retry... + // How odd, we are supposed to get at least {results=[...]}, let's retry... isInquiry = false; continue; } @@ -1749,7 +1724,9 @@ bool AgencyComm::tryInitializeStructure() { { VPackObjectBuilder c(&builder); builder.add("LatestID", VPackValue(1)); + addEmptyVPackObject("Problems", builder); builder.add("UserVersion", VPackValue(1)); + addEmptyVPackObject("ServerStates", builder); builder.add("HeartbeatIntervalMs", VPackValue(1000)); } @@ -1796,10 +1773,9 @@ bool AgencyComm::tryInitializeStructure() { LOG_TOPIC("58ffe", TRACE, Logger::AGENCYCOMM) << "Initializing agency with " << builder.toJson(); - AgencyOperation initOperation("", AgencyValueOperationType::SET, builder.slice()); - - AgencyWriteTransaction initTransaction; - initTransaction.operations.push_back(initOperation); + AgencyWriteTransaction initTransaction( + AgencyOperation("", AgencyValueOperationType::SET, builder.slice()), + AgencyPrecondition("Plan", AgencyPrecondition::Type::EMPTY, true)); AgencyCommResult result = sendTransactionWithFailover(initTransaction); if (result.httpCode() == TRI_ERROR_HTTP_UNAUTHORIZED) { @@ -1820,19 +1796,60 @@ bool AgencyComm::tryInitializeStructure() { } bool AgencyComm::shouldInitializeStructure() { - VPackBuilder builder; - builder.add(VPackValue(false)); - // "InitDone" key should not previously exist - auto result = casValue("InitDone", builder.slice(), false, 60.0, - AgencyCommManager::CONNECTION_OPTIONS._requestTimeout); + size_t nFail = 0; + + while (!application_features::ApplicationServer::isStopping()) { + + auto result = getValues("Plan"); + + if (!result.successful()) { // Not 200 - 299 + + if (result.httpCode() == 401) { + // unauthorized + LOG_TOPIC("32781", FATAL, Logger::STARTUP) << "Unauthorized. Wrong credentials."; + FATAL_ERROR_EXIT(); + } + + // Agency not ready yet + LOG_TOPIC("36253", TRACE, Logger::AGENCYCOMM) + << "waiting for agency to become ready"; + continue; + + } else { + + // Sanity + if (result.slice().isArray() && result.slice().length() == 1) { + + // No plan entry? Should initialise + if (result.slice()[0] == VPackSlice::emptyObjectSlice()) { + LOG_TOPIC("98732", DEBUG, Logger::AGENCYCOMM) + << "agency initialisation should be performed"; + return true; + } else { + LOG_TOPIC("abedb", DEBUG, Logger::AGENCYCOMM) + << "agency initialisation under way or done"; + return false; + } + } else { + // Should never get here + TRI_ASSERT(false); + if (nFail++ < 3) { + LOG_TOPIC("fed52", DEBUG, Logger::AGENCYCOMM) << "What the hell just happened?"; + } else { + LOG_TOPIC("54fea", FATAL, Logger::AGENCYCOMM) + << "Illegal response from agency during bootstrap: " + << result.slice().toJson(); + FATAL_ERROR_EXIT(); + } + continue; + } + + } + + std::this_thread::sleep_for(std::chrono::milliseconds(250)); - if (!result.successful()) { - // somebody else has or is initializing the agency - LOG_TOPIC("8a39b", TRACE, Logger::AGENCYCOMM) - << "someone else is initializing the agency"; - return false; } - return true; + return false; } diff --git a/arangod/Agency/AgencyComm.h b/arangod/Agency/AgencyComm.h index 51361c435f..2889dfdd6f 100644 --- a/arangod/Agency/AgencyComm.h +++ b/arangod/Agency/AgencyComm.h @@ -306,9 +306,7 @@ class AgencyTransaction { /*struct AgencyGeneralTransaction : public AgencyTransaction { - typedef -std::pair,std::vector> -TransactionType; + typedef std::pair,std::vector> TransactionType; explicit AgencyGeneralTransaction(AgencyOperation const& op, AgencyPrecondition const& pre) : diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index e078b19629..8bd49a835c 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -120,10 +120,11 @@ struct log_t { std::string const& clientId = std::string()) : index(idx), term(t), - entry(std::make_shared>(*e.get())), clientId(clientId), timestamp(std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch())) {} + std::chrono::system_clock::now().time_since_epoch())) { + entry = std::make_shared>(*e.get()); + } friend std::ostream& operator<<(std::ostream& o, log_t const& l) { o << l.index << " " << l.term << " " << VPackSlice(l.entry->data()).toJson() << " " diff --git a/arangod/Agency/AgencyFeature.cpp b/arangod/Agency/AgencyFeature.cpp index 329fff9406..2627527438 100644 --- a/arangod/Agency/AgencyFeature.cpp +++ b/arangod/Agency/AgencyFeature.cpp @@ -35,6 +35,7 @@ using namespace arangodb::application_features; using namespace arangodb::basics; using namespace arangodb::options; +using namespace arangodb::rest; namespace arangodb { @@ -123,12 +124,11 @@ void AgencyFeature::collectOptions(std::shared_ptr options) { new UInt64Parameter(&_maxAppendSize), arangodb::options::makeFlags(arangodb::options::Flags::Hidden)); - options->addOption( - "--agency.disaster-recovery-id", - "allows for specification of the id for this agent; dangerous option for " - "disaster recover only!", - new StringParameter(&_recoveryId), - arangodb::options::makeFlags(arangodb::options::Flags::Hidden)); + options->addOption("--agency.disaster-recovery-id", + "allows for specification of the id for this agent; " + "dangerous option for disaster recover only!", + new StringParameter(&_recoveryId), + arangodb::options::makeFlags(arangodb::options::Flags::Hidden)); } void AgencyFeature::validateOptions(std::shared_ptr options) { diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 484ca7b113..7452c51f28 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -481,8 +481,7 @@ void Agent::sendAppendEntriesRPC() { } // If the follower is behind our first log entry send last snapshot and - // following logs. Else try to have the follower catch up in regular - // order. + // following logs. Else try to have the follower catch up in regular order. bool needSnapshot = lastConfirmed < _state.firstIndex(); if (needSnapshot) { lastConfirmed = _state.lastCompactionAt() - 1; @@ -859,15 +858,14 @@ bool Agent::challengeLeadership() { // ensure that a leader resigns before another one even starts an // election. However, the Raft paper does not mention this at all. Rather, // in the paper it is written that the leader should resign immediately if - // it sees a higher term from another server. Currently we have not - // implemented to return the follower's term with a response to - // AppendEntriesRPC, so the leader cannot find out a higher term this - // way. The leader can, however, see a higher term in the incoming - // AppendEntriesRPC a new leader sends out, and it will immediately - // resign if it sees that. For the moment, this value here can stay. - // We should soon implement sending the follower's term back with - // each response and probably get rid of this method altogether, - // but this requires a bit more thought. + // it sees a higher term from another server. Currently we have not implemented + // to return the follower's term with a response to AppendEntriesRPC, so + // the leader cannot find out a higher term this way. The leader can, + // however, see a higher term in the incoming AppendEntriesRPC a new + // leader sends out, and it will immediately resign if it sees that. For + // the moment, this value here can stay. We should soon implement sending + // the follower's term back with each response and probably get rid of + // this method altogether, but this requires a bit more thought. if (_config.maxPing() * _config.timeoutMult() > m.count()) { ++good; } @@ -1646,13 +1644,13 @@ Store const& Agent::transient() const { /// Rebuild from persisted state void Agent::setPersistedState(VPackSlice const& compaction) { // Catch up with compacted state, this is only called at startup - _spearhead = compaction.get("readDB"); + _spearhead = compaction; // Catch up with commit try { WRITE_LOCKER(oLocker, _outputLock); CONDITION_LOCKER(guard, _waitForCV); - _readDB = compaction.get("readDB"); + _readDB = compaction; _commitIndex = arangodb::basics::StringUtils::uint64(compaction.get("_key").copyString()); _waitForCV.broadcast(); diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index d0cb9edce8..473be6a48a 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -26,6 +26,7 @@ #include "Agency/AgencyCommon.h" #include "Agency/AgencyStrings.h" +#include "Agency/AgentCallback.h" #include "Agency/AgentConfiguration.h" #include "Agency/AgentInterface.h" #include "Agency/Compactor.h" @@ -314,8 +315,7 @@ class Agent final : public arangodb::Thread, public AgentInterface { /// @brief Activate this agent in single agent mode. void activateAgency(); - /// @brief add agent to configuration (from State after successful local - /// persistence) + /// @brief add agent to configuration (from State after successful local persistence) void updateConfiguration(VPackSlice const&); private: @@ -438,8 +438,7 @@ class Agent final : public arangodb::Thread, public AgentInterface { /// For _ioLock: We put in assertions to ensure that when this lock is /// acquired we do not have the _tiLock. - /// @brief Inception thread getting an agent up to join RAFT from cmd or - /// persistence + /// @brief Inception thread getting an agent up to join RAFT from cmd or persistence std::unique_ptr _inception; /// @brief Compactor diff --git a/arangod/Agency/AgentCallback.cpp b/arangod/Agency/AgentCallback.cpp index 5bde617b7b..7e294490b9 100644 --- a/arangod/Agency/AgentCallback.cpp +++ b/arangod/Agency/AgentCallback.cpp @@ -22,9 +22,11 @@ //////////////////////////////////////////////////////////////////////////////// #include "AgentCallback.h" + #include "Agency/Agent.h" #include "ApplicationFeatures/ApplicationServer.h" +using namespace arangodb::application_features; using namespace arangodb::consensus; using namespace arangodb::velocypack; @@ -88,8 +90,7 @@ bool AgentCallback::operator()(arangodb::ClusterCommResult* res) { << "comm_status(" << res->status << "), last(" << _last << "), follower(" << _slaveID << "), time(" << TRI_microtime() - _startTime << ")"; } else { - if (!application_features::ApplicationServer::isStopping() && - (_agent == nullptr || !_agent->isStopping())) { + if (!ApplicationServer::isStopping() && (_agent == nullptr || !_agent->isStopping())) { // Do not warn if we are already shutting down: LOG_TOPIC("2c712", WARN, Logger::AGENCY) << "Got bad callback from AppendEntriesRPC: " diff --git a/arangod/Agency/AgentConfiguration.cpp b/arangod/Agency/AgentConfiguration.cpp index 693460cb8a..3084bd4c57 100644 --- a/arangod/Agency/AgentConfiguration.cpp +++ b/arangod/Agency/AgentConfiguration.cpp @@ -514,8 +514,7 @@ bool config_t::findInPool(std::string const& id) const { /// @brief merge from persisted configuration bool config_t::merge(VPackSlice const& conf) { - WRITE_LOCKER(writeLocker, - _lock); // All must happen under the lock or else ... + WRITE_LOCKER(writeLocker, _lock); // All must happen under the lock or else ... // FIXME: All these "command line beats persistence" are wrong, since // the given default values never happen. Only fixed _supervision with diff --git a/arangod/Agency/AgentInterface.h b/arangod/Agency/AgentInterface.h index 360650803f..bbf5e40873 100644 --- a/arangod/Agency/AgentInterface.h +++ b/arangod/Agency/AgentInterface.h @@ -43,8 +43,7 @@ class AgentInterface { }; /// @brief Attempt write - virtual write_ret_t write(query_t const&, - WriteMode const& mode = WriteMode()) = 0; /// @brief Attempt write + virtual write_ret_t write(query_t const&, WriteMode const& mode = WriteMode()) = 0; /// @brief Attempt write virtual trans_ret_t transient(query_t const&) = 0; diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index e8ca82ff39..676d555b25 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -97,7 +97,6 @@ void Inception::gossip() { continue; } } - LOG_TOPIC("cc3fd", DEBUG, Logger::AGENCY) << "Sending gossip message 1: " << out->toJson() << " to peer " << p; if (this->isStopping() || _agent->isStopping() || cc == nullptr) { diff --git a/arangod/Agency/Node.cpp b/arangod/Agency/Node.cpp index 134e40ee97..480664cb04 100644 --- a/arangod/Agency/Node.cpp +++ b/arangod/Agency/Node.cpp @@ -46,6 +46,7 @@ struct Empty { }; const Node::Children Node::dummyChildren = Node::Children(); +const Node Node::_dummyNode = Node("dumm-di-dumm"); /// @brief Split strings by separator inline static std::vector split(const std::string& str, char separator) { @@ -390,11 +391,21 @@ bool Node::addTimeToLive(long millis) { return true; } +void Node::timeToLive(TimePoint const& ttl) { + _ttl = ttl; +} + +TimePoint const& Node::timeToLive() const { + return _ttl; +} + // remove time to live entry for this node bool Node::removeTimeToLive() { - if (_ttl != std::chrono::system_clock::time_point()) { - store().removeTTL(uri()); - _ttl = std::chrono::system_clock::time_point(); + if (_store != nullptr) { + _store->removeTTL(uri()); + if (_ttl != std::chrono::system_clock::time_point()) { + _ttl = std::chrono::system_clock::time_point(); + } } return true; } diff --git a/arangod/Agency/Node.h b/arangod/Agency/Node.h index fe9f1a1081..3015472962 100644 --- a/arangod/Agency/Node.h +++ b/arangod/Agency/Node.h @@ -231,6 +231,18 @@ class Node { /// @brief Is string bool isString() const; + /** + * @brief Get seconds this node still has to live. (Must be guarded by caller) + * @return seconds to live (int64_t::max, if none set) + */ + TimePoint const& timeToLive() const; + + /** + * @brief Set expiry for this node + * @param Time point of expiry + */ + void timeToLive(TimePoint const& ttl); + /// @brief accessor to Node object /// @return second is true if url exists, first populated if second true std::pair hasAsNode(std::string const&) const; @@ -316,6 +328,11 @@ class Node { /// @brief Clear key value store void clear(); + // @brief Helper function to return static instance of dummy node below + static Node const& dummyNode() { + return _dummyNode; + } + protected: /// @brief Add time to live entry virtual bool addTimeToLive(long millis); @@ -335,6 +352,7 @@ class Node { mutable bool _vecBufDirty; bool _isArray; static Children const dummyChildren; + static Node const _dummyNode; }; diff --git a/arangod/Agency/RemoveFollower.cpp b/arangod/Agency/RemoveFollower.cpp index c53ff47b9f..6d430cf3d9 100644 --- a/arangod/Agency/RemoveFollower.cpp +++ b/arangod/Agency/RemoveFollower.cpp @@ -183,8 +183,7 @@ bool RemoveFollower::start(bool&) { // Now find some new servers to remove: std::unordered_map overview; // get an overview over the servers // -1 : not "GOOD", can be in sync, or leader, or not - // >=0: number of servers for which it is in sync or confirmed - // leader + // >=0: number of servers for which it is in sync or confirmed leader bool leaderBad = false; for (auto const& srv : VPackArrayIterator(planned)) { std::string serverName = srv.copyString(); diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index f86ec386ba..a97cbfdaa1 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -562,18 +562,10 @@ RestStatus RestAgencyHandler::handleConfig() { RestStatus RestAgencyHandler::handleState() { VPackBuilder body; - body.add(VPackValue(VPackValueType::Array)); - for (auto const& i : _agent->state().get()) { - body.add(VPackValue(VPackValueType::Object)); - body.add("index", VPackValue(i.index)); - body.add("term", VPackValue(i.term)); - if (i.entry != nullptr) { - body.add("query", VPackSlice(i.entry->data())); - } - body.add("clientId", VPackValue(i.clientId)); - body.close(); + { + VPackObjectBuilder o(&body); + _agent->readDB(body); } - body.close(); generateResult(rest::ResponseCode::OK, body.slice()); return RestStatus::DONE; } diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index ebae01e14f..aeda883b4c 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -245,7 +245,7 @@ std::vector State::logLeaderMulti(query_t const& transactions, if (!i.isArray()) { THROW_ARANGO_EXCEPTION_MESSAGE(30000, "Transaction syntax is [{}, " - "}, \"clientId\"]"); + "{}, \"clientId\"]"); } if (applicable[j] == APPLIED) { @@ -380,7 +380,7 @@ index_t State::logFollower(query_t const& transactions) { // Now we must completely erase our log and compaction snapshots and // start from the snapshot Store snapshot(_agent, "snapshot"); - snapshot = slices[0].get("readDB"); + snapshot = slices[0]; if (!storeLogFromSnapshot(snapshot, snapshotIndex, snapshotTerm)) { LOG_TOPIC("f7250", FATAL, Logger::AGENCY) << "Could not restore received log snapshot."; @@ -809,7 +809,7 @@ bool State::loadLastCompactedSnapshot(Store& store, index_t& index, term_t& term VPackSlice i = result[0]; VPackSlice ii = i.resolveExternals(); try { - store = ii.get("readDB"); + store = ii; index = basics::StringUtils::uint64(ii.get("_key").copyString()); term = ii.get("term").getNumber(); return true; @@ -1246,6 +1246,7 @@ bool State::persistCompactionSnapshot(index_t cind, arangodb::consensus::term_t } store.add("term", VPackValue(static_cast(term))); store.add("_key", VPackValue(i_str.str())); + store.add("version", VPackValue(2)); } TRI_ASSERT(_vocbase != nullptr); @@ -1493,7 +1494,7 @@ std::shared_ptr State::latestAgencyState(TRI_vocbase_t& vocbase, // Result can only have length 0 or 1. VPackSlice ii = result[0].resolveExternals(); buffer_t tmp = std::make_shared>(); - store = ii.get("readDB"); + store = ii; index = arangodb::basics::StringUtils::uint64(ii.get("_key").copyString()); term = ii.get("term").getNumber(); LOG_TOPIC("d838b", INFO, Logger::AGENCY) @@ -1598,11 +1599,12 @@ uint64_t State::toVelocyPack(index_t lastIndex, VPackBuilder& builder) const { } if (n > 0) { - std::string const compstr - = "FOR c in compact FILTER c._key >= '" + firstIndex + - "' SORT c._key LIMIT 1 RETURN c"; - arangodb::aql::Query compQuery(false, *_vocbase, aql::QueryString(compstr), + std::string const compQueryStr = + std::string("FOR c in compact FILTER c._key >= '") + firstIndex + + std::string("' SORT c._key LIMIT 1 RETURN c"); + + arangodb::aql::Query compQuery(false, *_vocbase, aql::QueryString(compQueryStr), bindVars, nullptr, arangodb::aql::PART_MAIN); aql::QueryResult compQueryResult = compQuery.executeSync(_queryRegistry); diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 3f0eeb2793..6bb53e727a 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -122,6 +122,7 @@ Store::Store(Agent* agent, std::string const& name) Store& Store::operator=(Store const& rhs) { if (&rhs != this) { MUTEX_LOCKER(otherLock, rhs._storeLock); + MUTEX_LOCKER(lock, _storeLock); _agent = rhs._agent; _timeTable = rhs._timeTable; _observerTable = rhs._observerTable; @@ -135,6 +136,7 @@ Store& Store::operator=(Store const& rhs) { Store& Store::operator=(Store&& rhs) { if (&rhs != this) { MUTEX_LOCKER(otherLock, rhs._storeLock); + MUTEX_LOCKER(lock, _storeLock); _agent = std::move(rhs._agent); _timeTable = std::move(rhs._timeTable); _observerTable = std::move(rhs._observerTable); @@ -336,23 +338,23 @@ std::vector Store::applyLogEntries(arangodb::velocypack::Builder const& qu body.add("term", VPackValue(term)); body.add("index", VPackValue(index)); auto ret = in.equal_range(url); - std::string currentKey; + std::map> result; + // key -> (modified -> op) for (auto it = ret.first; it != ret.second; ++it) { - if (currentKey != it->second->key) { - if (!currentKey.empty()) { - body.close(); - } - body.add(it->second->key, VPackValue(VPackValueType::Object)); - currentKey = it->second->key; - } - body.add(VPackValue(it->second->modified)); - { - VPackObjectBuilder b(&body); - body.add("op", VPackValue(it->second->oper)); - } + result[it->second->key][it->second->modified] = it->second->oper; } - if (!currentKey.empty()) { - body.close(); + for (auto const& m : result) { + body.add(VPackValue(m.first)); + { + VPackObjectBuilder guard(&body); + for (auto const& m2 : m.second) { + body.add(VPackValue(m2.first)); + { + VPackObjectBuilder guard2(&body); + body.add("op", VPackValue(m2.second)); + } + } + } } } @@ -388,26 +390,26 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const { std::string key = precond.key.copyString(); std::vector pv = split(key, '/'); - Node node("precond"); + Node const* node = &Node::dummyNode(); // Check is guarded in ::apply bool found = _node.has(pv); if (found) { - node = _node(pv); + node = &_node(pv); } if (precond.value.isObject()) { for (auto const& op : VPackObjectIterator(precond.value)) { std::string const& oper = op.key.copyString(); if (oper == "old") { // old - if (node != op.value) { + if (*node != op.value) { ret.push_back(precond.key); if (mode == FIRST_FAIL) { break; } } } else if (oper == "oldNot") { // oldNot - if (node == op.value) { + if (*node == op.value) { ret.push_back(precond.key); if (mode == FIRST_FAIL) { break; @@ -422,7 +424,7 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const { break; } } - bool isArray = (node.type() == LEAF && node.slice().isArray()); + bool isArray = (node->type() == LEAF && node->slice().isArray()); if (op.value.getBool() ? !isArray : isArray) { ret.push_back(precond.key); if (mode == FIRST_FAIL) { @@ -446,9 +448,9 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const { } } else if (oper == "in") { // in if (found) { - if (node.slice().isArray()) { + if (node->slice().isArray()) { bool _found = false; - for (auto const& i : VPackArrayIterator(node.slice())) { + for (auto const& i : VPackArrayIterator(node->slice())) { if (i == op.value) { _found = true; continue; @@ -469,9 +471,9 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const { if (!found) { continue; } else { - if (node.slice().isArray()) { + if (node->slice().isArray()) { bool _found = false; - for (auto const& i : VPackArrayIterator(node.slice())) { + for (auto const& i : VPackArrayIterator(node->slice())) { if (i == op.value) { _found = true; continue; @@ -498,7 +500,7 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const { } } } else { - if (node != precond.value) { + if (*node != precond.value) { ret.push_back(precond.key); if (mode == FIRST_FAIL) { break; @@ -609,14 +611,25 @@ query_t Store::clearExpired() const { void Store::dumpToBuilder(Builder& builder) const { MUTEX_LOCKER(storeLocker, _storeLock); toBuilder(builder, true); + + std::map clean {}; + for (auto const& i : _timeTable) { + auto ts = std::chrono::duration_cast( + i.first.time_since_epoch()).count(); + auto it = clean.find(i.second); + if (it == clean.end()) { + clean[i.second] = ts; + } else if (ts < it->second) { + it->second = ts; + } + } { VPackObjectBuilder guard(&builder); - for (auto const& i : _timeTable) { - auto ts = std::chrono::duration_cast(i.first.time_since_epoch()) - .count(); - builder.add(i.second, VPackValue(ts)); + for (auto const& c : clean) { + builder.add(c.first, VPackValue(c.second)); } } + { VPackArrayBuilder garray(&builder); for (auto const& i : _observerTable) { @@ -678,21 +691,29 @@ void Store::clear() { } /// Apply a request to my key value store -Store& Store::operator=(VPackSlice const& slice) { - TRI_ASSERT(slice.isArray()); +Store& Store::operator=(VPackSlice const& s) { + TRI_ASSERT(s.isObject()); + TRI_ASSERT(s.hasKey("readDB")); + auto const& slice = s.get("readDB"); TRI_ASSERT(slice.length() == 4); MUTEX_LOCKER(storeLocker, _storeLock); _node.applies(slice[0]); - TRI_ASSERT(slice[1].isObject()); - for (auto const& entry : VPackObjectIterator(slice[1])) { - long long tse = entry.value.getInt(); - _timeTable.emplace( - std::pair(TimePoint(std::chrono::duration(tse)), - entry.key.copyString())); + if (s.hasKey("version")) { + TRI_ASSERT(slice[1].isObject()); + for (auto const& entry : VPackObjectIterator(slice[1])) { + if (entry.value.isNumber()) { + auto const& key = entry.key.copyString(); + if (_node.has(key)) { + auto tp = TimePoint(std::chrono::seconds(entry.value.getNumber())); + _node(key).timeToLive(tp); + _timeTable.emplace(std::pair(tp, key)); + } + } + } } - + TRI_ASSERT(slice[2].isArray()); for (auto const& entry : VPackArrayIterator(slice[2])) { TRI_ASSERT(entry.isObject()); @@ -769,11 +790,10 @@ bool Store::has(std::string const& path) const { /// Remove ttl entry for path, guarded by caller void Store::removeTTL(std::string const& uri) { _storeLock.assertLockedByCurrentThread(); - if (!_timeTable.empty()) { for (auto it = _timeTable.cbegin(); it != _timeTable.cend();) { if (it->second == uri) { - _timeTable.erase(it++); + it = _timeTable.erase(it); } else { ++it; } diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index fc4efa517c..c3919bf066 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -872,7 +872,7 @@ void Supervision::run() { } } } - + auto lapTime = std::chrono::duration_cast( std::chrono::steady_clock::now() - lapStart).count(); diff --git a/arangod/Agency/Supervision.h b/arangod/Agency/Supervision.h index 00392c0e2b..9d51f99fd8 100644 --- a/arangod/Agency/Supervision.h +++ b/arangod/Agency/Supervision.h @@ -133,8 +133,7 @@ class Supervision : public arangodb::CriticalThread { /// @brief Upgrade agency to supervision overhaul jobs void upgradeHealthRecords(VPackBuilder&); - /// @brief Check for orphaned index creations, which have been successfully - /// built + /// @brief Check for orphaned index creations, which have been successfully built void readyOrphanedIndexCreations(); /// @brief Check for inconsistencies in replication factor vs dbs entries diff --git a/tests/js/client/agency/agency-test.js b/tests/js/client/agency/agency-test.js index a083ee3748..abcb391a58 100644 --- a/tests/js/client/agency/agency-test.js +++ b/tests/js/client/agency/agency-test.js @@ -676,7 +676,7 @@ function agencyTestSuite () { writeAndCheck([[{"/a/y":{"op":"set","new":12}}]]); assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]); wait(1.1); - assertEqual(readAndCheck([["/a/y"]]), [{"a":{"y":12}}]); + assertEqual(readAndCheck([["/a/y"]]), [{a:{}}]); writeAndCheck([[{"foo/bar":{"op":"set","new":{"baz":12}}}]]); assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{"bar":{"baz":12}}}]);