From 9044f7de97ab3983bd7a50cf0e81fa63e743874c Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Mon, 14 Oct 2019 15:46:06 +0200 Subject: [PATCH] [3.5] yet another agency ttl bug (#10242) * port from devel * Update CHANGELOG --- CHANGELOG | 6 ++- arangod/Agency/AgencyCommon.h | 6 +-- arangod/Agency/Node.cpp | 22 ++++++-- arangod/Agency/Node.h | 3 +- arangod/Agency/State.cpp | 99 ++++++++++++++++++++++++++--------- 5 files changed, 100 insertions(+), 36 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index e69d9eefd6..7399efcd15 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ v3.5.2 (XXXX-XX-XX) ------------------- +* Fixed agency TTL bug happening under certain rare conditions. + * Improved performance of some agency helper functions. * Fixed search not working in document view while in code mode. @@ -8,13 +10,13 @@ v3.5.2 (XXXX-XX-XX) * Fixed issue #10090: fix repeatable seek to the same document in SEARCH operations for ArangoSearch views. -* Fixed issue #10193: Arangoexport does not handle line feeds when exporting as +* Fixed issue #10193: Arangoexport does not handle line feeds when exporting as csv. * Removed debug log messages "found comm task ..." that could be logged on server shutdown. -* Fixed issue #10183: Arangoimport imports on _system when you try to +* Fixed issue #10183: Arangoimport imports on _system when you try to create a new database. This bugfix fixes the output of arangoimport, which could display a diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index 85ced37d38..89598328e0 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -110,13 +110,13 @@ struct log_t { std::chrono::milliseconds timestamp; // Timestamp log_t(index_t idx, term_t t, buffer_t const& e, - std::string const& clientId = std::string()) + std::string const& clientId, + uint64_t const& m = 0) : index(idx), term(t), entry(std::make_shared>(*e.get())), clientId(clientId), - timestamp(std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch())) { + timestamp(m) { } friend std::ostream& operator<<(std::ostream& o, log_t const& l) { diff --git a/arangod/Agency/Node.cpp b/arangod/Agency/Node.cpp index 5e1f609b73..8e10732481 100644 --- a/arangod/Agency/Node.cpp +++ b/arangod/Agency/Node.cpp @@ -415,8 +415,7 @@ Store* Node::getStore() { ValueType Node::valueType() const { return slice().type(); } // file time to live entry for this node to now + millis -bool Node::addTimeToLive(long millis) { - auto tkey = std::chrono::system_clock::now() + std::chrono::milliseconds(millis); +bool Node::addTimeToLive(std::chrono::time_point const& tkey) { store().timeTable().insert(std::pair(tkey, uri())); _ttl = tkey; return true; @@ -451,6 +450,8 @@ namespace consensus { template <> bool Node::handle(VPackSlice const& slice) { + using namespace std::chrono; + if (!slice.hasKey("new")) { LOG_TOPIC("ad662", WARN, Logger::AGENCY) << "Operator set without new value: " << slice.toJson(); @@ -474,11 +475,22 @@ bool Node::handle(VPackSlice const& slice) { if (slice.hasKey("ttl")) { VPackSlice ttl_v = slice.get("ttl"); if (ttl_v.isNumber()) { + + // ttl in millisconds long ttl = 1000l * ((ttl_v.isDouble()) - ? static_cast(slice.get("ttl").getNumber()) - : static_cast(slice.get("ttl").getNumber())); - addTimeToLive(ttl); + ? static_cast(slice.get("ttl").getNumber()) + : static_cast(slice.get("ttl").getNumber())); + + // calclate expiry time + auto const expires = slice.hasKey("epoch_millis") ? + time_point( + milliseconds(slice.get("epoch_millis").getNumber() + ttl)) : + system_clock::now() + milliseconds(ttl); + + // set ttl limit + addTimeToLive(expires); + } else { LOG_TOPIC("66da2", WARN, Logger::AGENCY) << "Non-number value assigned to ttl: " << ttl_v.toJson(); diff --git a/arangod/Agency/Node.h b/arangod/Agency/Node.h index 1c6b593b60..d32a14f559 100644 --- a/arangod/Agency/Node.h +++ b/arangod/Agency/Node.h @@ -336,7 +336,8 @@ public: protected: /// @brief Add time to live entry - virtual bool addTimeToLive(long millis); + virtual bool addTimeToLive( + std::chrono::time_point const& tp); /// @brief Remove time to live entry virtual bool removeTimeToLive(); diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 8f17ae9d42..175f1b3bde 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -72,14 +72,16 @@ State::~State() {} inline static std::string timestamp(uint64_t m) { + TRI_ASSERT(m != 0); + using namespace std::chrono; - - std::time_t t = (m == 0) ? std::time(nullptr) : + + std::time_t t = system_clock::to_time_t(system_clock::time_point(milliseconds(m))); char mbstr[100]; - return std::strftime(mbstr, sizeof(mbstr), "%Y-%m-%d %H:%M:%S %Z", std::localtime(&t)) - ? std::string(mbstr) - : std::string(); + return std::strftime(mbstr, sizeof(mbstr), "%Y-%m-%d %H:%M:%S %Z", std::gmtime(&t)) + ? std::string(mbstr) + : std::string(); } inline static std::string stringify(index_t index) { @@ -103,6 +105,7 @@ bool State::persist(index_t index, term_t term, uint64_t millis, body.add("request", entry); body.add("clientId", Value(clientId)); body.add("timestamp", Value(timestamp(millis))); + body.add("epoch_millis", Value(millis)); } TRI_ASSERT(_vocbase != nullptr); @@ -151,6 +154,7 @@ bool State::persistconf(index_t index, term_t term, uint64_t millis, log.add("request", entry); log.add("clientId", Value(clientId)); log.add("timestamp", Value(timestamp(millis))); + log.add("epoch_millis", Value(millis)); } // The new configuration to be persisted.------------------------------------- @@ -225,6 +229,9 @@ bool State::persistconf(index_t index, term_t term, uint64_t millis, std::vector State::logLeaderMulti(query_t const& transactions, std::vector const& applicable, term_t term) { + + using namespace std::chrono; + std::vector idx(applicable.size()); size_t j = 0; auto const& slice = transactions->slice(); @@ -257,8 +264,10 @@ std::vector State::logLeaderMulti(query_t const& transactions, TRI_ASSERT(transaction.length() > 0); size_t pos = transaction.keyAt(0).copyString().find(RECONFIGURE); - idx[j] = logNonBlocking(_log.back().index + 1, i[0], term, 0, clientId, true, - pos == 0 || pos == 1); + idx[j] = logNonBlocking( + _log.back().index + 1, i[0], term, + duration_cast(system_clock::now().time_since_epoch()).count(), + clientId, true, pos == 0 || pos == 1); } ++j; } @@ -269,7 +278,10 @@ std::vector State::logLeaderMulti(query_t const& transactions, index_t State::logLeaderSingle(velocypack::Slice const& slice, term_t term, std::string const& clientId) { MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order - return logNonBlocking(_log.back().index + 1, slice, term, 0, clientId, true); + using namespace std::chrono; + return logNonBlocking( + _log.back().index + 1, slice, term, + duration_cast(system_clock::now().time_since_epoch()).count(), clientId, true); } /// Log transaction (leader) @@ -290,7 +302,7 @@ index_t State::logNonBlocking(index_t idx, velocypack::Slice const& slice, FATAL_ERROR_EXIT(); } - logEmplaceBackNoLock(log_t(idx, term, buf, clientId)); + logEmplaceBackNoLock(log_t(idx, term, buf, clientId, millis)); return _log.back().index; } @@ -515,7 +527,7 @@ size_t State::removeConflicts(query_t const& transactions, bool gotSnapshot) { void State::logEraseNoLock( std::deque::iterator rbegin, std::deque::iterator rend) { - + for (auto lit = rbegin; lit != rend; lit++) { std::string const& clientId = lit->clientId; if (!clientId.empty()) { @@ -664,8 +676,23 @@ VPackBuilder State::slices(index_t start, index_t end) const { } for (size_t i = start - _cur; i <= end - _cur; ++i) { - try { - slices.add(VPackSlice(_log.at(i).entry->data())); + try { //{ "a" : {"op":"set", "ttl":20, ...}} + auto slice = VPackSlice(_log.at(i).entry->data()); + VPackObjectBuilder o(&slices); + for (auto const& oper : VPackObjectIterator(slice)) { + slices.add(VPackValue(oper.key.copyString())); + + if (oper.value.isObject() && oper.value.hasKey("op") && + oper.value.get("op").isEqualString("set") && oper.value.hasKey("ttl")) { + VPackObjectBuilder oo(&slices); + for (auto const& i : VPackObjectIterator(oper.value)) { + slices.add(i.key.copyString(), i.value); + } + slices.add("epoch_millis", VPackValue(_log.at(i).timestamp.count())); + } else { + slices.add(oper.value); + } + } } catch (std::exception const&) { break; } @@ -673,9 +700,7 @@ VPackBuilder State::slices(index_t start, index_t end) const { } mutexLocker.unlock(); - slices.close(); - return slices; } @@ -753,6 +778,11 @@ bool State::ready() const { return _ready; } /// Load collections bool State::loadCollections(TRI_vocbase_t* vocbase, QueryRegistry* queryRegistry, bool waitForSync) { + + using namespace std::chrono; + auto const epoch_millis = + duration_cast(system_clock::now().time_since_epoch()).count(); + _vocbase = vocbase; _queryRegistry = queryRegistry; @@ -767,8 +797,8 @@ bool State::loadCollections(TRI_vocbase_t* vocbase, std::shared_ptr> buf = std::make_shared>(); VPackSlice value = arangodb::velocypack::Slice::emptyObjectSlice(); buf->append(value.startAs(), value.byteSize()); - _log.emplace_back(log_t(index_t(0), term_t(0), buf, std::string())); - persist(0, 0, 0, value, std::string()); + _log.emplace_back(log_t(index_t(0), term_t(0), buf, std::string(), epoch_millis)); + persist(0, 0, epoch_millis, value, std::string()); } _ready = true; return true; @@ -1025,7 +1055,7 @@ bool State::loadRemaining() { MUTEX_LOCKER(logLock, _logLock); if (result.isArray() && result.length() > 0) { TRI_ASSERT(_log.empty()); // was cleared in loadCompacted - std::string clientId; + std::string clientId, tstamp; // We know that _cur has been set in loadCompacted to the index of the // snapshot that was loaded or to 0 if there is no snapshot. index_t lastIndex = _cur; @@ -1040,6 +1070,22 @@ bool State::loadRemaining() { clientId = req.hasKey("clientId") ? req.get("clientId").copyString() : std::string(); + uint64_t millis = 0; + if (ii.hasKey("epoch_millis")) { + if (ii.get("epoch_millis").isInteger()) { + try { + millis = ii.get("epoch_millis").getNumber(); + } catch (std::exception const& e) { + LOG_TOPIC("2ee75", ERR, Logger::AGENCY) + << "Failed to parse integer value for epoch_millis " << e.what(); + FATAL_ERROR_EXIT(); + } + } else { + LOG_TOPIC("52ee7", ERR, Logger::AGENCY) << "epoch_millis is not an integer type"; + FATAL_ERROR_EXIT(); + } + } + // Dummy fill missing entries (Not good at all.) index_t index(StringUtils::uint64(ii.get(StaticStrings::KeyString).copyString())); @@ -1065,7 +1111,7 @@ bool State::loadRemaining() { // Real entries logEmplaceBackNoLock( log_t(StringUtils::uint64(ii.get(StaticStrings::KeyString).copyString()), - ii.get("term").getNumber(), tmp, clientId)); + ii.get("term").getNumber(), tmp, clientId, millis)); lastIndex = index; } } @@ -1541,8 +1587,12 @@ std::shared_ptr State::latestAgencyState(TRI_vocbase_t& vocbase, std::string clientId = req.hasKey("clientId") ? req.get("clientId").copyString() : std::string(); + uint64_t epoch_millis = + (req.hasKey("epoch_millis") && req.get("epoch_millis").isInteger()) ? + req.get("epoch_millis").getNumber() : 0; + log_t entry(StringUtils::uint64(ii.get(StaticStrings::KeyString).copyString()), - ii.get("term").getNumber(), tmp, clientId); + ii.get("term").getNumber(), tmp, clientId, epoch_millis); if (entry.index <= index) { LOG_TOPIC("c8f91", WARN, Logger::AGENCY) @@ -1575,7 +1625,7 @@ std::shared_ptr State::latestAgencyState(TRI_vocbase_t& vocbase, uint64_t State::toVelocyPack(index_t lastIndex, VPackBuilder& builder) const { TRI_ASSERT(builder.isOpenObject()); - + auto bindVars = std::make_shared(); { VPackObjectBuilder b(bindVars.get()); } @@ -1595,7 +1645,7 @@ uint64_t State::toVelocyPack(index_t lastIndex, VPackBuilder& builder) const { VPackSlice result = logQueryResult.data->slice().resolveExternals(); std::string firstIndex; uint64_t n = 0; - + auto copyWithoutId = [&](VPackSlice slice, VPackBuilder& builder) { // Need to remove custom attribute in _id: { VPackObjectBuilder guard(&builder); @@ -1632,7 +1682,7 @@ uint64_t State::toVelocyPack(index_t lastIndex, VPackBuilder& builder) const { std::string const compQueryStr = std::string("FOR c in compact FILTER c._key >= '") + firstIndex + std::string("' SORT c._key LIMIT 1 RETURN c"); - + arangodb::aql::Query compQuery(false, *_vocbase, aql::QueryString(compQueryStr), bindVars, nullptr, arangodb::aql::PART_MAIN); @@ -1641,7 +1691,7 @@ uint64_t State::toVelocyPack(index_t lastIndex, VPackBuilder& builder) const { if (compQueryResult.result.fail()) { THROW_ARANGO_EXCEPTION(compQueryResult.result); } - + result = compQueryResult.data->slice().resolveExternals(); if (result.isArray()) { @@ -1657,7 +1707,6 @@ uint64_t State::toVelocyPack(index_t lastIndex, VPackBuilder& builder) const { } } } - + return n; } -