1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into spdvpk

This commit is contained in:
Michael Hackstein 2016-04-12 14:12:06 +02:00
commit 50535f6ad8
17 changed files with 370 additions and 126 deletions

View File

@ -38,7 +38,8 @@ namespace arangodb {
namespace consensus { namespace consensus {
// Agent configuration // Agent configuration
Agent::Agent (TRI_server_t* server, config_t const& config, ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry) Agent::Agent (TRI_server_t* server, config_t const& config,
ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry)
: Thread ("Agent"), : Thread ("Agent"),
_server(server), _server(server),
_vocbase(nullptr), _vocbase(nullptr),
@ -218,7 +219,8 @@ bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
if (queries->slice().length()) { if (queries->slice().length()) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Appending "<< queries->slice().length() LOG_TOPIC(INFO, Logger::AGENCY) << "Appending "<< queries->slice().length()
<< " entries to state machine."; << " entries to state machine.";
/* bool success = */_state.log (queries, term, leaderId, prevIndex, prevTerm); /* bool success = */
_state.log (queries, term, leaderId, prevIndex, prevTerm);
} else { } else {
// heart-beat // heart-beat
} }
@ -298,12 +300,13 @@ bool Agent::load () {
LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state."; LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state.";
if (!_state.loadCollections(_vocbase, _applicationV8, _queryRegistry)) { if (!_state.loadCollections(_vocbase, _applicationV8, _queryRegistry)) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Failed to load persistent state on statup."; LOG_TOPIC(WARN, Logger::AGENCY)
<< "Failed to load persistent state on statup.";
} }
LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores."; LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores.";
// _read_db.apply(_state.slices());
_spearhead.apply(_state.slices(_last_commit_index+1)); _spearhead.apply(_state.slices(_last_commit_index+1));
reportIn(id(),_state.lastLog().index);
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker."; LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker.";
_spearhead.start(this); _spearhead.start(this);
@ -311,6 +314,11 @@ bool Agent::load () {
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality."; LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality.";
_constituent.start(_vocbase, _applicationV8, _queryRegistry); _constituent.start(_vocbase, _applicationV8, _queryRegistry);
if (_config.sanity_check) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting cluster sanity facilities";
_sanity_check.start(this);
}
return true; return true;
} }
@ -334,7 +342,7 @@ write_ret_t Agent::write (query_t const& query) {
_cv.signal(); // Wake up run _cv.signal(); // Wake up run
} }
reportIn(0,maxind); reportIn(id(),maxind);
return write_ret_t(true,id(),applied,indices); // Indices to wait for to rest return write_ret_t(true,id(),applied,indices); // Indices to wait for to rest
@ -386,6 +394,9 @@ void Agent::beginShutdown() {
_constituent.beginShutdown(); _constituent.beginShutdown();
_spearhead.beginShutdown(); _spearhead.beginShutdown();
_read_db.beginShutdown(); _read_db.beginShutdown();
if (_config.sanity_check) {
_sanity_check.beginShutdown();
}
// Wake up all waiting REST handler (waitFor) // Wake up all waiting REST handler (waitFor)
CONDITION_LOCKER(guard, _cv); CONDITION_LOCKER(guard, _cv);

View File

@ -27,6 +27,7 @@
#include "AgencyCommon.h" #include "AgencyCommon.h"
#include "AgentCallback.h" #include "AgentCallback.h"
#include "Constituent.h" #include "Constituent.h"
#include "SanityCheck.h"
#include "State.h" #include "State.h"
#include "Store.h" #include "Store.h"
@ -42,7 +43,8 @@ class QueryRegistry;
namespace consensus { namespace consensus {
class Agent : public arangodb::Thread { class Agent : public arangodb::Thread {
public:
public:
/// @brief Construct with program options /// @brief Construct with program options
Agent(TRI_server_t*, config_t const&, ApplicationV8*, aql::QueryRegistry*); Agent(TRI_server_t*, config_t const&, ApplicationV8*, aql::QueryRegistry*);
@ -77,6 +79,8 @@ class Agent : public arangodb::Thread {
/// @brief Leader ID /// @brief Leader ID
id_t leaderID() const; id_t leaderID() const;
/// @brief Are we leading?
bool leading() const; bool leading() const;
/// @brief Pick up leadership tasks /// @brief Pick up leadership tasks
@ -148,7 +152,9 @@ class Agent : public arangodb::Thread {
aql::QueryRegistry* _queryRegistry; aql::QueryRegistry* _queryRegistry;
Constituent _constituent; /**< @brief Leader election delegate */ Constituent _constituent; /**< @brief Leader election delegate */
SanityCheck _sanity_check; /**< @brief sanitychecking */
State _state; /**< @brief Log replica */ State _state; /**< @brief Log replica */
config_t _config; /**< @brief Command line arguments */ config_t _config; /**< @brief Command line arguments */
std::atomic<index_t> _last_commit_index; /**< @brief Last commit index */ std::atomic<index_t> _last_commit_index; /**< @brief Last commit index */
@ -165,7 +171,7 @@ class Agent : public arangodb::Thread {
_confirmed; /**< @brief Confirmed log index of each slave */ _confirmed; /**< @brief Confirmed log index of each slave */
arangodb::Mutex _ioLock; /**< @brief Read/Write lock */ arangodb::Mutex _ioLock; /**< @brief Read/Write lock */
}; };
}
} }}
#endif #endif

View File

@ -38,7 +38,7 @@ public:
AgentCallback(); AgentCallback();
explicit AgentCallback(Agent* agent, id_t slave_id, index_t last); AgentCallback(Agent* agent, id_t slave_id, index_t last);
virtual bool operator()(arangodb::ClusterCommResult*) override final; virtual bool operator()(arangodb::ClusterCommResult*) override final;

View File

@ -38,17 +38,21 @@ using namespace arangodb::basics;
using namespace arangodb::rest; using namespace arangodb::rest;
using namespace arangodb; using namespace arangodb;
ApplicationAgency::ApplicationAgency(TRI_server_t* server, ApplicationAgency::ApplicationAgency(
ApplicationEndpointServer* aes, TRI_server_t* server, ApplicationEndpointServer* aes,
ApplicationV8* applicationV8, ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry)
aql::QueryRegistry* queryRegistry) : ApplicationFeature("agency"),
: ApplicationFeature("agency"), _server(server), _size(1), _server(server),
_min_election_timeout(0.15), _max_election_timeout(1.0), _size(1),
_election_call_rate_mul(0.85), _notify(false), _sanity_check(false), _min_election_timeout(0.15),
_agent_id((std::numeric_limits<uint32_t>::max)()), _max_election_timeout(1.0),
_endpointServer(aes), _election_call_rate_mul(0.85),
_applicationV8(applicationV8), _notify(false),
_queryRegistry(queryRegistry) { _sanity_check(false),
_agent_id((std::numeric_limits<uint32_t>::max)()),
_endpointServer(aes),
_applicationV8(applicationV8),
_queryRegistry(queryRegistry) {
} }

View File

@ -1,3 +1,4 @@
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER /// DISCLAIMER
/// ///
@ -67,14 +68,28 @@ void Constituent::configure(Agent* agent) {
// Default ctor // Default ctor
Constituent::Constituent() : Constituent::Constituent() :
Thread("Constituent"), _term(0), _leader_id(0), _id(0), _gen(std::random_device()()), Thread("Constituent"),
_role(FOLLOWER), _agent(0), _voted_for(0) {} _vocbase(nullptr),
_applicationV8(nullptr),
_queryRegistry(nullptr),
_term(0),
_leader_id(0),
_id(0),
_gen(std::random_device()()),
_role(FOLLOWER),
_agent(0),
_voted_for(0) {}
// Shutdown if not already // Shutdown if not already
Constituent::~Constituent() { Constituent::~Constituent() {
shutdown(); shutdown();
} }
// Configuration
config_t const& Constituent::config () const {
return _agent->config();
}
// Random sleep times in election process // Random sleep times in election process
duration_t Constituent::sleepFor (double min_t, double max_t) { duration_t Constituent::sleepFor (double min_t, double max_t) {
dist_t dis(min_t, max_t); dist_t dis(min_t, max_t);
@ -105,8 +120,10 @@ void Constituent::term(term_t t) {
body.close(); body.close();
TRI_ASSERT(_vocbase != nullptr); TRI_ASSERT(_vocbase != nullptr);
auto transactionContext = std::make_shared<StandaloneTransactionContext>(_vocbase); auto transactionContext =
SingleCollectionTransaction trx(transactionContext, "election", TRI_TRANSACTION_WRITE); std::make_shared<StandaloneTransactionContext>(_vocbase);
SingleCollectionTransaction trx(transactionContext, "election",
TRI_TRANSACTION_WRITE);
int res = trx.begin(); int res = trx.begin();
@ -133,7 +150,8 @@ role_t Constituent::role () const {
/// @brief Become follower in term /// @brief Become follower in term
void Constituent::follow (term_t t) { void Constituent::follow (term_t t) {
if (_role != FOLLOWER) { if (_role != FOLLOWER) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Role change: Converted to follower in term " << t; LOG_TOPIC(INFO, Logger::AGENCY)
<< "Role change: Converted to follower in term " << t;
} }
this->term(t); this->term(t);
_role = FOLLOWER; _role = FOLLOWER;
@ -142,7 +160,8 @@ void Constituent::follow (term_t t) {
/// @brief Become leader /// @brief Become leader
void Constituent::lead () { void Constituent::lead () {
if (_role != LEADER) { if (_role != LEADER) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Role change: Converted to leader in term " << _term ; LOG_TOPIC(INFO, Logger::AGENCY)
<< "Role change: Converted to leader in term " << _term ;
_agent->lead(); // We need to rebuild spear_head and read_db; _agent->lead(); // We need to rebuild spear_head and read_db;
} }
_role = LEADER; _role = LEADER;
@ -152,7 +171,8 @@ void Constituent::lead () {
/// @brief Become follower /// @brief Become follower
void Constituent::candidate () { void Constituent::candidate () {
if (_role != CANDIDATE) if (_role != CANDIDATE)
LOG_TOPIC(INFO, Logger::AGENCY) << "Role change: Converted to candidate in term " << _term ; LOG_TOPIC(INFO, Logger::AGENCY)
<< "Role change: Converted to candidate in term " << _term ;
_role = CANDIDATE; _role = CANDIDATE;
} }
@ -178,17 +198,17 @@ id_t Constituent::leaderID () const {
/// @brief Agency size /// @brief Agency size
size_t Constituent::size() const { size_t Constituent::size() const {
return _agent->config().size(); return config().size();
} }
/// @brief Get endpoint to an id /// @brief Get endpoint to an id
std::string const& Constituent::end_point(id_t id) const { std::string const& Constituent::end_point(id_t id) const {
return _agent->config().end_points[id]; return config().end_points[id];
} }
/// @brief Get all endpoints /// @brief Get all endpoints
std::vector<std::string> const& Constituent::end_points() const { std::vector<std::string> const& Constituent::end_points() const {
return _agent->config().end_points; return config().end_points;
} }
/// @brief Notify peers of updated endpoints /// @brief Notify peers of updated endpoints
@ -265,32 +285,31 @@ void Constituent::callElection() {
} }
std::string body; std::string body;
std::vector<ClusterCommResult> results(_agent->config().end_points.size()); std::vector<ClusterCommResult> results(config().end_points.size());
std::stringstream path; std::stringstream path;
path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId=" << _id path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId="
<< "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm=" << _id << "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm="
<< _agent->lastLog().term; << _agent->lastLog().term;
// Ask everyone for their vote // Ask everyone for their vote
for (id_t i = 0; i < _agent->config().end_points.size(); ++i) { for (id_t i = 0; i < config().end_points.size(); ++i) {
if (i != _id && end_point(i) != "") { if (i != _id && end_point(i) != "") {
std::unique_ptr<std::map<std::string, std::string>> headerFields = std::unique_ptr<std::map<std::string, std::string>> headerFields =
std::make_unique<std::map<std::string, std::string> >(); std::make_unique<std::map<std::string, std::string> >();
results[i] = arangodb::ClusterComm::instance()->asyncRequest( results[i] = arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, _agent->config().end_points[i], GeneralRequest::RequestType::GET, "1", 1, config().end_points[i], GeneralRequest::RequestType::GET,
path.str(), std::make_shared<std::string>(body), headerFields, nullptr, path.str(), std::make_shared<std::string>(body), headerFields, nullptr,
_agent->config().min_ping, true); config().min_ping, true);
} }
} }
// Wait randomized timeout // Wait randomized timeout
std::this_thread::sleep_for( std::this_thread::sleep_for(
sleepFor(.5*_agent->config().min_ping, sleepFor(.5*config().min_ping, .8*config().min_ping));
.8*_agent->config().min_ping));
// Collect votes // Collect votes
for (id_t i = 0; i < _agent->config().end_points.size(); ++i) { for (id_t i = 0; i < config().end_points.size(); ++i) {
if (i != _id && end_point(i) != "") { if (i != _id && end_point(i) != "") {
ClusterCommResult res = arangodb::ClusterComm::instance()-> ClusterCommResult res = arangodb::ClusterComm::instance()->
enquire(results[i].operationID); enquire(results[i].operationID);
@ -341,8 +360,6 @@ void Constituent::beginShutdown() {
} }
#include <iostream>
bool Constituent::start (TRI_vocbase_t* vocbase, bool Constituent::start (TRI_vocbase_t* vocbase,
ApplicationV8* applicationV8, ApplicationV8* applicationV8,
aql::QueryRegistry* queryRegistry) { aql::QueryRegistry* queryRegistry) {
@ -356,6 +373,7 @@ bool Constituent::start (TRI_vocbase_t* vocbase,
void Constituent::run() { void Constituent::run() {
TRI_ASSERT(_vocbase != nullptr); TRI_ASSERT(_vocbase != nullptr);
auto bindVars = std::make_shared<VPackBuilder>(); auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject(); bindVars->openObject();
@ -367,8 +385,8 @@ void Constituent::run() {
// Query // Query
std::string const aql ("FOR l IN election SORT l._key DESC LIMIT 1 RETURN l"); std::string const aql ("FOR l IN election SORT l._key DESC LIMIT 1 RETURN l");
arangodb::aql::Query query(_applicationV8, false, _vocbase, arangodb::aql::Query query(_applicationV8, false, _vocbase,
aql.c_str(), aql.size(), bindVars, nullptr, aql.c_str(), aql.size(), bindVars, nullptr,
arangodb::aql::PART_MAIN); arangodb::aql::PART_MAIN);
auto queryResult = query.execute(_queryRegistry); auto queryResult = query.execute(_queryRegistry);
if (queryResult.code != TRI_ERROR_NO_ERROR) { if (queryResult.code != TRI_ERROR_NO_ERROR) {
@ -376,25 +394,25 @@ void Constituent::run() {
} }
VPackSlice result = queryResult.result->slice(); VPackSlice result = queryResult.result->slice();
if (result.isArray()) { if (result.isArray()) {
for (auto const& i : VPackArrayIterator(result)) { for (auto const& i : VPackArrayIterator(result)) {
try { try {
_term = i.get("term").getUInt(); _term = i.get("term").getUInt();
_voted_for = i.get("voted_for").getUInt(); _voted_for = i.get("voted_for").getUInt();
} catch (std::exception const& e) { } catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) LOG_TOPIC(ERR, Logger::AGENCY)
<< "Persisted election entries corrupt! Defaulting term,vote (0,0)"; << "Persisted election entries corrupt! Defaulting term,vote (0,0)";
} }
} }
} }
// Always start off as follower // Always start off as follower
while (!this->isStopping() && size() > 1) { while (!this->isStopping() && size() > 1) {
if (_role == FOLLOWER) { if (_role == FOLLOWER) {
_cast = false; // New round set not cast vote _cast = false; // New round set not cast vote
std::this_thread::sleep_for( // Sleep for random time std::this_thread::sleep_for( // Sleep for random time
sleepFor(_agent->config().min_ping, _agent->config().max_ping)); sleepFor(config().min_ping, config().max_ping));
if (!_cast) { if (!_cast) {
candidate(); // Next round, we are running candidate(); // Next round, we are running
} }

View File

@ -33,7 +33,6 @@
#include "AgencyCommon.h" #include "AgencyCommon.h"
#include "Basics/Thread.h" #include "Basics/Thread.h"
struct TRI_server_t;
struct TRI_vocbase_t; struct TRI_vocbase_t;
namespace arangodb { namespace arangodb {
@ -93,6 +92,9 @@ public:
/// @brief Who is leading /// @brief Who is leading
id_t leaderID () const; id_t leaderID () const;
/// @brief Configuration
config_t const& config () const;
/// @brief Become follower /// @brief Become follower
void follow(term_t); void follow(term_t);
@ -135,7 +137,6 @@ private:
/// @brief Sleep for how long /// @brief Sleep for how long
duration_t sleepFor(double, double); duration_t sleepFor(double, double);
TRI_server_t* _server;
TRI_vocbase_t* _vocbase; TRI_vocbase_t* _vocbase;
ApplicationV8* _applicationV8; ApplicationV8* _applicationV8;
aql::QueryRegistry* _queryRegistry; aql::QueryRegistry* _queryRegistry;

View File

@ -1,28 +1,58 @@
#include "SanityCheck.h" #include "SanityCheck.h"
#include "Agent.h" #include "Agent.h"
#include "Basics/ConditionLocker.h"
using namespace arangodb::consensus; using namespace arangodb::consensus;
SanityCheck::SanityCheck() : arangodb::Thread("SanityCheck"), _agent(nullptr) {} SanityCheck::SanityCheck() : arangodb::Thread("SanityCheck"), _agent(nullptr) {}
SanityCheck::~SanityCheck() {}; SanityCheck::~SanityCheck() {
shutdown();
void SanityCheck::configure(Agent* agent) { };
_agent = agent;
}
void SanityCheck::wakeUp () { void SanityCheck::wakeUp () {
_cv.signal();
} }
void SanityCheck::passOut () { bool SanityCheck::doChecks (bool timedout) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Sanity checks";
return true;
} }
void SanityCheck::run() { void SanityCheck::run() {
CONDITION_LOCKER(guard, _cv);
TRI_ASSERT(_agent!=nullptr);
bool timedout = false;
while (!this->isStopping()) {
if (_agent->leading()) {
timedout = _cv.wait(1000000);
} else {
_cv.wait();
}
doChecks(timedout);
}
}
// Start thread
bool SanityCheck::start () {
Thread::start();
return true;
}
// Start thread with agent
bool SanityCheck::start (Agent* agent) {
_agent = agent;
return start();
} }
void SanityCheck::beginShutdown() { void SanityCheck::beginShutdown() {
// Personal hygiene
Thread::beginShutdown();
} }

View File

@ -42,9 +42,12 @@ public:
/// @brief Default dtor /// @brief Default dtor
~SanityCheck (); ~SanityCheck ();
/// @brief Configure with agent /// @brief Start thread
void configure (Agent* agent); bool start ();
/// @brief Start thread with access to agent
bool start (Agent*);
/// @brief Run woker /// @brief Run woker
void run() override final; void run() override final;
@ -54,16 +57,15 @@ public:
/// @brief Wake up to task /// @brief Wake up to task
void wakeUp (); void wakeUp ();
/// @brief Stop task and wait
void passOut ();
private: private:
/// @brief Perform sanity checking
bool doChecks(bool);
Agent* _agent; Agent* _agent; /**< @brief My agent */
arangodb::basics::ConditionVariable _cv; /**< @brief Control if thread should run */ arangodb::basics::ConditionVariable _cv; /**< @brief Control if thread should run */
}; };

View File

@ -75,8 +75,10 @@ bool State::persist(index_t index, term_t term, id_t lid,
body.close(); body.close();
TRI_ASSERT(_vocbase != nullptr); TRI_ASSERT(_vocbase != nullptr);
auto transactionContext = std::make_shared<StandaloneTransactionContext>(_vocbase); auto transactionContext =
SingleCollectionTransaction trx(transactionContext, "log", TRI_TRANSACTION_WRITE); std::make_shared<StandaloneTransactionContext>(_vocbase);
SingleCollectionTransaction trx (
transactionContext, "log", TRI_TRANSACTION_WRITE);
int res = trx.begin(); int res = trx.begin();
@ -131,7 +133,7 @@ bool State::log(query_t const& queries, term_t term, id_t lid,
buf->append((char const*)i.get("query").begin(), buf->append((char const*)i.get("query").begin(),
i.get("query").byteSize()); i.get("query").byteSize());
_log.push_back(log_t(i.get("index").getUInt(), term, lid, buf)); _log.push_back(log_t(i.get("index").getUInt(), term, lid, buf));
persist(i.get("index").getUInt(), term, lid, i.get("query")); // log to disk persist(i.get("index").getUInt(), term, lid, i.get("query")); // to disk
} catch (std::exception const& e) { } catch (std::exception const& e) {
LOG(ERR) << e.what(); LOG(ERR) << e.what();
} }
@ -194,7 +196,8 @@ bool State::createCollections() {
bool State::checkCollection(std::string const& name) { bool State::checkCollection(std::string const& name) {
if (!_collections_checked) { if (!_collections_checked) {
return (TRI_LookupCollectionByNameVocBase(_vocbase, name.c_str()) != nullptr); return (
TRI_LookupCollectionByNameVocBase(_vocbase, name.c_str()) != nullptr);
} }
return true; return true;
} }
@ -204,7 +207,8 @@ bool State::createCollection(std::string const& name) {
body.add(VPackValue(VPackValueType::Object)); body.add(VPackValue(VPackValueType::Object));
body.close(); body.close();
VocbaseCollectionInfo parameters(_vocbase, name.c_str(), TRI_COL_TYPE_DOCUMENT, body.slice()); VocbaseCollectionInfo parameters(_vocbase, name.c_str(),
TRI_COL_TYPE_DOCUMENT, body.slice());
TRI_vocbase_col_t const* collection = TRI_vocbase_col_t const* collection =
TRI_CreateCollectionVocBase(_vocbase, parameters, parameters.id(), true); TRI_CreateCollectionVocBase(_vocbase, parameters, parameters.id(), true);
@ -216,8 +220,7 @@ bool State::createCollection(std::string const& name) {
} }
bool State::loadCollections(TRI_vocbase_t* vocbase, bool State::loadCollections(TRI_vocbase_t* vocbase, ApplicationV8* applicationV8,
ApplicationV8* applicationV8,
aql::QueryRegistry* queryRegistry) { aql::QueryRegistry* queryRegistry) {
_vocbase = vocbase; _vocbase = vocbase;
_applicationV8 = applicationV8; _applicationV8 = applicationV8;
@ -227,20 +230,21 @@ bool State::loadCollections(TRI_vocbase_t* vocbase,
bool State::loadCollection(std::string const& name) { bool State::loadCollection(std::string const& name) {
TRI_ASSERT(_vocbase != nullptr); TRI_ASSERT(_vocbase != nullptr);
if (checkCollection(name)) { if (checkCollection(name)) {
auto bindVars = std::make_shared<VPackBuilder>(); auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject(); bindVars->openObject();
bindVars->close(); bindVars->close();
// ^^^ TODO: check if bindvars are actually needed // ^^^ TODO: check if bindvars are actually needed
TRI_ASSERT(_applicationV8 != nullptr); TRI_ASSERT(_applicationV8 != nullptr);
TRI_ASSERT(_queryRegistry != nullptr); TRI_ASSERT(_queryRegistry != nullptr);
std::string const aql(std::string("FOR l IN ") + name + " SORT l._key RETURN l"); std::string const aql(std::string("FOR l IN ") + name
+ " SORT l._key RETURN l");
arangodb::aql::Query query(_applicationV8, false, _vocbase, arangodb::aql::Query query(_applicationV8, false, _vocbase,
aql.c_str(), aql.size(), bindVars, nullptr, aql.c_str(), aql.size(), bindVars, nullptr,
arangodb::aql::PART_MAIN); arangodb::aql::PART_MAIN);
auto queryResult = query.execute(_queryRegistry); auto queryResult = query.execute(_queryRegistry);
if (queryResult.code != TRI_ERROR_NO_ERROR) { if (queryResult.code != TRI_ERROR_NO_ERROR) {
@ -255,9 +259,10 @@ bool State::loadCollection(std::string const& name) {
std::make_shared<arangodb::velocypack::Buffer<uint8_t>>(); std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
VPackSlice req = i.get("request"); VPackSlice req = i.get("request");
tmp->append(req.startAs<char const>(), req.byteSize()); tmp->append(req.startAs<char const>(), req.byteSize());
_log.push_back(log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()), _log.push_back(
i.get("term").getUInt(), log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()),
i.get("leader").getUInt(), tmp)); i.get("term").getUInt(),
i.get("leader").getUInt(), tmp));
} }
} }

View File

@ -22,6 +22,7 @@
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#include "Store.h" #include "Store.h"
#include "StoreCallback.h"
#include "Agency/Agent.h" #include "Agency/Agent.h"
#include "Basics/ConditionLocker.h" #include "Basics/ConditionLocker.h"
#include "Basics/VelocyPackHelper.h" #include "Basics/VelocyPackHelper.h"
@ -37,6 +38,39 @@
using namespace arangodb::consensus; using namespace arangodb::consensus;
inline static bool endpointPathFromUrl (
std::string const& url, std::string& endpoint, std::string& path) {
std::stringstream ep;
path = "/";
size_t pos = 7;
if (url.find("http://")==0) {
ep << "tcp://";
} else if (url.find("https://")==0) {
ep << "ssl://";
++pos;
} else {
return false;
}
size_t slash_p = url.find("/",pos);
if (slash_p==std::string::npos) {
ep << url.substr(pos);
} else {
ep << url.substr(pos,slash_p-pos);
path = url.substr(slash_p);
}
if (ep.str().find(':')==std::string::npos) {
ep << ":8529";
}
endpoint = ep.str();
return true;
}
struct NotEmpty { struct NotEmpty {
bool operator()(const std::string& s) { return !s.empty(); } bool operator()(const std::string& s) { return !s.empty(); }
}; };
@ -647,7 +681,8 @@ std::vector<bool> Store::apply (query_t const& query) {
} }
//template<class T, class U> std::multimap<std::string, std::string> //template<class T, class U> std::multimap<std::string, std::string>
std::ostream& operator<< (std::ostream& os, std::multimap<std::string,std::string> const& m) { std::ostream& operator<< (
std::ostream& os, std::multimap<std::string,std::string> const& m) {
for (auto const& i : m) { for (auto const& i : m) {
os << i.first << ": " << i.second << std::endl; os << i.first << ": " << i.second << std::endl;
} }
@ -700,30 +735,45 @@ std::vector<bool> Store::apply (
} }
std::vector<std::string> urls; std::vector<std::string> urls;
for(auto it = in.begin(), end = in.end(); it != end; it = in.upper_bound(it->first)) { for (auto it = in.begin(), end = in.end(); it != end;
it = in.upper_bound(it->first)) {
urls.push_back(it->first); urls.push_back(it->first);
} }
for (auto const& url : urls) { for (auto const& url : urls) {
Builder tmp; // host
tmp.openObject(); Builder body; // host
tmp.add("term",VPackValue(0)); body.openObject();
tmp.add("index",VPackValue(0)); body.add("term",VPackValue(0));
body.add("index",VPackValue(0));
auto ret = in.equal_range(url); auto ret = in.equal_range(url);
for (auto it = ret.first; it!=ret.second; ++it) { for (auto it = ret.first; it!=ret.second; ++it) {
//tmp.add(url,VPackValue(VPackValueType::Object)); body.add(it->second->key,VPackValue(VPackValueType::Object));
tmp.add(it->second->key,VPackValue(VPackValueType::Object)); body.add("op",VPackValue(it->second->oper));
tmp.add("op",VPackValue(it->second->oper)); body.close();
//tmp.close(); }
tmp.close();
}
tmp.close(); body.close();
std::cout << tmp.toJson() << std::endl;
std::string endpoint, path;
if (endpointPathFromUrl (url,endpoint,path)) {
std::unique_ptr<std::map<std::string, std::string>> headerFields =
std::make_unique<std::map<std::string, std::string> >();
ClusterCommResult res =
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, endpoint, GeneralRequest::RequestType::POST, path,
std::make_shared<std::string>(body.toString()), headerFields,
std::make_shared<StoreCallback>(), 0.0, true);
} else {
LOG_TOPIC(WARN, Logger::AGENCY) << "Malformed URL " << url;
}
} }
return applied; return applied;
} }
@ -791,7 +841,7 @@ std::vector<bool> Store::read (query_t const& queries, query_t& result) const {
// read single query into ret // read single query into ret
bool Store::read (VPackSlice const& query, Builder& ret) const { bool Store::read (VPackSlice const& query, Builder& ret) const {
bool success = true; bool success = true;
// Collect all paths // Collect all paths
@ -837,7 +887,7 @@ bool Store::read (VPackSlice const& query, Builder& ret) const {
} }
} }
} }
// Into result builder // Into result builder
copy.toBuilder(ret); copy.toBuilder(ret);

View File

@ -0,0 +1,11 @@
#include "StoreCallback.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
StoreCallback::StoreCallback() {}
bool StoreCallback::operator()(arangodb::ClusterCommResult* res) {
return true;
}

View File

@ -0,0 +1,48 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef __ARANGODB_CONSENSUS_STORE_CALLBACK__
#define __ARANGODB_CONSENSUS_STORE_CALLBACK__
#include "Cluster/ClusterComm.h"
namespace arangodb {
namespace consensus {
class StoreCallback : public arangodb::ClusterCommCallback {
public:
StoreCallback();
virtual bool operator()(arangodb::ClusterCommResult*) override final;
void shutdown();
private:
};
}} // namespace
#endif

View File

@ -69,12 +69,13 @@ add_executable(${BIN_ARANGOD}
Actions/RestActionHandler.cpp Actions/RestActionHandler.cpp
Actions/actions.cpp Actions/actions.cpp
Agency/Agent.cpp Agency/Agent.cpp
Agency/AgentCallback.cpp
Agency/ApplicationAgency.cpp Agency/ApplicationAgency.cpp
Agency/Constituent.cpp Agency/Constituent.cpp
Agency/SanityCheck.cpp Agency/SanityCheck.cpp
Agency/State.cpp Agency/State.cpp
Agency/Store.cpp Agency/Store.cpp
Agency/AgentCallback.cpp Agency/StoreCallback.cpp
ApplicationServer/ApplicationFeature.cpp ApplicationServer/ApplicationFeature.cpp
ApplicationServer/ApplicationServer.cpp ApplicationServer/ApplicationServer.cpp
Aql/Aggregator.cpp Aql/Aggregator.cpp

View File

@ -73,15 +73,23 @@ inline HttpHandler::status_t RestAgencyHandler::reportUnknownMethod() {
void RestAgencyHandler::redirectRequest(id_t leaderId) { void RestAgencyHandler::redirectRequest(id_t leaderId) {
/*
std::shared_ptr<Endpoint> ep ( std::shared_ptr<Endpoint> ep (
Endpoint::clientFactory (_agent->config().end_points.at(leaderId))); Endpoint::clientFactory (_agent->config().end_points.at(leaderId)));
std::stringstream url; std::stringstream url;
url << ep->transport() << "://" << ep->hostAndPort()
<< _request->requestPath(); url << ep->transport() << "://";
if (ep->encryption() == arangodb::Endpoint::EncryptionType::SSL) {
url << "s";
}
url << ep->hostAndPort() << _request->requestPath();
*/
std::string url = Endpoint::uriForm(_agent->config().end_points.at(leaderId));
createResponse(GeneralResponse::ResponseCode::TEMPORARY_REDIRECT); createResponse(GeneralResponse::ResponseCode::TEMPORARY_REDIRECT);
static std::string const location = "location"; static std::string const location = "location";
_response->setHeaderNC(location, url.str()); _response->setHeaderNC(location, url);
} }
HttpHandler::status_t RestAgencyHandler::handleStores () { HttpHandler::status_t RestAgencyHandler::handleStores () {

View File

@ -1232,6 +1232,26 @@ function startInstanceCluster(instanceInfo, protocol, options,
httpOptions.method = 'POST'; httpOptions.method = 'POST';
httpOptions.returnBodyOnError = true; httpOptions.returnBodyOnError = true;
let count = 0;
instanceInfo.arangods.forEach(arangod => {
while (true) {
const reply = download(arangod.url + "/_api/version", "", makeAuthorizationHeaders(options));
if (!reply.error && reply.code === 200) {
break;
}
++count;
if (count % 60 === 0) {
if (!checkArangoAlive(arangod, options)) {
throw new Error("startup failed! bailing out!");
}
}
wait(0.5, false);
}
});
response = download(coordinatorUrl + '/_admin/cluster/bootstrapDbServers', '{"isRelaunch":false}', httpOptions); response = download(coordinatorUrl + '/_admin/cluster/bootstrapDbServers', '{"isRelaunch":false}', httpOptions);
while (response.code !== 200) { while (response.code !== 200) {
@ -1294,24 +1314,6 @@ function startArango(protocol, options, addArgs, name, rootDir) {
const startTime = time(); const startTime = time();
instanceInfo.pid = executeValgrind(ARANGOD_BIN, toArgv(args), options, name).pid; instanceInfo.pid = executeValgrind(ARANGOD_BIN, toArgv(args), options, name).pid;
let count = 0;
while (true) {
wait(0.5, false);
const reply = download(instanceInfo.url + "/_api/version", "", makeAuthorizationHeaders(options));
if (!reply.error && reply.code === 200) {
break;
}
++count;
if (count % 60 === 0) {
if (!checkArangoAlive(instanceInfo, options)) {
throw new Error("startup failed! bailing out!");
}
}
}
if (platform.substr(0, 3) === 'win') { if (platform.substr(0, 3) === 'win') {
const procdumpArgs = [ const procdumpArgs = [
'-accepteula', '-accepteula',
@ -1371,6 +1373,7 @@ function startInstanceSingleServer(instanceInfo, protocol, options,
addArgs, testname, rootDir) { addArgs, testname, rootDir) {
instanceInfo.arangods.push(startArango(protocol, options, addArgs, testname, rootDir)); instanceInfo.arangods.push(startArango(protocol, options, addArgs, testname, rootDir));
instanceInfo.endpoint = instanceInfo.arangods[instanceInfo.arangods.length - 1].endpoint; instanceInfo.endpoint = instanceInfo.arangods[instanceInfo.arangods.length - 1].endpoint;
instanceInfo.url = instanceInfo.arangods[instanceInfo.arangods.length - 1].url; instanceInfo.url = instanceInfo.arangods[instanceInfo.arangods.length - 1].url;
@ -1380,7 +1383,7 @@ function startInstanceSingleServer(instanceInfo, protocol, options,
function startInstance(protocol, options, addArgs, testname, tmpDir) { function startInstance(protocol, options, addArgs, testname, tmpDir) {
let rootDir = fs.join(tmpDir || fs.getTempFile(), testname); let rootDir = fs.join(tmpDir || fs.getTempFile(), testname);
let instanceInfo = {rootDir, arangods: []}; let instanceInfo = {rootDir, arangods: []};
try { try {
if (options.cluster) { if (options.cluster) {
startInstanceCluster(instanceInfo, protocol, options, startInstanceCluster(instanceInfo, protocol, options,
@ -1390,12 +1393,36 @@ function startInstance(protocol, options, addArgs, testname, tmpDir) {
addArgs, testname, rootDir); addArgs, testname, rootDir);
} else { } else {
startInstanceSingleServer(instanceInfo, protocol, options, startInstanceSingleServer(instanceInfo, protocol, options,
addArgs, testname, rootDir); addArgs, testname, rootDir);
}
if (!options.cluster) {
let count = 0;
instanceInfo.arangods.forEach(arangod => {
while (true) {
const reply = download(arangod.url + "/_api/version", "", makeAuthorizationHeaders(options));
if (!reply.error && reply.code === 200) {
break;
}
++count;
if (count % 60 === 0) {
if (!checkArangoAlive(arangod, options)) {
throw new Error("startup failed! bailing out!");
}
}
wait(0.5, false);
}
});
} }
} catch (e) { } catch (e) {
print(e, e.stack); print(e, e.stack);
return false; return false;
} }
return instanceInfo; return instanceInfo;
} }

View File

@ -51,6 +51,27 @@ Endpoint::Endpoint(DomainType domainType, EndpointType type,
TRI_invalidatesocket(&_socket); TRI_invalidatesocket(&_socket);
} }
std::string Endpoint::uriForm (std::string const& endpoint) {
std::stringstream url;
size_t const prefix_len = 6;
if (StringUtils::isPrefix(endpoint, "tcp://")) {
url << "http://";
} else if (StringUtils::isPrefix(endpoint, "ssl://")) {
url << "https://";
} else {
throw arangodb::basics::Exception (
0, std::string("malformed URL ") + endpoint
+ ". Support only for ssl:// and tcp:// endpoints." , __FILE__, __LINE__);
}
url << endpoint.substr(prefix_len,endpoint.size()+1-(prefix_len+1));
return url.str();
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief return the endpoint specification in a unified form /// @brief return the endpoint specification in a unified form
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -68,9 +89,9 @@ std::string Endpoint::unifiedForm(std::string const& specification) {
std::string copy = StringUtils::tolower(specification); std::string copy = StringUtils::tolower(specification);
StringUtils::trimInPlace(copy); StringUtils::trimInPlace(copy);
if (specification[specification.size() - 1] == '/') { if (specification.back() == '/') {
// address ends with a slash => remove // address ends with a slash => remove
copy = copy.substr(0, copy.size() - 1); copy.pop_back();
} }
// read protocol from string // read protocol from string

View File

@ -52,6 +52,7 @@ class Endpoint {
virtual ~Endpoint() {} virtual ~Endpoint() {}
public: public:
static std::string uriForm(std::string const&);
static std::string unifiedForm(std::string const&); static std::string unifiedForm(std::string const&);
static Endpoint* serverFactory(std::string const&, int, bool reuseAddress); static Endpoint* serverFactory(std::string const&, int, bool reuseAddress);
static Endpoint* clientFactory(std::string const&); static Endpoint* clientFactory(std::string const&);