1
0
Fork 0

merge from 3.1

This commit is contained in:
Kaveh Vahedipour 2017-02-20 20:05:52 +01:00
parent 98c8038cdd
commit 4cc830b0df
33 changed files with 336 additions and 223 deletions

View File

@ -25,13 +25,7 @@
#include "Agency/Agent.h"
#include "Agency/Job.h"
#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
AddFollower::AddFollower(Node const& snapshot, Agent* agent,
std::string const& jobId, std::string const& creator,

View File

@ -1,4 +1,4 @@
////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
@ -811,7 +811,7 @@ bool AgencyComm::exists(std::string const& key) {
return false;
}
auto parts = arangodb::basics::StringUtils::split(key, "/");
auto parts = basics::StringUtils::split(key, "/");
std::vector<std::string> allParts;
allParts.reserve(parts.size() + 1);
allParts.push_back(AgencyCommManager::path());
@ -1130,7 +1130,7 @@ bool AgencyComm::ensureStructureInitialized() {
std::vector<std::string>({AgencyCommManager::path(), "Secret"}));
if (!secretValue.isString()) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "Couldn't find secret in agency!";
LOG_TOPIC(ERR, Logger::CLUSTER) << "Couldn't find secret in agency!";
return false;
}
std::string const secret = secretValue.copyString();
@ -1489,16 +1489,7 @@ AgencyCommResult AgencyComm::send(
<< "': " << body;
arangodb::httpclient::SimpleHttpClient client(connection, timeout, false);
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr only happens during controlled shutdown
result._message = "could not send request to agency because of shutdown";
LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "could not send request to agency because of shutdown";
return result;
}
client.setJwt(cc->jwt());
client.setJwt(ClusterComm::instance()->jwt());
client.keepConnectionOnDestruction(true);
// set up headers
@ -1699,10 +1690,10 @@ bool AgencyComm::tryInitializeStructure(std::string const& jwtSecret) {
return result.successful();
} catch (std::exception const& e) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Fatal error initializing agency " << e.what();
LOG_TOPIC(FATAL, Logger::CLUSTER) << "Fatal error initializing agency " << e.what();
FATAL_ERROR_EXIT();
} catch (...) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Fatal error initializing agency";
LOG_TOPIC(FATAL, Logger::CLUSTER) << "Fatal error initializing agency";
FATAL_ERROR_EXIT();
}
}

View File

@ -629,6 +629,14 @@ class AgencyComm {
void updateEndpoints(arangodb::velocypack::Slice const&);
bool lockRead(std::string const&, double, double);
bool lockWrite(std::string const&, double, double);
bool unlockRead(std::string const&, double);
bool unlockWrite(std::string const&, double);
AgencyCommResult sendTransactionWithFailover(AgencyTransaction const&,
double timeout = 0.0);

View File

@ -1,3 +1,4 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
@ -47,10 +48,12 @@ Agent::Agent(config_t const& config)
_config(config),
_lastCommitIndex(0),
_lastAppliedIndex(0),
_lastCompactionIndex(0),
_leaderCommitIndex(0),
_spearhead(this),
_readDB(this),
_transient(this),
_compacted(this),
_nextCompationAfter(_config.compactionStepSize()),
_inception(std::make_unique<Inception>(this)),
_activator(nullptr),
@ -272,14 +275,14 @@ bool Agent::recvAppendEntriesRPC(
// Update commit index
if (queries->slice().type() != VPackValueType::Array) {
LOG_TOPIC(WARN, Logger::AGENCY)
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Received malformed entries for appending. Discarding!";
return false;
}
if (!_constituent.checkLeader(term, leaderId, prevIndex, prevTerm)) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Not accepting appendEntries from "
<< leaderId;
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Not accepting appendEntries from " << leaderId;
return false;
}
@ -324,8 +327,8 @@ bool Agent::recvAppendEntriesRPC(
/// Leader's append entries
void Agent::sendAppendEntriesRPC() {
std::chrono::duration<int, std::ratio<1, 1000000>> const dt (
(_config.waitForSync() ? 40000 : 2000));
std::chrono::duration<int, std::ratio<1, 1000>> const dt (
(_config.waitForSync() ? 40 : 2));
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr only happens during controlled shutdown
@ -413,8 +416,9 @@ void Agent::sendAppendEntriesRPC() {
"1", 1, _config.poolAt(followerId),
arangodb::rest::RequestType::POST, path.str(),
std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, followerId, highest, toLog),
5.0 * _config.maxPing(), true);
std::make_shared<AgentCallback>(
this, followerId, (toLog) ? highest : 0, toLog),
std::max(1.0e-3 * toLog * dt.count(), 0.25 * _config.minPing()), true);
// _lastSent, _lastHighest: local and single threaded access
_lastSent[followerId] = system_clock::now();
@ -422,7 +426,7 @@ void Agent::sendAppendEntriesRPC() {
if (toLog > 0) {
_earliestPackage[followerId] = system_clock::now() + toLog * dt;
LOG_TOPIC(TRACE, Logger::AGENCY)
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Appending " << unconfirmed.size() - 1 << " entries up to index "
<< highest << " to follower " << followerId << ". Message: "
<< builder.toJson()
@ -430,7 +434,7 @@ void Agent::sendAppendEntriesRPC() {
<< std::chrono::duration<double, std::milli>(
_earliestPackage[followerId]-system_clock::now()).count() << "ms";
} else {
LOG_TOPIC(TRACE, Logger::AGENCY)
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Just keeping follower " << followerId
<< " devout with " << builder.toJson();
}
@ -837,7 +841,7 @@ void Agent::run() {
sendAppendEntriesRPC();
// Don't panic
_appendCV.wait(1000);
_appendCV.wait(100);
// Detect faulty agent and replace
// if possible and only if not already activating
@ -1173,13 +1177,19 @@ arangodb::consensus::index_t Agent::rebuildDBs() {
<< _lastAppliedIndex << " to " << _leaderCommitIndex;
_spearhead.apply(
_state.slices(_lastAppliedIndex+1, _leaderCommitIndex),
_state.slices(_lastAppliedIndex+1, _leaderCommitIndex+1),
_leaderCommitIndex, _constituent.term());
_readDB.apply(
_state.slices(_lastAppliedIndex+1, _leaderCommitIndex),
_state.slices(_lastAppliedIndex+1, _leaderCommitIndex+1),
_leaderCommitIndex, _constituent.term());
_compacted.apply(
_state.slices(_lastAppliedIndex+1, _leaderCommitIndex+1),
_leaderCommitIndex, _constituent.term());
_lastAppliedIndex = _leaderCommitIndex;
_lastCompactionIndex = _leaderCommitIndex;
return _lastAppliedIndex;
@ -1195,9 +1205,11 @@ void Agent::compact() {
/// Last commit index
arangodb::consensus::index_t Agent::lastCommitted() const {
std::pair<arangodb::consensus::index_t, arangodb::consensus::index_t>
Agent::lastCommitted() const {
MUTEX_LOCKER(ioLocker, _ioLock);
return _lastCommitIndex;
return std::pair<arangodb::consensus::index_t, arangodb::consensus::index_t>(
_lastCommitIndex,_leaderCommitIndex);
}
/// Last commit index
@ -1382,8 +1394,42 @@ bool Agent::ready() const {
return true;
}
return _ready.load();
return _ready;
}
query_t Agent::buildDB(arangodb::consensus::index_t index) {
auto builder = std::make_shared<VPackBuilder>();
arangodb::consensus::index_t start = 0, end = 0;
Store store(this);
{
MUTEX_LOCKER(ioLocker, _ioLock);
store = _compacted;
MUTEX_LOCKER(liLocker, _liLock);
end = _leaderCommitIndex;
start = _lastCompactionIndex+1;
}
if (index > end) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Cannot snapshot beyond leaderCommitIndex: " << end;
index = end;
} else if (index < start) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Cannot snapshot before last compaction index: " << start;
index = start+1;
}
store.apply(_state.slices(start+1, index), index, _constituent.term());
store.toBuilder(*builder);
return builder;
}
}} // namespace

View File

@ -77,7 +77,7 @@ class Agent : public arangodb::Thread {
bool fitness() const;
/// @brief Leader ID
index_t lastCommitted() const;
std::pair<index_t, index_t> lastCommitted() const;
/// @brief Leader ID
std::string leaderID() const;
@ -222,6 +222,9 @@ class Agent : public arangodb::Thread {
/// @brief Update a peers endpoint in my configuration
void updatePeerEndpoint(std::string const& id, std::string const& ep);
/// @brief Assemble an agency to commitId
query_t buildDB(index_t);
/// @brief State reads persisted state and prepares the agent
friend class State;
friend class Compactor;
@ -270,6 +273,9 @@ class Agent : public arangodb::Thread {
/// @brief Last compaction index
index_t _lastAppliedIndex;
/// @brief Last compaction index
index_t _lastCompactionIndex;
/// @brief Last compaction index
index_t _leaderCommitIndex;
@ -282,6 +288,9 @@ class Agent : public arangodb::Thread {
/// @brief Committed (read) kv-store
Store _transient;
/// @brief Last compacted store
Store _compacted;
/// @brief Condition variable for appendEntries
arangodb::basics::ConditionVariable _appendCV;

View File

@ -39,23 +39,43 @@ AgentCallback::AgentCallback(Agent* agent, std::string const& slaveID,
void AgentCallback::shutdown() { _agent = 0; }
bool AgentCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status == CL_COMM_SENT) {
if (_agent) {
_agent->reportIn(_slaveID, _last, _toLog);
try { // Check success
if (res->result->getBodyVelocyPack()->slice().get("success").getBool()) {
_agent->reportIn(_slaveID, _last, _toLog);
}
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "success: true " << res->result->getBodyVelocyPack()->toJson();
} catch (...) {
LOG_TOPIC(INFO, Logger::CLUSTER)
<< "success: false" << res->result->getBodyVelocyPack()->toJson();
}
}
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Got good callback from AppendEntriesRPC: "
<< "comm_status(" << res->status
<< "), last(" << _last << "), follower("
<< _slaveID << "), time("
<< TRI_microtime() - _startTime << ")";
} else {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Got bad callback from AppendEntriesRPC: "
<< "comm_status(" << res->status
<< "), last(" << _last << "), follower("
<< _slaveID << "), time("
<< TRI_microtime() - _startTime << ")";
}
return true;
}

View File

@ -28,7 +28,6 @@
#include "Agency/MoveShard.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
CleanOutServer::CleanOutServer(Node const& snapshot, Agent* agent,
std::string const& jobId,

View File

@ -146,6 +146,21 @@ void Constituent::termNoLock(term_t t) {
}
}
bool Constituent::logUpToDate(
arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) const {
log_t myLastLogEntry = _agent->state().lastLog();
return (prevLogTerm > myLastLogEntry.term ||
(prevLogTerm == myLastLogEntry.term &&
prevLogIndex >= myLastLogEntry.index));
}
bool Constituent::logMatches(
arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) const {
return _agent->state().has(prevLogIndex, prevLogTerm);
}
/// My role
role_t Constituent::role() const {
MUTEX_LOCKER(guard, _castLock);
@ -257,8 +272,8 @@ std::string Constituent::endpoint(std::string id) const {
}
/// @brief Check leader
bool Constituent::checkLeader(term_t term, std::string id, index_t prevLogIndex,
term_t prevLogTerm) {
bool Constituent::checkLeader(
term_t term, std::string id, index_t prevLogIndex, term_t prevLogTerm) {
TRI_ASSERT(_vocbase != nullptr);
@ -277,6 +292,11 @@ bool Constituent::checkLeader(term_t term, std::string id, index_t prevLogIndex,
if (term > _term) {
termNoLock(term);
}
if (!logMatches(prevLogIndex,prevLogTerm)) {
return false;
}
if (_leaderID != id) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Set _leaderID to " << id << " in term " << _term;
@ -421,7 +441,7 @@ void Constituent::callElection() {
auto res = ClusterComm::instance()->wait(
"", coordinatorTransactionID, 0, "",
duration<double>(steady_clock::now()-timeout).count());
duration<double>(timeout - steady_clock::now()).count());
if (res.status == CL_COMM_SENT) {
auto body = res.result->getBodyVelocyPack();
@ -571,6 +591,11 @@ void Constituent::run() {
if (_lastHeartbeatSeen > 0.0) {
double now = TRI_microtime();
randWait -= static_cast<int64_t>(M * (now-_lastHeartbeatSeen));
if (randWait < a) {
randWait = a;
} else if (randWait > b) {
randWait = b;
}
}
}

View File

@ -126,6 +126,12 @@ class Constituent : public Thread {
// Wait for sync
bool waitForSync() const;
// Check if log up to date with ours
bool logUpToDate(index_t, term_t) const;
// Check if log start matches entry in my log
bool logMatches(index_t, term_t) const;
// Sleep for how long
duration_t sleepFor(double, double);

View File

@ -27,7 +27,6 @@
#include "Agency/Job.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
FailedFollower::FailedFollower(Node const& snapshot, Agent* agent,
std::string const& jobId,
@ -122,9 +121,12 @@ bool FailedFollower::start() {
// DBservers
std::string planPath =
planColPrefix + _database + "/" + _collection + "/shards/" + _shard;
std::string curPath =
curColPrefix + _database + "/" + _collection + "/" + _shard + "/servers";
Node const& planned = _snapshot(planPath);
// Copy todo to pending
Builder todo, pending;

View File

@ -30,7 +30,6 @@
#include <vector>
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
FailedLeader::FailedLeader(Node const& snapshot, Agent* agent,
std::string const& jobId, std::string const& creator,
@ -173,17 +172,23 @@ bool FailedLeader::start() {
// Distribute shards like to come!
std::vector<std::string> planv;
for (auto const& i : VPackArrayIterator(planned)) {
planv.push_back(i.copyString());
auto s = i.copyString();
if (s != _from && s != _to) {
planv.push_back(i.copyString());
}
}
pending.add(_agencyPrefix + planPath, VPackValue(VPackValueType::Array));
pending.add(VPackValue(_to));
for (auto const& i : VPackArrayIterator(current)) {
std::string s = i.copyString();
if (s != _from) {
if (s != _from && s != _to) {
pending.add(i);
planv.erase(std::remove(planv.begin(), planv.end(), s), planv.end());
}
}
pending.add(VPackValue(_from));
for (auto const& i : planv) {
pending.add(VPackValue(i));

View File

@ -30,7 +30,6 @@
#include "Agency/UnassumedLeadership.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
FailedServer::FailedServer(Node const& snapshot, Agent* agent,
std::string const& jobId, std::string const& creator,
@ -286,7 +285,9 @@ JOB_STATUS FailedServer::status() {
deleteTodos->openArray();
deleteTodos->openObject();
}
deleteTodos->add(_agencyPrefix + toDoPrefix + subJob.first, VPackValue(VPackValueType::Object));
deleteTodos->add(
_agencyPrefix + toDoPrefix + subJob.first,
VPackValue(VPackValueType::Object));
deleteTodos->add("op", VPackValue("delete"));
deleteTodos->close();
} else {
@ -302,7 +303,9 @@ JOB_STATUS FailedServer::status() {
}
if (deleteTodos) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Server " << _server << " is healthy again. Will try to delete any jobs which have not yet started!";
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Server " << _server << " is healthy again. Will try to delete"
"any jobs which have not yet started!";
deleteTodos->close();
deleteTodos->close();
// Transact to agency

View File

@ -36,7 +36,6 @@
#include <thread>
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
Inception::Inception() : Thread("Inception"), _agent(nullptr) {}

View File

@ -24,7 +24,6 @@
#include "Job.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
bool arangodb::consensus::compareServerLists(Slice plan, Slice current) {
if (!plan.isArray() || !current.isArray()) {

View File

@ -28,7 +28,6 @@
#include "Node.h"
#include "Supervision.h"
#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
@ -42,7 +41,7 @@ namespace consensus {
// and all others followers. Both arguments must be arrays. Returns true,
// if the first items in both slice are equal and if both arrays contain
// the same set of strings.
bool compareServerLists(arangodb::velocypack::Slice plan, arangodb::velocypack::Slice current);
bool compareServerLists(Slice plan, Slice current);
enum JOB_STATUS { TODO, PENDING, FINISHED, FAILED, NOTFOUND };
const std::vector<std::string> pos({"/Target/ToDo/", "/Target/Pending/",
@ -64,9 +63,9 @@ static std::string const plannedServers = "/Plan/DBServers";
static std::string const healthPrefix = "/Supervision/Health/";
inline arangodb::consensus::write_ret_t transact(Agent* _agent,
arangodb::velocypack::Builder const& transaction,
Builder const& transaction,
bool waitForCommit = true) {
query_t envelope = std::make_shared<arangodb::velocypack::Builder>();
query_t envelope = std::make_shared<Builder>();
try {
envelope->openArray();
@ -138,7 +137,7 @@ struct Job {
std::string _creator;
std::string _agencyPrefix;
std::shared_ptr<arangodb::velocypack::Builder> _jb;
std::shared_ptr<Builder> _jb;
};

View File

@ -29,7 +29,6 @@
static std::string const DBServer = "DBServer";
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
MoveShard::MoveShard(Node const& snapshot, Agent* agent,
std::string const& jobId, std::string const& creator,

View File

@ -33,9 +33,8 @@
#include <deque>
#include <regex>
using namespace arangodb::basics;
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
using namespace arangodb::basics;
struct NotEmpty {
bool operator()(const std::string& s) { return !s.empty(); }
@ -700,28 +699,6 @@ void Node::toBuilder(Builder& builder, bool showHidden) const {
}
}
void Node::toObject(Builder& builder, bool showHidden) const {
try {
if (type() == NODE) {
VPackObjectBuilder guard(&builder);
for (auto const& child : _children) {
if (child.first[0] == '.' && !showHidden) {
continue;
}
builder.add(VPackValue(child.first));
child.second->toBuilder(builder);
}
} else {
if (!slice().isNone()) {
builder.add(slice());
}
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}
}
// Print internals to ostream
std::ostream& Node::print(std::ostream& o) const {
Node const* par = _parent;

View File

@ -27,9 +27,6 @@
#include "AgencyCommon.h"
#include <velocypack/Buffer.h>
#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
#include <velocypack/ValueType.h>
#include <velocypack/velocypack-aliases.h>
#include <type_traits>
@ -53,6 +50,8 @@ enum Operation {
REPLACE
};
using namespace arangodb::velocypack;
class StoreException : public std::exception {
public:
explicit StoreException(std::string const& message) : _message(message) {}
@ -162,10 +161,7 @@ class Node {
bool handle(arangodb::velocypack::Slice const&);
/// @brief Create Builder representing this store
void toBuilder(arangodb::velocypack::Builder&, bool showHidden = false) const;
/// @brief Create Builder representing this store
void toObject(arangodb::velocypack::Builder&, bool showHidden = false) const;
void toBuilder(Builder&, bool showHidden = false) const;
/// @brief Access children
Children& children();
@ -174,10 +170,10 @@ class Node {
Children const& children() const;
/// @brief Create slice from value
arangodb::velocypack::Slice slice() const;
Slice slice() const;
/// @brief Get value type
arangodb::velocypack::ValueType valueType() const;
ValueType valueType() const;
/// @brief Add observer for this node
bool addObserver(std::string const&);
@ -222,7 +218,7 @@ class Node {
std::string getString() const;
/// @brief Get array value
arangodb::velocypack::Slice getArray() const;
Slice getArray() const;
protected:
/// @brief Add time to live entry
@ -238,8 +234,8 @@ class Node {
Store* _store; ///< @brief Store
Children _children; ///< @brief child nodes
TimePoint _ttl; ///< @brief my expiry
std::vector<arangodb::velocypack::Buffer<uint8_t>> _value; ///< @brief my value
mutable arangodb::velocypack::Buffer<uint8_t> _vecBuf;
std::vector<Buffer<uint8_t>> _value; ///< @brief my value
mutable Buffer<uint8_t> _vecBuf;
mutable bool _vecBufDirty;
bool _isArray;
};

View File

@ -27,7 +27,6 @@
#include "Agency/Job.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
RemoveServer::RemoveServer(Node const& snapshot, Agent* agent,
std::string const& jobId, std::string const& creator,

View File

@ -32,12 +32,13 @@
#include "Basics/StaticStrings.h"
#include "Logger/Logger.h"
#include "Rest/HttpRequest.h"
#include "Rest/Version.h"
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::consensus;
using namespace arangodb::rest;
using namespace arangodb::velocypack;
using namespace arangodb::consensus;
////////////////////////////////////////////////////////////////////////////////
/// @brief ArangoDB server
@ -218,6 +219,31 @@ RestStatus RestAgencyHandler::handleStores() {
return RestStatus::DONE;
}
RestStatus RestAgencyHandler::handleStore() {
if (_request->requestType() == rest::RequestType::POST) {
arangodb::velocypack::Options options;
auto query = _request->toVelocyPackBuilderPtr(&options);
arangodb::consensus::index_t index = 0;
try {
index = query->slice().getUInt();
} catch (...) {
index = _agent->lastCommitted().second;
}
query_t builder = _agent->buildDB(index);
generateResult(rest::ResponseCode::OK, builder->slice());
} else {
generateError(rest::ResponseCode::BAD, 400);
}
return RestStatus::DONE;
}
RestStatus RestAgencyHandler::handleWrite() {
if (_request->requestType() != rest::RequestType::POST) {
@ -624,12 +650,14 @@ RestStatus RestAgencyHandler::handleConfig() {
}
// Respond with configuration
auto last = _agent->lastCommitted();
Builder body;
{
VPackObjectBuilder b(&body);
body.add("term", Value(_agent->term()));
body.add("leaderId", Value(_agent->leaderID()));
body.add("lastCommitted", Value(_agent->lastCommitted()));
body.add("lastCommitted", Value(last.first));
body.add("leaderCommitted", Value(last.second));
body.add("lastAcked", _agent->lastAckedAgo()->slice());
body.add("configuration", _agent->config().toBuilder()->slice());
}
@ -691,6 +719,8 @@ RestStatus RestAgencyHandler::execute() {
return handleState();
} else if (suffixes[0] == "stores") {
return handleStores();
} else if (suffixes[0] == "store") {
return handleStore();
} else {
return reportUnknownMethod();
}

View File

@ -47,6 +47,7 @@ class RestAgencyHandler : public RestBaseHandler {
RestStatus reportTooManySuffices();
RestStatus reportUnknownMethod();
RestStatus handleStores();
RestStatus handleStore();
RestStatus handleRead();
RestStatus handleWrite();
RestStatus handleTransact();

View File

@ -32,6 +32,7 @@
#include "Logger/Logger.h"
#include "Rest/HttpRequest.h"
#include "Rest/Version.h"
using namespace arangodb;

View File

@ -330,6 +330,51 @@ std::vector<log_t> State::get(arangodb::consensus::index_t start,
return entries;
}
/// Get log entries from indices "start" to "end"
/// Throws std::out_of_range exception
log_t State::at(arangodb::consensus::index_t index) const {
MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
if (_cur > index) {
std::string excMessage =
std::string(
"Access before the start of the log deque: (first, requested): (") +
std::to_string(_cur) + ", " + std::to_string(index);
LOG_TOPIC(DEBUG, Logger::AGENCY) << excMessage;
throw std::out_of_range(excMessage);
}
auto pos = index - _cur;
if (pos > _log.size()) {
std::string excMessage =
std::string(
"Access beyond the end of the log deque: (last, requested): (") +
std::to_string(_cur+_log.size()) + ", " + std::to_string(index);
LOG_TOPIC(DEBUG, Logger::AGENCY) << excMessage;
throw std::out_of_range(excMessage);
}
return _log[pos];
}
/// Have log with specified index and term
bool State::has(arangodb::consensus::index_t index, term_t term) const {
MUTEX_LOCKER(mutexLocker, _logLock); // Cannot be read lock (Compaction)
try {
return _log.at(index-_cur).term == term;
} catch (...) {}
return false;
}
/// Get vector of past transaction from 'start' to 'end'
std::vector<VPackSlice> State::slices(arangodb::consensus::index_t start,
arangodb::consensus::index_t end) const {
@ -906,3 +951,8 @@ std::vector<std::vector<log_t>> State::inquire(query_t const& query) const {
}
// Index of last log entry
arangodb::consensus::index_t State::lastIndex() const {
MUTEX_LOCKER(mutexLocker, _logLock);
return (!_log.empty()) ? _log.back().index : 0;
}

View File

@ -66,21 +66,27 @@ class State {
std::vector<bool> const& indices, term_t term);
/// @brief Single log entry (leader)
arangodb::consensus::index_t log(
velocypack::Slice const& slice, term_t term,
std::string const& clientId = std::string());
index_t log(velocypack::Slice const& slice, term_t term,
std::string const& clientId = std::string());
/// @brief Log entries (followers)
arangodb::consensus::index_t log(query_t const& queries, size_t ndups = 0);
/// @brief Find entry at index with term
bool find(index_t index, term_t term);
/// @brief Get complete log entries bound by lower and upper bounds.
/// Default: [first, last]
std::vector<log_t> get(
index_t = 0, index_t = (std::numeric_limits<uint64_t>::max)()) const;
index_t = 0, index_t = (std::numeric_limits<uint64_t>::max)()) const;
/// @brief Get complete log entries bound by lower and upper bounds.
/// Default: [first, last]
log_t at(index_t) const;
/// @brief Has entry with index und term
bool has(index_t, term_t) const;
/// @brief Get log entries by client Id
std::vector<std::vector<log_t>> inquire(query_t const&) const;
@ -96,6 +102,10 @@ class State {
/// after the return
log_t lastLog() const;
/// @brief last log entry, copy entry because we do no longer have the lock
/// after the return
index_t lastIndex() const;
/// @brief Set endpoint
bool configure(Agent* agent);

View File

@ -40,9 +40,8 @@
#include <iomanip>
#include <regex>
using namespace arangodb::basics;
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
using namespace arangodb::basics;
/// Non-Emptyness of string
struct NotEmpty {
@ -353,15 +352,11 @@ std::vector<bool> Store::apply(
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
auto cc = ClusterComm::instance();
if (cc != nullptr) {
// nullptr only happens on controlled shutdown
cc->asyncRequest(
"1", 1, endpoint, rest::RequestType::POST, path,
std::make_shared<std::string>(body.toString()), headerFields,
std::make_shared<StoreCallback>(path, body.toJson()), 1.0, true,
0.01);
}
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, endpoint, rest::RequestType::POST, path,
std::make_shared<std::string>(body.toString()), headerFields,
std::make_shared<StoreCallback>(path, body.toJson()), 1.0, true, 0.01);
} else {
LOG_TOPIC(WARN, Logger::AGENCY) << "Malformed URL " << url;
}

View File

@ -60,10 +60,10 @@ class Store : public arangodb::Thread {
std::vector<bool> apply(query_t const& query, bool verbose = false);
/// @brief Apply single entry in query
bool apply(arangodb::velocypack::Slice const& query, bool verbose = false);
bool apply(Slice const& query, bool verbose = false);
/// @brief Apply entry in query
std::vector<bool> apply(std::vector<arangodb::velocypack::Slice> const& query,
std::vector<bool> apply(std::vector<Slice> const& query,
index_t lastCommitIndex, term_t term,
bool inform = true);
@ -81,7 +81,7 @@ class Store : public arangodb::Thread {
bool start();
/// @brief Dump everything to builder
void dumpToBuilder(arangodb::velocypack::Builder&) const;
void dumpToBuilder(Builder&) const;
/// @brief Notify observers
void notifyObservers() const;
@ -92,7 +92,7 @@ class Store : public arangodb::Thread {
Store& operator=(VPackSlice const& slice);
/// @brief Create Builder representing this store
void toBuilder(arangodb::velocypack::Builder&, bool showHidden = false) const;
void toBuilder(Builder&, bool showHidden = false) const;
/// @brief Copy out a node
Node get(std::string const& path = std::string("/")) const;

View File

@ -41,9 +41,9 @@
#include "Basics/MutexLocker.h"
using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
using namespace arangodb::application_features;
std::string Supervision::_agencyPrefix = "/arango";
@ -552,11 +552,13 @@ void Supervision::handleShutdown() {
del->close();
auto result = _agent->write(del);
if (result.indices.size() != 1) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "Invalid resultsize of " << result.indices.size()
<< " found during shutdown";
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Invalid resultsize of " << result.indices.size()
<< " found during shutdown";
} else {
if (_agent->waitFor(result.indices.at(0)) != Agent::raft_commit_t::OK) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "Result was not written to followers during shutdown";
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Result was not written to followers during shutdown";
}
}
}

View File

@ -27,7 +27,6 @@
#include "Agency/Job.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
UnassumedLeadership::UnassumedLeadership(
Node const& snapshot, Agent* agent, std::string const& jobId,

View File

@ -39,7 +39,6 @@ using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::basics;
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
static void JS_EnabledAgent(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);

View File

@ -46,9 +46,7 @@ AgencyCallbackRegistry::AgencyCallbackRegistry(std::string const& callbackBasePa
AgencyCallbackRegistry::~AgencyCallbackRegistry() {
}
AgencyCommResult AgencyCallbackRegistry::registerCallback(
std::shared_ptr<AgencyCallback> cb) {
bool AgencyCallbackRegistry::registerCallback(std::shared_ptr<AgencyCallback> cb) {
uint32_t rand;
{
WRITE_LOCKER(locker, _lock);
@ -60,28 +58,23 @@ AgencyCommResult AgencyCallbackRegistry::registerCallback(
}
}
AgencyCommResult result;
bool ok = false;
try {
result = _agency.registerCallback(cb->key, getEndpointUrl(rand));
if (!result.successful()) {
LOG_TOPIC(ERR, arangodb::Logger::AGENCY)
<< "Registering callback failed with " << result.errorCode() << ": "
<< result.errorMessage();
ok = _agency.registerCallback(cb->key, getEndpointUrl(rand)).successful();
if (!ok) {
LOG_TOPIC(ERR, Logger::CLUSTER) << "Registering callback failed";
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "Couldn't register callback " << e.what();
LOG_TOPIC(ERR, Logger::CLUSTER) << "Couldn't register callback " << e.what();
} catch (...) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
LOG_TOPIC(ERR, Logger::CLUSTER)
<< "Couldn't register callback. Unknown exception";
}
if (!result.successful()) {
if (!ok) {
WRITE_LOCKER(locker, _lock);
_endpoints.erase(rand);
}
return result;
return ok;
}
std::shared_ptr<AgencyCallback> AgencyCallbackRegistry::getCallback(uint32_t id) {

View File

@ -44,7 +44,7 @@ public:
//////////////////////////////////////////////////////////////////////////////
/// @brief register a callback
//////////////////////////////////////////////////////////////////////////////
AgencyCommResult registerCallback(std::shared_ptr<AgencyCallback>);
bool registerCallback(std::shared_ptr<AgencyCallback>);
//////////////////////////////////////////////////////////////////////////////
/// @brief unregister a callback

View File

@ -511,12 +511,12 @@ void ClusterInfo::loadPlan() {
// This should not happen in healthy situations.
// If it happens in unhealthy situations the
// cluster should not fail.
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "Failed to load information for collection '"
<< collectionId << "': " << ex.what()
<< ". invalid information in plan. The collection will "
"be ignored for now and the invalid information will "
"be repaired. VelocyPack: "
<< collectionSlice.toJson();
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to load information for collection '" << collectionId
<< "': " << ex.what() << ". invalid information in plan. The"
"collection will be ignored for now and the invalid information"
"will be repaired. VelocyPack: "
<< collectionSlice.toJson();
TRI_ASSERT(false);
continue;
@ -525,12 +525,12 @@ void ClusterInfo::loadPlan() {
// This should not happen in healthy situations.
// If it happens in unhealthy situations the
// cluster should not fail.
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "Failed to load information for collection '"
<< collectionId
<< ". invalid information in plan. The collection will "
"be ignored for now and the invalid information will "
"be repaired. VelocyPack: "
<< collectionSlice.toJson();
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to load information for collection '" << collectionId
<< ". invalid information in plan. The collection will "
"be ignored for now and the invalid information will "
"be repaired. VelocyPack: "
<< collectionSlice.toJson();
TRI_ASSERT(false);
continue;
@ -885,14 +885,7 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
// AgencyCallback for this.
auto agencyCallback = std::make_shared<AgencyCallback>(
ac, "Current/Databases/" + name, dbServerChanged, true, false);
auto regres = _agencyCallbackRegistry->registerCallback(agencyCallback);
if (!regres.successful()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER) <<
"Could not register call back with error: " << regres.errorCode() <<
" - " << regres.errorMessage();
}
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
AgencyOperation newVal("Plan/Databases/" + name,
@ -987,14 +980,7 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
// AgencyCallback for this.
auto agencyCallback =
std::make_shared<AgencyCallback>(ac, where, dbServerChanged, true, false);
auto regres = _agencyCallbackRegistry->registerCallback(agencyCallback);
if (!regres.successful()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER) <<
"Could not register call back with error: " << regres.errorCode() <<
" - " << regres.errorMessage();
}
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
// Transact to agency
@ -1150,14 +1136,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
auto agencyCallback = std::make_shared<AgencyCallback>(
ac, "Current/Collections/" + databaseName + "/" + collectionID,
dbServerChanged, true, false);
auto regres = _agencyCallbackRegistry->registerCallback(agencyCallback);
if (!regres.successful()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER) <<
"Could not register call back with error: " << regres.errorCode() <<
" - " << regres.errorMessage();
}
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
VPackBuilder builder;
@ -1285,14 +1264,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
// AgencyCallback for this.
auto agencyCallback =
std::make_shared<AgencyCallback>(ac, where, dbServerChanged, true, false);
auto regres = _agencyCallbackRegistry->registerCallback(agencyCallback);
if (!regres.successful()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER) <<
"Could not register call back with error: " << regres.errorCode() <<
" - " << regres.errorMessage();
}
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
size_t numberOfShards = 0;
@ -1543,8 +1515,6 @@ int ClusterInfo::ensureIndexCoordinator(
if (idxSlice.isString()) {
// use predefined index id
iid = arangodb::basics::StringUtils::uint64(idxSlice.copyString());
} else if (idxSlice.isNumber()) {
iid = idxSlice.getNumber<uint64_t>();
}
if (iid == 0) {
@ -1775,14 +1745,7 @@ int ClusterInfo::ensureIndexCoordinator(
// AgencyCallback for this.
auto agencyCallback =
std::make_shared<AgencyCallback>(ac, where, dbServerChanged, true, false);
auto regres = _agencyCallbackRegistry->registerCallback(agencyCallback);
if (!regres.successful()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER) <<
"Could not register call back with error: " << regres.errorCode() <<
" - " << regres.errorMessage();
}
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
AgencyOperation newValue(key, AgencyValueOperationType::SET,
@ -1809,7 +1772,6 @@ int ClusterInfo::ensureIndexCoordinator(
errorMsg += trx.toJson();
errorMsg += "ClientId: " + result._clientId + " ";
errorMsg += " ResultCode: " + std::to_string(result.errorCode()) + " ";
errorMsg += " Result: " + result.errorMessage() + " ";
errorMsg += std::string(__FILE__) + ":" + std::to_string(__LINE__);
resultBuilder = *resBuilder;
}
@ -1949,14 +1911,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
// AgencyCallback for this.
auto agencyCallback =
std::make_shared<AgencyCallback>(ac, where, dbServerChanged, true, false);
auto regres = _agencyCallbackRegistry->registerCallback(agencyCallback);
if (!regres.successful()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER) <<
"Could not register call back with error: " << regres.errorCode() <<
" - " << regres.errorMessage();
}
_agencyCallbackRegistry->registerCallback(agencyCallback);
TRI_DEFER(_agencyCallbackRegistry->unregisterCallback(agencyCallback));
loadPlan();
@ -2439,26 +2394,28 @@ std::shared_ptr<std::vector<ServerID>> ClusterInfo::getResponsibleServer(
while (true) {
{
READ_LOCKER(readLocker, _currentProt.lock);
// _shardIds is a map-type <ShardId,
// std::shared_ptr<std::vector<ServerId>>>
auto it = _shardIds.find(shardID);
{
READ_LOCKER(readLocker, _currentProt.lock);
// _shardIds is a map-type <ShardId,
// std::shared_ptr<std::vector<ServerId>>>
auto it = _shardIds.find(shardID);
if (it != _shardIds.end()) {
auto serverList = (*it).second;
if (serverList != nullptr && serverList->size() > 0 &&
(*serverList)[0].size() > 0 && (*serverList)[0][0] == '_') {
// This is a temporary situation in which the leader has already
// resigned, let's wait half a second and try again.
--tries;
LOG_TOPIC(INFO, Logger::CLUSTER)
<< "getResponsibleServer: found resigned leader,"
<< "waiting for half a second...";
usleep(500000);
} else {
return (*it).second;
if (it != _shardIds.end()) {
auto serverList = (*it).second;
if (serverList != nullptr && serverList->size() > 0 &&
(*serverList)[0].size() > 0 && (*serverList)[0][0] == '_') {
// This is a temporary situation in which the leader has already
// resigned, let's wait half a second and try again.
--tries;
LOG_TOPIC(INFO, Logger::CLUSTER)
<< "getResponsibleServer: found resigned leader,"
<< "waiting for half a second...";
} else {
return (*it).second;
}
}
}
usleep(500000);
}
if (++tries >= 2) {

View File

@ -241,7 +241,7 @@ void HeartbeatThread::runDBServer() {
bool registered = false;
while (!registered) {
registered =
_agencyCallbackRegistry->registerCallback(planAgencyCallback).successful();
_agencyCallbackRegistry->registerCallback(planAgencyCallback);
if (!registered) {
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "Couldn't register plan change in agency!";