1
0
Fork 0

Handle InitDone correctly. (#8552)

* precondition plan / version in compaction / store TTL removal independent of local _ttl set
* Agency init loops break when shutting down.
* assertion failures in store on restarting following agents
* Minor porting fixes from 3.4
This commit is contained in:
Max Neunhöffer 2019-04-01 17:01:05 +02:00 committed by GitHub
parent 14429d9163
commit 02281d3be4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 211 additions and 159 deletions

View File

@ -1181,11 +1181,11 @@ AgencyCommResult AgencyComm::sendTransactionWithFailover(AgencyTransaction const
bool AgencyComm::ensureStructureInitialized() {
LOG_TOPIC("748e2", TRACE, Logger::AGENCYCOMM) << "checking if agency is initialized";
while (true) {
while (shouldInitializeStructure()) {
while (!application_features::ApplicationServer::isStopping() &&
shouldInitializeStructure()) {
LOG_TOPIC("17e16", TRACE, Logger::AGENCYCOMM)
<< "Agency is fresh. Needs initial structure.";
// mop: we are the chosen one .. great success
if (tryInitializeStructure()) {
LOG_TOPIC("4c5aa", TRACE, Logger::AGENCYCOMM)
@ -1193,29 +1193,10 @@ bool AgencyComm::ensureStructureInitialized() {
break;
}
LOG_TOPIC("e05d1", WARN, Logger::AGENCYCOMM)
LOG_TOPIC("63f7b", WARN, Logger::AGENCYCOMM)
<< "Initializing agency failed. We'll try again soon";
// We should really have exclusive access, here, this is strange!
std::this_thread::sleep_for(std::chrono::seconds(1));
}
AgencyCommResult result = getValues("InitDone");
if (result.successful()) {
VPackSlice value = result.slice()[0].get(
std::vector<std::string>({AgencyCommManager::path(), "InitDone"}));
if (value.isBoolean() && value.getBoolean()) {
// expecting a value of "true"
LOG_TOPIC("e8450", TRACE, Logger::AGENCYCOMM) << "Found an initialized agency";
break;
}
} else {
if (result.httpCode() == 401) {
// unauthorized
LOG_TOPIC("e0376", FATAL, Logger::STARTUP) << "Unauthorized. Wrong credentials.";
FATAL_ERROR_EXIT();
}
}
LOG_TOPIC("9d265", TRACE, Logger::AGENCYCOMM)
<< "Waiting for agency to get initialized";
@ -1346,10 +1327,7 @@ AgencyCommResult AgencyComm::sendWithFailover(arangodb::rest::RequestType method
auto waitSomeTime = [&waitInterval, &result]() -> bool {
// Returning true means timeout because of shutdown:
auto serverFeature = application_features::ApplicationServer::getFeature<ServerFeature>(
"Server");
if (serverFeature->isStopping() ||
!application_features::ApplicationServer::isRetryOK()) {
if (!application_features::ApplicationServer::isRetryOK()) {
LOG_TOPIC("53e58", INFO, Logger::AGENCYCOMM)
<< "Unsuccessful AgencyComm: Timeout because of shutdown "
<< "errorCode: " << result.errorCode()
@ -1404,8 +1382,6 @@ AgencyCommResult AgencyComm::sendWithFailover(arangodb::rest::RequestType method
// Some reporting:
if (tries > 20) {
auto serverState = application_features::ApplicationServer::server->state();
application_features::ApplicationServer::getFeature<ServerFeature>(
"Server");
std::string serverStateStr;
switch (serverState) {
case arangodb::application_features::ServerState::UNINITIALIZED:
@ -1532,8 +1508,7 @@ AgencyCommResult AgencyComm::sendWithFailover(arangodb::rest::RequestType method
continue;
}
} else {
// How odd, we are supposed to get at least {results=[...]}, let's
// retry...
// How odd, we are supposed to get at least {results=[...]}, let's retry...
isInquiry = false;
continue;
}
@ -1749,7 +1724,9 @@ bool AgencyComm::tryInitializeStructure() {
{
VPackObjectBuilder c(&builder);
builder.add("LatestID", VPackValue(1));
addEmptyVPackObject("Problems", builder);
builder.add("UserVersion", VPackValue(1));
addEmptyVPackObject("ServerStates", builder);
builder.add("HeartbeatIntervalMs", VPackValue(1000));
}
@ -1796,10 +1773,9 @@ bool AgencyComm::tryInitializeStructure() {
LOG_TOPIC("58ffe", TRACE, Logger::AGENCYCOMM)
<< "Initializing agency with " << builder.toJson();
AgencyOperation initOperation("", AgencyValueOperationType::SET, builder.slice());
AgencyWriteTransaction initTransaction;
initTransaction.operations.push_back(initOperation);
AgencyWriteTransaction initTransaction(
AgencyOperation("", AgencyValueOperationType::SET, builder.slice()),
AgencyPrecondition("Plan", AgencyPrecondition::Type::EMPTY, true));
AgencyCommResult result = sendTransactionWithFailover(initTransaction);
if (result.httpCode() == TRI_ERROR_HTTP_UNAUTHORIZED) {
@ -1820,19 +1796,60 @@ bool AgencyComm::tryInitializeStructure() {
}
bool AgencyComm::shouldInitializeStructure() {
VPackBuilder builder;
builder.add(VPackValue(false));
// "InitDone" key should not previously exist
auto result = casValue("InitDone", builder.slice(), false, 60.0,
AgencyCommManager::CONNECTION_OPTIONS._requestTimeout);
size_t nFail = 0;
if (!result.successful()) {
// somebody else has or is initializing the agency
LOG_TOPIC("8a39b", TRACE, Logger::AGENCYCOMM)
<< "someone else is initializing the agency";
return false;
while (!application_features::ApplicationServer::isStopping()) {
auto result = getValues("Plan");
if (!result.successful()) { // Not 200 - 299
if (result.httpCode() == 401) {
// unauthorized
LOG_TOPIC("32781", FATAL, Logger::STARTUP) << "Unauthorized. Wrong credentials.";
FATAL_ERROR_EXIT();
}
// Agency not ready yet
LOG_TOPIC("36253", TRACE, Logger::AGENCYCOMM)
<< "waiting for agency to become ready";
continue;
} else {
// Sanity
if (result.slice().isArray() && result.slice().length() == 1) {
// No plan entry? Should initialise
if (result.slice()[0] == VPackSlice::emptyObjectSlice()) {
LOG_TOPIC("98732", DEBUG, Logger::AGENCYCOMM)
<< "agency initialisation should be performed";
return true;
} else {
LOG_TOPIC("abedb", DEBUG, Logger::AGENCYCOMM)
<< "agency initialisation under way or done";
return false;
}
} else {
// Should never get here
TRI_ASSERT(false);
if (nFail++ < 3) {
LOG_TOPIC("fed52", DEBUG, Logger::AGENCYCOMM) << "What the hell just happened?";
} else {
LOG_TOPIC("54fea", FATAL, Logger::AGENCYCOMM)
<< "Illegal response from agency during bootstrap: "
<< result.slice().toJson();
FATAL_ERROR_EXIT();
}
continue;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
return false;
}

View File

@ -306,9 +306,7 @@ class AgencyTransaction {
/*struct AgencyGeneralTransaction : public AgencyTransaction {
typedef
std::pair<std::vector<AgencyOperation>,std::vector<AgencyPrecondition>>
TransactionType;
typedef std::pair<std::vector<AgencyOperation>,std::vector<AgencyPrecondition>> TransactionType;
explicit AgencyGeneralTransaction(AgencyOperation const& op,
AgencyPrecondition const& pre) :

View File

@ -120,10 +120,11 @@ struct log_t {
std::string const& clientId = std::string())
: index(idx),
term(t),
entry(std::make_shared<arangodb::velocypack::Buffer<uint8_t>>(*e.get())),
clientId(clientId),
timestamp(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())) {}
std::chrono::system_clock::now().time_since_epoch())) {
entry = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>(*e.get());
}
friend std::ostream& operator<<(std::ostream& o, log_t const& l) {
o << l.index << " " << l.term << " " << VPackSlice(l.entry->data()).toJson() << " "

View File

@ -35,6 +35,7 @@
using namespace arangodb::application_features;
using namespace arangodb::basics;
using namespace arangodb::options;
using namespace arangodb::rest;
namespace arangodb {
@ -123,10 +124,9 @@ void AgencyFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
new UInt64Parameter(&_maxAppendSize),
arangodb::options::makeFlags(arangodb::options::Flags::Hidden));
options->addOption(
"--agency.disaster-recovery-id",
"allows for specification of the id for this agent; dangerous option for "
"disaster recover only!",
options->addOption("--agency.disaster-recovery-id",
"allows for specification of the id for this agent; "
"dangerous option for disaster recover only!",
new StringParameter(&_recoveryId),
arangodb::options::makeFlags(arangodb::options::Flags::Hidden));
}

View File

@ -481,8 +481,7 @@ void Agent::sendAppendEntriesRPC() {
}
// If the follower is behind our first log entry send last snapshot and
// following logs. Else try to have the follower catch up in regular
// order.
// following logs. Else try to have the follower catch up in regular order.
bool needSnapshot = lastConfirmed < _state.firstIndex();
if (needSnapshot) {
lastConfirmed = _state.lastCompactionAt() - 1;
@ -859,15 +858,14 @@ bool Agent::challengeLeadership() {
// ensure that a leader resigns before another one even starts an
// election. However, the Raft paper does not mention this at all. Rather,
// in the paper it is written that the leader should resign immediately if
// it sees a higher term from another server. Currently we have not
// implemented to return the follower's term with a response to
// AppendEntriesRPC, so the leader cannot find out a higher term this
// way. The leader can, however, see a higher term in the incoming
// AppendEntriesRPC a new leader sends out, and it will immediately
// resign if it sees that. For the moment, this value here can stay.
// We should soon implement sending the follower's term back with
// each response and probably get rid of this method altogether,
// but this requires a bit more thought.
// it sees a higher term from another server. Currently we have not implemented
// to return the follower's term with a response to AppendEntriesRPC, so
// the leader cannot find out a higher term this way. The leader can,
// however, see a higher term in the incoming AppendEntriesRPC a new
// leader sends out, and it will immediately resign if it sees that. For
// the moment, this value here can stay. We should soon implement sending
// the follower's term back with each response and probably get rid of
// this method altogether, but this requires a bit more thought.
if (_config.maxPing() * _config.timeoutMult() > m.count()) {
++good;
}
@ -1646,13 +1644,13 @@ Store const& Agent::transient() const {
/// Rebuild from persisted state
void Agent::setPersistedState(VPackSlice const& compaction) {
// Catch up with compacted state, this is only called at startup
_spearhead = compaction.get("readDB");
_spearhead = compaction;
// Catch up with commit
try {
WRITE_LOCKER(oLocker, _outputLock);
CONDITION_LOCKER(guard, _waitForCV);
_readDB = compaction.get("readDB");
_readDB = compaction;
_commitIndex =
arangodb::basics::StringUtils::uint64(compaction.get("_key").copyString());
_waitForCV.broadcast();

View File

@ -26,6 +26,7 @@
#include "Agency/AgencyCommon.h"
#include "Agency/AgencyStrings.h"
#include "Agency/AgentCallback.h"
#include "Agency/AgentConfiguration.h"
#include "Agency/AgentInterface.h"
#include "Agency/Compactor.h"
@ -314,8 +315,7 @@ class Agent final : public arangodb::Thread, public AgentInterface {
/// @brief Activate this agent in single agent mode.
void activateAgency();
/// @brief add agent to configuration (from State after successful local
/// persistence)
/// @brief add agent to configuration (from State after successful local persistence)
void updateConfiguration(VPackSlice const&);
private:
@ -438,8 +438,7 @@ class Agent final : public arangodb::Thread, public AgentInterface {
/// For _ioLock: We put in assertions to ensure that when this lock is
/// acquired we do not have the _tiLock.
/// @brief Inception thread getting an agent up to join RAFT from cmd or
/// persistence
/// @brief Inception thread getting an agent up to join RAFT from cmd or persistence
std::unique_ptr<Inception> _inception;
/// @brief Compactor

View File

@ -22,9 +22,11 @@
////////////////////////////////////////////////////////////////////////////////
#include "AgentCallback.h"
#include "Agency/Agent.h"
#include "ApplicationFeatures/ApplicationServer.h"
using namespace arangodb::application_features;
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
@ -88,8 +90,7 @@ bool AgentCallback::operator()(arangodb::ClusterCommResult* res) {
<< "comm_status(" << res->status << "), last(" << _last << "), follower("
<< _slaveID << "), time(" << TRI_microtime() - _startTime << ")";
} else {
if (!application_features::ApplicationServer::isStopping() &&
(_agent == nullptr || !_agent->isStopping())) {
if (!ApplicationServer::isStopping() && (_agent == nullptr || !_agent->isStopping())) {
// Do not warn if we are already shutting down:
LOG_TOPIC("2c712", WARN, Logger::AGENCY)
<< "Got bad callback from AppendEntriesRPC: "

View File

@ -514,8 +514,7 @@ bool config_t::findInPool(std::string const& id) const {
/// @brief merge from persisted configuration
bool config_t::merge(VPackSlice const& conf) {
WRITE_LOCKER(writeLocker,
_lock); // All must happen under the lock or else ...
WRITE_LOCKER(writeLocker, _lock); // All must happen under the lock or else ...
// FIXME: All these "command line beats persistence" are wrong, since
// the given default values never happen. Only fixed _supervision with

View File

@ -43,8 +43,7 @@ class AgentInterface {
};
/// @brief Attempt write
virtual write_ret_t write(query_t const&,
WriteMode const& mode = WriteMode()) = 0; /// @brief Attempt write
virtual write_ret_t write(query_t const&, WriteMode const& mode = WriteMode()) = 0;
/// @brief Attempt write
virtual trans_ret_t transient(query_t const&) = 0;

View File

@ -97,7 +97,6 @@ void Inception::gossip() {
continue;
}
}
LOG_TOPIC("cc3fd", DEBUG, Logger::AGENCY)
<< "Sending gossip message 1: " << out->toJson() << " to peer " << p;
if (this->isStopping() || _agent->isStopping() || cc == nullptr) {

View File

@ -46,6 +46,7 @@ struct Empty {
};
const Node::Children Node::dummyChildren = Node::Children();
const Node Node::_dummyNode = Node("dumm-di-dumm");
/// @brief Split strings by separator
inline static std::vector<std::string> split(const std::string& str, char separator) {
@ -390,12 +391,22 @@ bool Node::addTimeToLive(long millis) {
return true;
}
void Node::timeToLive(TimePoint const& ttl) {
_ttl = ttl;
}
TimePoint const& Node::timeToLive() const {
return _ttl;
}
// remove time to live entry for this node
bool Node::removeTimeToLive() {
if (_store != nullptr) {
_store->removeTTL(uri());
if (_ttl != std::chrono::system_clock::time_point()) {
store().removeTTL(uri());
_ttl = std::chrono::system_clock::time_point();
}
}
return true;
}

View File

@ -231,6 +231,18 @@ class Node {
/// @brief Is string
bool isString() const;
/**
* @brief Get seconds this node still has to live. (Must be guarded by caller)
* @return seconds to live (int64_t::max, if none set)
*/
TimePoint const& timeToLive() const;
/**
* @brief Set expiry for this node
* @param Time point of expiry
*/
void timeToLive(TimePoint const& ttl);
/// @brief accessor to Node object
/// @return second is true if url exists, first populated if second true
std::pair<Node const&, bool> hasAsNode(std::string const&) const;
@ -316,6 +328,11 @@ class Node {
/// @brief Clear key value store
void clear();
// @brief Helper function to return static instance of dummy node below
static Node const& dummyNode() {
return _dummyNode;
}
protected:
/// @brief Add time to live entry
virtual bool addTimeToLive(long millis);
@ -335,6 +352,7 @@ class Node {
mutable bool _vecBufDirty;
bool _isArray;
static Children const dummyChildren;
static Node const _dummyNode;
};

View File

@ -183,8 +183,7 @@ bool RemoveFollower::start(bool&) {
// Now find some new servers to remove:
std::unordered_map<std::string, int> overview; // get an overview over the servers
// -1 : not "GOOD", can be in sync, or leader, or not
// >=0: number of servers for which it is in sync or confirmed
// leader
// >=0: number of servers for which it is in sync or confirmed leader
bool leaderBad = false;
for (auto const& srv : VPackArrayIterator(planned)) {
std::string serverName = srv.copyString();

View File

@ -562,18 +562,10 @@ RestStatus RestAgencyHandler::handleConfig() {
RestStatus RestAgencyHandler::handleState() {
VPackBuilder body;
body.add(VPackValue(VPackValueType::Array));
for (auto const& i : _agent->state().get()) {
body.add(VPackValue(VPackValueType::Object));
body.add("index", VPackValue(i.index));
body.add("term", VPackValue(i.term));
if (i.entry != nullptr) {
body.add("query", VPackSlice(i.entry->data()));
{
VPackObjectBuilder o(&body);
_agent->readDB(body);
}
body.add("clientId", VPackValue(i.clientId));
body.close();
}
body.close();
generateResult(rest::ResponseCode::OK, body.slice());
return RestStatus::DONE;
}

View File

@ -245,7 +245,7 @@ std::vector<index_t> State::logLeaderMulti(query_t const& transactions,
if (!i.isArray()) {
THROW_ARANGO_EXCEPTION_MESSAGE(30000,
"Transaction syntax is [{<operations>}, "
"<preconditions>}, \"clientId\"]");
"{<preconditions>}, \"clientId\"]");
}
if (applicable[j] == APPLIED) {
@ -380,7 +380,7 @@ index_t State::logFollower(query_t const& transactions) {
// Now we must completely erase our log and compaction snapshots and
// start from the snapshot
Store snapshot(_agent, "snapshot");
snapshot = slices[0].get("readDB");
snapshot = slices[0];
if (!storeLogFromSnapshot(snapshot, snapshotIndex, snapshotTerm)) {
LOG_TOPIC("f7250", FATAL, Logger::AGENCY)
<< "Could not restore received log snapshot.";
@ -809,7 +809,7 @@ bool State::loadLastCompactedSnapshot(Store& store, index_t& index, term_t& term
VPackSlice i = result[0];
VPackSlice ii = i.resolveExternals();
try {
store = ii.get("readDB");
store = ii;
index = basics::StringUtils::uint64(ii.get("_key").copyString());
term = ii.get("term").getNumber<uint64_t>();
return true;
@ -1246,6 +1246,7 @@ bool State::persistCompactionSnapshot(index_t cind, arangodb::consensus::term_t
}
store.add("term", VPackValue(static_cast<double>(term)));
store.add("_key", VPackValue(i_str.str()));
store.add("version", VPackValue(2));
}
TRI_ASSERT(_vocbase != nullptr);
@ -1493,7 +1494,7 @@ std::shared_ptr<VPackBuilder> State::latestAgencyState(TRI_vocbase_t& vocbase,
// Result can only have length 0 or 1.
VPackSlice ii = result[0].resolveExternals();
buffer_t tmp = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
store = ii.get("readDB");
store = ii;
index = arangodb::basics::StringUtils::uint64(ii.get("_key").copyString());
term = ii.get("term").getNumber<uint64_t>();
LOG_TOPIC("d838b", INFO, Logger::AGENCY)
@ -1598,11 +1599,12 @@ uint64_t State::toVelocyPack(index_t lastIndex, VPackBuilder& builder) const {
}
if (n > 0) {
std::string const compstr
= "FOR c in compact FILTER c._key >= '" + firstIndex +
"' SORT c._key LIMIT 1 RETURN c";
arangodb::aql::Query compQuery(false, *_vocbase, aql::QueryString(compstr),
std::string const compQueryStr =
std::string("FOR c in compact FILTER c._key >= '") + firstIndex
+ std::string("' SORT c._key LIMIT 1 RETURN c");
arangodb::aql::Query compQuery(false, *_vocbase, aql::QueryString(compQueryStr),
bindVars, nullptr, arangodb::aql::PART_MAIN);
aql::QueryResult compQueryResult = compQuery.executeSync(_queryRegistry);

View File

@ -122,6 +122,7 @@ Store::Store(Agent* agent, std::string const& name)
Store& Store::operator=(Store const& rhs) {
if (&rhs != this) {
MUTEX_LOCKER(otherLock, rhs._storeLock);
MUTEX_LOCKER(lock, _storeLock);
_agent = rhs._agent;
_timeTable = rhs._timeTable;
_observerTable = rhs._observerTable;
@ -135,6 +136,7 @@ Store& Store::operator=(Store const& rhs) {
Store& Store::operator=(Store&& rhs) {
if (&rhs != this) {
MUTEX_LOCKER(otherLock, rhs._storeLock);
MUTEX_LOCKER(lock, _storeLock);
_agent = std::move(rhs._agent);
_timeTable = std::move(rhs._timeTable);
_observerTable = std::move(rhs._observerTable);
@ -336,23 +338,23 @@ std::vector<bool> Store::applyLogEntries(arangodb::velocypack::Builder const& qu
body.add("term", VPackValue(term));
body.add("index", VPackValue(index));
auto ret = in.equal_range(url);
std::string currentKey;
std::map<std::string,std::map<std::string, std::string>> result;
// key -> (modified -> op)
for (auto it = ret.first; it != ret.second; ++it) {
if (currentKey != it->second->key) {
if (!currentKey.empty()) {
body.close();
result[it->second->key][it->second->modified] = it->second->oper;
}
body.add(it->second->key, VPackValue(VPackValueType::Object));
currentKey = it->second->key;
}
body.add(VPackValue(it->second->modified));
for (auto const& m : result) {
body.add(VPackValue(m.first));
{
VPackObjectBuilder b(&body);
body.add("op", VPackValue(it->second->oper));
VPackObjectBuilder guard(&body);
for (auto const& m2 : m.second) {
body.add(VPackValue(m2.first));
{
VPackObjectBuilder guard2(&body);
body.add("op", VPackValue(m2.second));
}
}
}
if (!currentKey.empty()) {
body.close();
}
}
@ -388,26 +390,26 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const {
std::string key = precond.key.copyString();
std::vector<std::string> pv = split(key, '/');
Node node("precond");
Node const* node = &Node::dummyNode();
// Check is guarded in ::apply
bool found = _node.has(pv);
if (found) {
node = _node(pv);
node = &_node(pv);
}
if (precond.value.isObject()) {
for (auto const& op : VPackObjectIterator(precond.value)) {
std::string const& oper = op.key.copyString();
if (oper == "old") { // old
if (node != op.value) {
if (*node != op.value) {
ret.push_back(precond.key);
if (mode == FIRST_FAIL) {
break;
}
}
} else if (oper == "oldNot") { // oldNot
if (node == op.value) {
if (*node == op.value) {
ret.push_back(precond.key);
if (mode == FIRST_FAIL) {
break;
@ -422,7 +424,7 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const {
break;
}
}
bool isArray = (node.type() == LEAF && node.slice().isArray());
bool isArray = (node->type() == LEAF && node->slice().isArray());
if (op.value.getBool() ? !isArray : isArray) {
ret.push_back(precond.key);
if (mode == FIRST_FAIL) {
@ -446,9 +448,9 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const {
}
} else if (oper == "in") { // in
if (found) {
if (node.slice().isArray()) {
if (node->slice().isArray()) {
bool _found = false;
for (auto const& i : VPackArrayIterator(node.slice())) {
for (auto const& i : VPackArrayIterator(node->slice())) {
if (i == op.value) {
_found = true;
continue;
@ -469,9 +471,9 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const {
if (!found) {
continue;
} else {
if (node.slice().isArray()) {
if (node->slice().isArray()) {
bool _found = false;
for (auto const& i : VPackArrayIterator(node.slice())) {
for (auto const& i : VPackArrayIterator(node->slice())) {
if (i == op.value) {
_found = true;
continue;
@ -498,7 +500,7 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const {
}
}
} else {
if (node != precond.value) {
if (*node != precond.value) {
ret.push_back(precond.key);
if (mode == FIRST_FAIL) {
break;
@ -609,14 +611,25 @@ query_t Store::clearExpired() const {
void Store::dumpToBuilder(Builder& builder) const {
MUTEX_LOCKER(storeLocker, _storeLock);
toBuilder(builder, true);
std::map<std::string, int64_t> clean {};
for (auto const& i : _timeTable) {
auto ts = std::chrono::duration_cast<std::chrono::seconds>(
i.first.time_since_epoch()).count();
auto it = clean.find(i.second);
if (it == clean.end()) {
clean[i.second] = ts;
} else if (ts < it->second) {
it->second = ts;
}
}
{
VPackObjectBuilder guard(&builder);
for (auto const& i : _timeTable) {
auto ts = std::chrono::duration_cast<std::chrono::seconds>(i.first.time_since_epoch())
.count();
builder.add(i.second, VPackValue(ts));
for (auto const& c : clean) {
builder.add(c.first, VPackValue(c.second));
}
}
{
VPackArrayBuilder garray(&builder);
for (auto const& i : _observerTable) {
@ -678,19 +691,27 @@ void Store::clear() {
}
/// Apply a request to my key value store
Store& Store::operator=(VPackSlice const& slice) {
TRI_ASSERT(slice.isArray());
Store& Store::operator=(VPackSlice const& s) {
TRI_ASSERT(s.isObject());
TRI_ASSERT(s.hasKey("readDB"));
auto const& slice = s.get("readDB");
TRI_ASSERT(slice.length() == 4);
MUTEX_LOCKER(storeLocker, _storeLock);
_node.applies(slice[0]);
if (s.hasKey("version")) {
TRI_ASSERT(slice[1].isObject());
for (auto const& entry : VPackObjectIterator(slice[1])) {
long long tse = entry.value.getInt();
_timeTable.emplace(
std::pair<TimePoint, std::string>(TimePoint(std::chrono::duration<int>(tse)),
entry.key.copyString()));
if (entry.value.isNumber()) {
auto const& key = entry.key.copyString();
if (_node.has(key)) {
auto tp = TimePoint(std::chrono::seconds(entry.value.getNumber<int>()));
_node(key).timeToLive(tp);
_timeTable.emplace(std::pair<TimePoint, std::string>(tp, key));
}
}
}
}
TRI_ASSERT(slice[2].isArray());
@ -769,11 +790,10 @@ bool Store::has(std::string const& path) const {
/// Remove ttl entry for path, guarded by caller
void Store::removeTTL(std::string const& uri) {
_storeLock.assertLockedByCurrentThread();
if (!_timeTable.empty()) {
for (auto it = _timeTable.cbegin(); it != _timeTable.cend();) {
if (it->second == uri) {
_timeTable.erase(it++);
it = _timeTable.erase(it);
} else {
++it;
}

View File

@ -133,8 +133,7 @@ class Supervision : public arangodb::CriticalThread {
/// @brief Upgrade agency to supervision overhaul jobs
void upgradeHealthRecords(VPackBuilder&);
/// @brief Check for orphaned index creations, which have been successfully
/// built
/// @brief Check for orphaned index creations, which have been successfully built
void readyOrphanedIndexCreations();
/// @brief Check for inconsistencies in replication factor vs dbs entries

View File

@ -676,7 +676,7 @@ function agencyTestSuite () {
writeAndCheck([[{"/a/y":{"op":"set","new":12}}]]);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
wait(1.1);
assertEqual(readAndCheck([["/a/y"]]), [{"a":{"y":12}}]);
assertEqual(readAndCheck([["/a/y"]]), [{a:{}}]);
writeAndCheck([[{"foo/bar":{"op":"set","new":{"baz":12}}}]]);
assertEqual(readAndCheck([["/foo/bar/baz"]]),
[{"foo":{"bar":{"baz":12}}}]);