1
0
Fork 0

[3.5] yet another agency ttl bug (#10242)

* port from devel

* Update CHANGELOG
This commit is contained in:
Kaveh Vahedipour 2019-10-14 15:46:06 +02:00 committed by KVS85
parent 5d528fffb2
commit 9044f7de97
5 changed files with 100 additions and 36 deletions

View File

@ -1,6 +1,8 @@
v3.5.2 (XXXX-XX-XX)
-------------------
* Fixed agency TTL bug happening under certain rare conditions.
* Improved performance of some agency helper functions.
* Fixed search not working in document view while in code mode.

View File

@ -110,13 +110,13 @@ struct log_t {
std::chrono::milliseconds timestamp; // Timestamp
log_t(index_t idx, term_t t, buffer_t const& e,
std::string const& clientId = std::string())
std::string const& clientId,
uint64_t const& m = 0)
: index(idx),
term(t),
entry(std::make_shared<arangodb::velocypack::Buffer<uint8_t>>(*e.get())),
clientId(clientId),
timestamp(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())) {
timestamp(m) {
}
friend std::ostream& operator<<(std::ostream& o, log_t const& l) {

View File

@ -415,8 +415,7 @@ Store* Node::getStore() {
ValueType Node::valueType() const { return slice().type(); }
// file time to live entry for this node to now + millis
bool Node::addTimeToLive(long millis) {
auto tkey = std::chrono::system_clock::now() + std::chrono::milliseconds(millis);
bool Node::addTimeToLive(std::chrono::time_point<std::chrono::system_clock> const& tkey) {
store().timeTable().insert(std::pair<TimePoint, std::string>(tkey, uri()));
_ttl = tkey;
return true;
@ -451,6 +450,8 @@ namespace consensus {
template <>
bool Node::handle<SET>(VPackSlice const& slice) {
using namespace std::chrono;
if (!slice.hasKey("new")) {
LOG_TOPIC("ad662", WARN, Logger::AGENCY)
<< "Operator set without new value: " << slice.toJson();
@ -474,11 +475,22 @@ bool Node::handle<SET>(VPackSlice const& slice) {
if (slice.hasKey("ttl")) {
VPackSlice ttl_v = slice.get("ttl");
if (ttl_v.isNumber()) {
// ttl in millisconds
long ttl =
1000l * ((ttl_v.isDouble())
? static_cast<long>(slice.get("ttl").getNumber<double>())
: static_cast<long>(slice.get("ttl").getNumber<int>()));
addTimeToLive(ttl);
? static_cast<long>(slice.get("ttl").getNumber<double>())
: static_cast<long>(slice.get("ttl").getNumber<int>()));
// calclate expiry time
auto const expires = slice.hasKey("epoch_millis") ?
time_point<system_clock>(
milliseconds(slice.get("epoch_millis").getNumber<uint64_t>() + ttl)) :
system_clock::now() + milliseconds(ttl);
// set ttl limit
addTimeToLive(expires);
} else {
LOG_TOPIC("66da2", WARN, Logger::AGENCY)
<< "Non-number value assigned to ttl: " << ttl_v.toJson();

View File

@ -336,7 +336,8 @@ public:
protected:
/// @brief Add time to live entry
virtual bool addTimeToLive(long millis);
virtual bool addTimeToLive(
std::chrono::time_point<std::chrono::system_clock> const& tp);
/// @brief Remove time to live entry
virtual bool removeTimeToLive();

View File

@ -72,14 +72,16 @@ State::~State() {}
inline static std::string timestamp(uint64_t m) {
TRI_ASSERT(m != 0);
using namespace std::chrono;
std::time_t t = (m == 0) ? std::time(nullptr) :
std::time_t t =
system_clock::to_time_t(system_clock::time_point(milliseconds(m)));
char mbstr[100];
return std::strftime(mbstr, sizeof(mbstr), "%Y-%m-%d %H:%M:%S %Z", std::localtime(&t))
? std::string(mbstr)
: std::string();
return std::strftime(mbstr, sizeof(mbstr), "%Y-%m-%d %H:%M:%S %Z", std::gmtime(&t))
? std::string(mbstr)
: std::string();
}
inline static std::string stringify(index_t index) {
@ -103,6 +105,7 @@ bool State::persist(index_t index, term_t term, uint64_t millis,
body.add("request", entry);
body.add("clientId", Value(clientId));
body.add("timestamp", Value(timestamp(millis)));
body.add("epoch_millis", Value(millis));
}
TRI_ASSERT(_vocbase != nullptr);
@ -151,6 +154,7 @@ bool State::persistconf(index_t index, term_t term, uint64_t millis,
log.add("request", entry);
log.add("clientId", Value(clientId));
log.add("timestamp", Value(timestamp(millis)));
log.add("epoch_millis", Value(millis));
}
// The new configuration to be persisted.-------------------------------------
@ -225,6 +229,9 @@ bool State::persistconf(index_t index, term_t term, uint64_t millis,
std::vector<index_t> State::logLeaderMulti(query_t const& transactions,
std::vector<apply_ret_t> const& applicable,
term_t term) {
using namespace std::chrono;
std::vector<index_t> idx(applicable.size());
size_t j = 0;
auto const& slice = transactions->slice();
@ -257,8 +264,10 @@ std::vector<index_t> State::logLeaderMulti(query_t const& transactions,
TRI_ASSERT(transaction.length() > 0);
size_t pos = transaction.keyAt(0).copyString().find(RECONFIGURE);
idx[j] = logNonBlocking(_log.back().index + 1, i[0], term, 0, clientId, true,
pos == 0 || pos == 1);
idx[j] = logNonBlocking(
_log.back().index + 1, i[0], term,
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
clientId, true, pos == 0 || pos == 1);
}
++j;
}
@ -269,7 +278,10 @@ std::vector<index_t> State::logLeaderMulti(query_t const& transactions,
index_t State::logLeaderSingle(velocypack::Slice const& slice, term_t term,
std::string const& clientId) {
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
return logNonBlocking(_log.back().index + 1, slice, term, 0, clientId, true);
using namespace std::chrono;
return logNonBlocking(
_log.back().index + 1, slice, term,
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(), clientId, true);
}
/// Log transaction (leader)
@ -290,7 +302,7 @@ index_t State::logNonBlocking(index_t idx, velocypack::Slice const& slice,
FATAL_ERROR_EXIT();
}
logEmplaceBackNoLock(log_t(idx, term, buf, clientId));
logEmplaceBackNoLock(log_t(idx, term, buf, clientId, millis));
return _log.back().index;
}
@ -664,8 +676,23 @@ VPackBuilder State::slices(index_t start, index_t end) const {
}
for (size_t i = start - _cur; i <= end - _cur; ++i) {
try {
slices.add(VPackSlice(_log.at(i).entry->data()));
try { //{ "a" : {"op":"set", "ttl":20, ...}}
auto slice = VPackSlice(_log.at(i).entry->data());
VPackObjectBuilder o(&slices);
for (auto const& oper : VPackObjectIterator(slice)) {
slices.add(VPackValue(oper.key.copyString()));
if (oper.value.isObject() && oper.value.hasKey("op") &&
oper.value.get("op").isEqualString("set") && oper.value.hasKey("ttl")) {
VPackObjectBuilder oo(&slices);
for (auto const& i : VPackObjectIterator(oper.value)) {
slices.add(i.key.copyString(), i.value);
}
slices.add("epoch_millis", VPackValue(_log.at(i).timestamp.count()));
} else {
slices.add(oper.value);
}
}
} catch (std::exception const&) {
break;
}
@ -673,9 +700,7 @@ VPackBuilder State::slices(index_t start, index_t end) const {
}
mutexLocker.unlock();
slices.close();
return slices;
}
@ -753,6 +778,11 @@ bool State::ready() const { return _ready; }
/// Load collections
bool State::loadCollections(TRI_vocbase_t* vocbase,
QueryRegistry* queryRegistry, bool waitForSync) {
using namespace std::chrono;
auto const epoch_millis =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
_vocbase = vocbase;
_queryRegistry = queryRegistry;
@ -767,8 +797,8 @@ bool State::loadCollections(TRI_vocbase_t* vocbase,
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
VPackSlice value = arangodb::velocypack::Slice::emptyObjectSlice();
buf->append(value.startAs<char const>(), value.byteSize());
_log.emplace_back(log_t(index_t(0), term_t(0), buf, std::string()));
persist(0, 0, 0, value, std::string());
_log.emplace_back(log_t(index_t(0), term_t(0), buf, std::string(), epoch_millis));
persist(0, 0, epoch_millis, value, std::string());
}
_ready = true;
return true;
@ -1025,7 +1055,7 @@ bool State::loadRemaining() {
MUTEX_LOCKER(logLock, _logLock);
if (result.isArray() && result.length() > 0) {
TRI_ASSERT(_log.empty()); // was cleared in loadCompacted
std::string clientId;
std::string clientId, tstamp;
// We know that _cur has been set in loadCompacted to the index of the
// snapshot that was loaded or to 0 if there is no snapshot.
index_t lastIndex = _cur;
@ -1040,6 +1070,22 @@ bool State::loadRemaining() {
clientId = req.hasKey("clientId") ? req.get("clientId").copyString()
: std::string();
uint64_t millis = 0;
if (ii.hasKey("epoch_millis")) {
if (ii.get("epoch_millis").isInteger()) {
try {
millis = ii.get("epoch_millis").getNumber<uint64_t>();
} catch (std::exception const& e) {
LOG_TOPIC("2ee75", ERR, Logger::AGENCY)
<< "Failed to parse integer value for epoch_millis " << e.what();
FATAL_ERROR_EXIT();
}
} else {
LOG_TOPIC("52ee7", ERR, Logger::AGENCY) << "epoch_millis is not an integer type";
FATAL_ERROR_EXIT();
}
}
// Dummy fill missing entries (Not good at all.)
index_t index(StringUtils::uint64(ii.get(StaticStrings::KeyString).copyString()));
@ -1065,7 +1111,7 @@ bool State::loadRemaining() {
// Real entries
logEmplaceBackNoLock(
log_t(StringUtils::uint64(ii.get(StaticStrings::KeyString).copyString()),
ii.get("term").getNumber<uint64_t>(), tmp, clientId));
ii.get("term").getNumber<uint64_t>(), tmp, clientId, millis));
lastIndex = index;
}
}
@ -1541,8 +1587,12 @@ std::shared_ptr<VPackBuilder> State::latestAgencyState(TRI_vocbase_t& vocbase,
std::string clientId =
req.hasKey("clientId") ? req.get("clientId").copyString() : std::string();
uint64_t epoch_millis =
(req.hasKey("epoch_millis") && req.get("epoch_millis").isInteger()) ?
req.get("epoch_millis").getNumber<uint64_t>() : 0;
log_t entry(StringUtils::uint64(ii.get(StaticStrings::KeyString).copyString()),
ii.get("term").getNumber<uint64_t>(), tmp, clientId);
ii.get("term").getNumber<uint64_t>(), tmp, clientId, epoch_millis);
if (entry.index <= index) {
LOG_TOPIC("c8f91", WARN, Logger::AGENCY)
@ -1660,4 +1710,3 @@ uint64_t State::toVelocyPack(index_t lastIndex, VPackBuilder& builder) const {
return n;
}