diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 0517d579bf..f5e3161ce3 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -38,7 +38,8 @@ namespace arangodb { namespace consensus { // Agent configuration -Agent::Agent (TRI_server_t* server, config_t const& config, ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry) +Agent::Agent (TRI_server_t* server, config_t const& config, + ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry) : Thread ("Agent"), _server(server), _vocbase(nullptr), @@ -218,7 +219,8 @@ bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, if (queries->slice().length()) { LOG_TOPIC(INFO, Logger::AGENCY) << "Appending "<< queries->slice().length() << " entries to state machine."; - /* bool success = */_state.log (queries, term, leaderId, prevIndex, prevTerm); + /* bool success = */ + _state.log (queries, term, leaderId, prevIndex, prevTerm); } else { // heart-beat } @@ -298,12 +300,13 @@ bool Agent::load () { LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state."; if (!_state.loadCollections(_vocbase, _applicationV8, _queryRegistry)) { - LOG_TOPIC(WARN, Logger::AGENCY) << "Failed to load persistent state on statup."; + LOG_TOPIC(WARN, Logger::AGENCY) + << "Failed to load persistent state on statup."; } LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores."; -// _read_db.apply(_state.slices()); _spearhead.apply(_state.slices(_last_commit_index+1)); + reportIn(id(),_state.lastLog().index); LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker."; _spearhead.start(this); @@ -311,6 +314,11 @@ bool Agent::load () { LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality."; _constituent.start(_vocbase, _applicationV8, _queryRegistry); + + if (_config.sanity_check) { + LOG_TOPIC(INFO, Logger::AGENCY) << "Starting cluster sanity facilities"; + _sanity_check.start(this); + } return true; } @@ -334,7 +342,7 @@ write_ret_t Agent::write (query_t const& query) { _cv.signal(); // Wake up run } - reportIn(0,maxind); + reportIn(id(),maxind); return write_ret_t(true,id(),applied,indices); // Indices to wait for to rest @@ -386,6 +394,9 @@ void Agent::beginShutdown() { _constituent.beginShutdown(); _spearhead.beginShutdown(); _read_db.beginShutdown(); + if (_config.sanity_check) { + _sanity_check.beginShutdown(); + } // Wake up all waiting REST handler (waitFor) CONDITION_LOCKER(guard, _cv); diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 5f9ee1a2f5..0e14afdd3f 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -27,6 +27,7 @@ #include "AgencyCommon.h" #include "AgentCallback.h" #include "Constituent.h" +#include "SanityCheck.h" #include "State.h" #include "Store.h" @@ -42,7 +43,8 @@ class QueryRegistry; namespace consensus { class Agent : public arangodb::Thread { - public: + +public: /// @brief Construct with program options Agent(TRI_server_t*, config_t const&, ApplicationV8*, aql::QueryRegistry*); @@ -77,6 +79,8 @@ class Agent : public arangodb::Thread { /// @brief Leader ID id_t leaderID() const; + + /// @brief Are we leading? bool leading() const; /// @brief Pick up leadership tasks @@ -148,7 +152,9 @@ class Agent : public arangodb::Thread { aql::QueryRegistry* _queryRegistry; Constituent _constituent; /**< @brief Leader election delegate */ + SanityCheck _sanity_check; /**< @brief sanitychecking */ State _state; /**< @brief Log replica */ + config_t _config; /**< @brief Command line arguments */ std::atomic _last_commit_index; /**< @brief Last commit index */ @@ -165,7 +171,7 @@ class Agent : public arangodb::Thread { _confirmed; /**< @brief Confirmed log index of each slave */ arangodb::Mutex _ioLock; /**< @brief Read/Write lock */ }; -} -} + +}} #endif diff --git a/arangod/Agency/AgentCallback.h b/arangod/Agency/AgentCallback.h index c974563c0d..64754766c2 100644 --- a/arangod/Agency/AgentCallback.h +++ b/arangod/Agency/AgentCallback.h @@ -38,7 +38,7 @@ public: AgentCallback(); - explicit AgentCallback(Agent* agent, id_t slave_id, index_t last); + AgentCallback(Agent* agent, id_t slave_id, index_t last); virtual bool operator()(arangodb::ClusterCommResult*) override final; diff --git a/arangod/Agency/ApplicationAgency.cpp b/arangod/Agency/ApplicationAgency.cpp index e9f1c0e3a2..9c7ab5882d 100644 --- a/arangod/Agency/ApplicationAgency.cpp +++ b/arangod/Agency/ApplicationAgency.cpp @@ -38,17 +38,21 @@ using namespace arangodb::basics; using namespace arangodb::rest; using namespace arangodb; -ApplicationAgency::ApplicationAgency(TRI_server_t* server, - ApplicationEndpointServer* aes, - ApplicationV8* applicationV8, - aql::QueryRegistry* queryRegistry) - : ApplicationFeature("agency"), _server(server), _size(1), - _min_election_timeout(0.15), _max_election_timeout(1.0), - _election_call_rate_mul(0.85), _notify(false), _sanity_check(false), - _agent_id((std::numeric_limits::max)()), - _endpointServer(aes), - _applicationV8(applicationV8), - _queryRegistry(queryRegistry) { +ApplicationAgency::ApplicationAgency( + TRI_server_t* server, ApplicationEndpointServer* aes, + ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry) + : ApplicationFeature("agency"), + _server(server), + _size(1), + _min_election_timeout(0.15), + _max_election_timeout(1.0), + _election_call_rate_mul(0.85), + _notify(false), + _sanity_check(false), + _agent_id((std::numeric_limits::max)()), + _endpointServer(aes), + _applicationV8(applicationV8), + _queryRegistry(queryRegistry) { } diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index b37f40283f..0ea68ca8d8 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -1,3 +1,4 @@ + //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// @@ -67,14 +68,28 @@ void Constituent::configure(Agent* agent) { // Default ctor Constituent::Constituent() : - Thread("Constituent"), _term(0), _leader_id(0), _id(0), _gen(std::random_device()()), - _role(FOLLOWER), _agent(0), _voted_for(0) {} + Thread("Constituent"), + _vocbase(nullptr), + _applicationV8(nullptr), + _queryRegistry(nullptr), + _term(0), + _leader_id(0), + _id(0), + _gen(std::random_device()()), + _role(FOLLOWER), + _agent(0), + _voted_for(0) {} // Shutdown if not already Constituent::~Constituent() { shutdown(); } +// Configuration +config_t const& Constituent::config () const { + return _agent->config(); +} + // Random sleep times in election process duration_t Constituent::sleepFor (double min_t, double max_t) { dist_t dis(min_t, max_t); @@ -105,8 +120,10 @@ void Constituent::term(term_t t) { body.close(); TRI_ASSERT(_vocbase != nullptr); - auto transactionContext = std::make_shared(_vocbase); - SingleCollectionTransaction trx(transactionContext, "election", TRI_TRANSACTION_WRITE); + auto transactionContext = + std::make_shared(_vocbase); + SingleCollectionTransaction trx(transactionContext, "election", + TRI_TRANSACTION_WRITE); int res = trx.begin(); @@ -133,7 +150,8 @@ role_t Constituent::role () const { /// @brief Become follower in term void Constituent::follow (term_t t) { if (_role != FOLLOWER) { - LOG_TOPIC(INFO, Logger::AGENCY) << "Role change: Converted to follower in term " << t; + LOG_TOPIC(INFO, Logger::AGENCY) + << "Role change: Converted to follower in term " << t; } this->term(t); _role = FOLLOWER; @@ -142,7 +160,8 @@ void Constituent::follow (term_t t) { /// @brief Become leader void Constituent::lead () { if (_role != LEADER) { - LOG_TOPIC(INFO, Logger::AGENCY) << "Role change: Converted to leader in term " << _term ; + LOG_TOPIC(INFO, Logger::AGENCY) + << "Role change: Converted to leader in term " << _term ; _agent->lead(); // We need to rebuild spear_head and read_db; } _role = LEADER; @@ -152,7 +171,8 @@ void Constituent::lead () { /// @brief Become follower void Constituent::candidate () { if (_role != CANDIDATE) - LOG_TOPIC(INFO, Logger::AGENCY) << "Role change: Converted to candidate in term " << _term ; + LOG_TOPIC(INFO, Logger::AGENCY) + << "Role change: Converted to candidate in term " << _term ; _role = CANDIDATE; } @@ -178,17 +198,17 @@ id_t Constituent::leaderID () const { /// @brief Agency size size_t Constituent::size() const { - return _agent->config().size(); + return config().size(); } /// @brief Get endpoint to an id std::string const& Constituent::end_point(id_t id) const { - return _agent->config().end_points[id]; + return config().end_points[id]; } /// @brief Get all endpoints std::vector const& Constituent::end_points() const { - return _agent->config().end_points; + return config().end_points; } /// @brief Notify peers of updated endpoints @@ -265,32 +285,31 @@ void Constituent::callElection() { } std::string body; - std::vector results(_agent->config().end_points.size()); + std::vector results(config().end_points.size()); std::stringstream path; - path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId=" << _id - << "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm=" + path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId=" + << _id << "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm=" << _agent->lastLog().term; // Ask everyone for their vote - for (id_t i = 0; i < _agent->config().end_points.size(); ++i) { + for (id_t i = 0; i < config().end_points.size(); ++i) { if (i != _id && end_point(i) != "") { std::unique_ptr> headerFields = std::make_unique >(); results[i] = arangodb::ClusterComm::instance()->asyncRequest( - "1", 1, _agent->config().end_points[i], GeneralRequest::RequestType::GET, + "1", 1, config().end_points[i], GeneralRequest::RequestType::GET, path.str(), std::make_shared(body), headerFields, nullptr, - _agent->config().min_ping, true); + config().min_ping, true); } } // Wait randomized timeout std::this_thread::sleep_for( - sleepFor(.5*_agent->config().min_ping, - .8*_agent->config().min_ping)); + sleepFor(.5*config().min_ping, .8*config().min_ping)); // Collect votes - for (id_t i = 0; i < _agent->config().end_points.size(); ++i) { + for (id_t i = 0; i < config().end_points.size(); ++i) { if (i != _id && end_point(i) != "") { ClusterCommResult res = arangodb::ClusterComm::instance()-> enquire(results[i].operationID); @@ -341,8 +360,6 @@ void Constituent::beginShutdown() { } -#include - bool Constituent::start (TRI_vocbase_t* vocbase, ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry) { @@ -356,6 +373,7 @@ bool Constituent::start (TRI_vocbase_t* vocbase, void Constituent::run() { + TRI_ASSERT(_vocbase != nullptr); auto bindVars = std::make_shared(); bindVars->openObject(); @@ -367,8 +385,8 @@ void Constituent::run() { // Query std::string const aql ("FOR l IN election SORT l._key DESC LIMIT 1 RETURN l"); arangodb::aql::Query query(_applicationV8, false, _vocbase, - aql.c_str(), aql.size(), bindVars, nullptr, - arangodb::aql::PART_MAIN); + aql.c_str(), aql.size(), bindVars, nullptr, + arangodb::aql::PART_MAIN); auto queryResult = query.execute(_queryRegistry); if (queryResult.code != TRI_ERROR_NO_ERROR) { @@ -376,25 +394,25 @@ void Constituent::run() { } VPackSlice result = queryResult.result->slice(); - + if (result.isArray()) { for (auto const& i : VPackArrayIterator(result)) { try { - _term = i.get("term").getUInt(); - _voted_for = i.get("voted_for").getUInt(); + _term = i.get("term").getUInt(); + _voted_for = i.get("voted_for").getUInt(); } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << "Persisted election entries corrupt! Defaulting term,vote (0,0)"; } } } - + // Always start off as follower while (!this->isStopping() && size() > 1) { if (_role == FOLLOWER) { _cast = false; // New round set not cast vote std::this_thread::sleep_for( // Sleep for random time - sleepFor(_agent->config().min_ping, _agent->config().max_ping)); + sleepFor(config().min_ping, config().max_ping)); if (!_cast) { candidate(); // Next round, we are running } diff --git a/arangod/Agency/Constituent.h b/arangod/Agency/Constituent.h index a10df4efe1..714df335f6 100644 --- a/arangod/Agency/Constituent.h +++ b/arangod/Agency/Constituent.h @@ -33,7 +33,6 @@ #include "AgencyCommon.h" #include "Basics/Thread.h" -struct TRI_server_t; struct TRI_vocbase_t; namespace arangodb { @@ -93,6 +92,9 @@ public: /// @brief Who is leading id_t leaderID () const; + /// @brief Configuration + config_t const& config () const; + /// @brief Become follower void follow(term_t); @@ -135,7 +137,6 @@ private: /// @brief Sleep for how long duration_t sleepFor(double, double); - TRI_server_t* _server; TRI_vocbase_t* _vocbase; ApplicationV8* _applicationV8; aql::QueryRegistry* _queryRegistry; diff --git a/arangod/Agency/SanityCheck.cpp b/arangod/Agency/SanityCheck.cpp index d24d196316..ca9e2db7bb 100644 --- a/arangod/Agency/SanityCheck.cpp +++ b/arangod/Agency/SanityCheck.cpp @@ -1,28 +1,58 @@ #include "SanityCheck.h" - #include "Agent.h" +#include "Basics/ConditionLocker.h" + using namespace arangodb::consensus; SanityCheck::SanityCheck() : arangodb::Thread("SanityCheck"), _agent(nullptr) {} -SanityCheck::~SanityCheck() {}; - -void SanityCheck::configure(Agent* agent) { - _agent = agent; -} +SanityCheck::~SanityCheck() { + shutdown(); +}; void SanityCheck::wakeUp () { + _cv.signal(); } -void SanityCheck::passOut () { +bool SanityCheck::doChecks (bool timedout) { + LOG_TOPIC(INFO, Logger::AGENCY) << "Sanity checks"; + return true; } void SanityCheck::run() { + + CONDITION_LOCKER(guard, _cv); + TRI_ASSERT(_agent!=nullptr); + bool timedout = false; + + while (!this->isStopping()) { + + if (_agent->leading()) { + timedout = _cv.wait(1000000); + } else { + _cv.wait(); + } + + doChecks(timedout); + + } + +} + +// Start thread +bool SanityCheck::start () { + Thread::start(); + return true; +} + +// Start thread with agent +bool SanityCheck::start (Agent* agent) { + _agent = agent; + return start(); } void SanityCheck::beginShutdown() { + // Personal hygiene + Thread::beginShutdown(); } - - - diff --git a/arangod/Agency/SanityCheck.h b/arangod/Agency/SanityCheck.h index 8a21d339e6..36fb547d0e 100644 --- a/arangod/Agency/SanityCheck.h +++ b/arangod/Agency/SanityCheck.h @@ -42,9 +42,12 @@ public: /// @brief Default dtor ~SanityCheck (); - /// @brief Configure with agent - void configure (Agent* agent); - + /// @brief Start thread + bool start (); + + /// @brief Start thread with access to agent + bool start (Agent*); + /// @brief Run woker void run() override final; @@ -54,16 +57,15 @@ public: /// @brief Wake up to task void wakeUp (); - /// @brief Stop task and wait - void passOut (); - private: + + /// @brief Perform sanity checking + bool doChecks(bool); - Agent* _agent; + Agent* _agent; /**< @brief My agent */ arangodb::basics::ConditionVariable _cv; /**< @brief Control if thread should run */ - }; diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 5cc9df9e36..9e75c996c9 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -75,8 +75,10 @@ bool State::persist(index_t index, term_t term, id_t lid, body.close(); TRI_ASSERT(_vocbase != nullptr); - auto transactionContext = std::make_shared(_vocbase); - SingleCollectionTransaction trx(transactionContext, "log", TRI_TRANSACTION_WRITE); + auto transactionContext = + std::make_shared(_vocbase); + SingleCollectionTransaction trx ( + transactionContext, "log", TRI_TRANSACTION_WRITE); int res = trx.begin(); @@ -131,7 +133,7 @@ bool State::log(query_t const& queries, term_t term, id_t lid, buf->append((char const*)i.get("query").begin(), i.get("query").byteSize()); _log.push_back(log_t(i.get("index").getUInt(), term, lid, buf)); - persist(i.get("index").getUInt(), term, lid, i.get("query")); // log to disk + persist(i.get("index").getUInt(), term, lid, i.get("query")); // to disk } catch (std::exception const& e) { LOG(ERR) << e.what(); } @@ -194,7 +196,8 @@ bool State::createCollections() { bool State::checkCollection(std::string const& name) { if (!_collections_checked) { - return (TRI_LookupCollectionByNameVocBase(_vocbase, name.c_str()) != nullptr); + return ( + TRI_LookupCollectionByNameVocBase(_vocbase, name.c_str()) != nullptr); } return true; } @@ -204,7 +207,8 @@ bool State::createCollection(std::string const& name) { body.add(VPackValue(VPackValueType::Object)); body.close(); - VocbaseCollectionInfo parameters(_vocbase, name.c_str(), TRI_COL_TYPE_DOCUMENT, body.slice()); + VocbaseCollectionInfo parameters(_vocbase, name.c_str(), + TRI_COL_TYPE_DOCUMENT, body.slice()); TRI_vocbase_col_t const* collection = TRI_CreateCollectionVocBase(_vocbase, parameters, parameters.id(), true); @@ -216,8 +220,7 @@ bool State::createCollection(std::string const& name) { } -bool State::loadCollections(TRI_vocbase_t* vocbase, - ApplicationV8* applicationV8, +bool State::loadCollections(TRI_vocbase_t* vocbase, ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry) { _vocbase = vocbase; _applicationV8 = applicationV8; @@ -227,20 +230,21 @@ bool State::loadCollections(TRI_vocbase_t* vocbase, bool State::loadCollection(std::string const& name) { TRI_ASSERT(_vocbase != nullptr); - + if (checkCollection(name)) { auto bindVars = std::make_shared(); bindVars->openObject(); bindVars->close(); // ^^^ TODO: check if bindvars are actually needed - + TRI_ASSERT(_applicationV8 != nullptr); TRI_ASSERT(_queryRegistry != nullptr); - std::string const aql(std::string("FOR l IN ") + name + " SORT l._key RETURN l"); + std::string const aql(std::string("FOR l IN ") + name + + " SORT l._key RETURN l"); arangodb::aql::Query query(_applicationV8, false, _vocbase, aql.c_str(), aql.size(), bindVars, nullptr, arangodb::aql::PART_MAIN); - + auto queryResult = query.execute(_queryRegistry); if (queryResult.code != TRI_ERROR_NO_ERROR) { @@ -255,9 +259,10 @@ bool State::loadCollection(std::string const& name) { std::make_shared>(); VPackSlice req = i.get("request"); tmp->append(req.startAs(), req.byteSize()); - _log.push_back(log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()), - i.get("term").getUInt(), - i.get("leader").getUInt(), tmp)); + _log.push_back( + log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()), + i.get("term").getUInt(), + i.get("leader").getUInt(), tmp)); } } diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 5de4c87fed..8251a52d03 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -22,6 +22,7 @@ //////////////////////////////////////////////////////////////////////////////// #include "Store.h" +#include "StoreCallback.h" #include "Agency/Agent.h" #include "Basics/ConditionLocker.h" #include "Basics/VelocyPackHelper.h" @@ -37,6 +38,39 @@ using namespace arangodb::consensus; +inline static bool endpointPathFromUrl ( + std::string const& url, std::string& endpoint, std::string& path) { + + std::stringstream ep; + path = "/"; + size_t pos = 7; + if (url.find("http://")==0) { + ep << "tcp://"; + } else if (url.find("https://")==0) { + ep << "ssl://"; + ++pos; + } else { + return false; + } + + size_t slash_p = url.find("/",pos); + if (slash_p==std::string::npos) { + ep << url.substr(pos); + } else { + ep << url.substr(pos,slash_p-pos); + path = url.substr(slash_p); + } + + if (ep.str().find(':')==std::string::npos) { + ep << ":8529"; + } + + endpoint = ep.str(); + + return true; + +} + struct NotEmpty { bool operator()(const std::string& s) { return !s.empty(); } }; @@ -647,7 +681,8 @@ std::vector Store::apply (query_t const& query) { } //template std::multimap -std::ostream& operator<< (std::ostream& os, std::multimap const& m) { +std::ostream& operator<< ( + std::ostream& os, std::multimap const& m) { for (auto const& i : m) { os << i.first << ": " << i.second << std::endl; } @@ -700,30 +735,45 @@ std::vector Store::apply ( } std::vector urls; - for(auto it = in.begin(), end = in.end(); it != end; it = in.upper_bound(it->first)) { + for (auto it = in.begin(), end = in.end(); it != end; + it = in.upper_bound(it->first)) { urls.push_back(it->first); } - + for (auto const& url : urls) { - Builder tmp; // host - tmp.openObject(); - tmp.add("term",VPackValue(0)); - tmp.add("index",VPackValue(0)); + + Builder body; // host + body.openObject(); + body.add("term",VPackValue(0)); + body.add("index",VPackValue(0)); auto ret = in.equal_range(url); for (auto it = ret.first; it!=ret.second; ++it) { - //tmp.add(url,VPackValue(VPackValueType::Object)); - tmp.add(it->second->key,VPackValue(VPackValueType::Object)); - tmp.add("op",VPackValue(it->second->oper)); - //tmp.close(); - tmp.close(); - } + body.add(it->second->key,VPackValue(VPackValueType::Object)); + body.add("op",VPackValue(it->second->oper)); + body.close(); + } - tmp.close(); - std::cout << tmp.toJson() << std::endl; + body.close(); + + std::string endpoint, path; + if (endpointPathFromUrl (url,endpoint,path)) { + + std::unique_ptr> headerFields = + std::make_unique >(); + + ClusterCommResult res = + arangodb::ClusterComm::instance()->asyncRequest( + "1", 1, endpoint, GeneralRequest::RequestType::POST, path, + std::make_shared(body.toString()), headerFields, + std::make_shared(), 0.0, true); + + } else { + LOG_TOPIC(WARN, Logger::AGENCY) << "Malformed URL " << url; + } } - + return applied; } @@ -791,7 +841,7 @@ std::vector Store::read (query_t const& queries, query_t& result) const { // read single query into ret bool Store::read (VPackSlice const& query, Builder& ret) const { - + bool success = true; // Collect all paths @@ -837,7 +887,7 @@ bool Store::read (VPackSlice const& query, Builder& ret) const { } } } - + // Into result builder copy.toBuilder(ret); diff --git a/arangod/Agency/StoreCallback.cpp b/arangod/Agency/StoreCallback.cpp new file mode 100644 index 0000000000..b62c4644cf --- /dev/null +++ b/arangod/Agency/StoreCallback.cpp @@ -0,0 +1,11 @@ +#include "StoreCallback.h" + +using namespace arangodb::consensus; +using namespace arangodb::velocypack; + +StoreCallback::StoreCallback() {} + +bool StoreCallback::operator()(arangodb::ClusterCommResult* res) { + return true; +} + diff --git a/arangod/Agency/StoreCallback.h b/arangod/Agency/StoreCallback.h new file mode 100644 index 0000000000..b281a9ee0a --- /dev/null +++ b/arangod/Agency/StoreCallback.h @@ -0,0 +1,48 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __ARANGODB_CONSENSUS_STORE_CALLBACK__ +#define __ARANGODB_CONSENSUS_STORE_CALLBACK__ + +#include "Cluster/ClusterComm.h" + +namespace arangodb { +namespace consensus { + +class StoreCallback : public arangodb::ClusterCommCallback { + +public: + + StoreCallback(); + + virtual bool operator()(arangodb::ClusterCommResult*) override final; + + void shutdown(); + +private: + +}; + +}} // namespace + +#endif diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index f54478ce6d..0fd113b3c0 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -69,12 +69,13 @@ add_executable(${BIN_ARANGOD} Actions/RestActionHandler.cpp Actions/actions.cpp Agency/Agent.cpp + Agency/AgentCallback.cpp Agency/ApplicationAgency.cpp Agency/Constituent.cpp Agency/SanityCheck.cpp Agency/State.cpp Agency/Store.cpp - Agency/AgentCallback.cpp + Agency/StoreCallback.cpp ApplicationServer/ApplicationFeature.cpp ApplicationServer/ApplicationServer.cpp Aql/Aggregator.cpp diff --git a/arangod/RestHandler/RestAgencyHandler.cpp b/arangod/RestHandler/RestAgencyHandler.cpp index f9872542e9..992929882a 100644 --- a/arangod/RestHandler/RestAgencyHandler.cpp +++ b/arangod/RestHandler/RestAgencyHandler.cpp @@ -73,15 +73,23 @@ inline HttpHandler::status_t RestAgencyHandler::reportUnknownMethod() { void RestAgencyHandler::redirectRequest(id_t leaderId) { + /* std::shared_ptr ep ( Endpoint::clientFactory (_agent->config().end_points.at(leaderId))); std::stringstream url; - url << ep->transport() << "://" << ep->hostAndPort() - << _request->requestPath(); + + url << ep->transport() << "://"; + if (ep->encryption() == arangodb::Endpoint::EncryptionType::SSL) { + url << "s"; + } + url << ep->hostAndPort() << _request->requestPath(); + */ + + std::string url = Endpoint::uriForm(_agent->config().end_points.at(leaderId)); createResponse(GeneralResponse::ResponseCode::TEMPORARY_REDIRECT); static std::string const location = "location"; - _response->setHeaderNC(location, url.str()); + _response->setHeaderNC(location, url); } HttpHandler::status_t RestAgencyHandler::handleStores () { diff --git a/js/client/modules/@arangodb/testing.js b/js/client/modules/@arangodb/testing.js index 390e17e826..ec28f89d11 100644 --- a/js/client/modules/@arangodb/testing.js +++ b/js/client/modules/@arangodb/testing.js @@ -1232,6 +1232,26 @@ function startInstanceCluster(instanceInfo, protocol, options, httpOptions.method = 'POST'; httpOptions.returnBodyOnError = true; + let count = 0; + instanceInfo.arangods.forEach(arangod => { + while (true) { + const reply = download(arangod.url + "/_api/version", "", makeAuthorizationHeaders(options)); + + if (!reply.error && reply.code === 200) { + break; + } + + ++count; + + if (count % 60 === 0) { + if (!checkArangoAlive(arangod, options)) { + throw new Error("startup failed! bailing out!"); + } + } + wait(0.5, false); + } + }); + response = download(coordinatorUrl + '/_admin/cluster/bootstrapDbServers', '{"isRelaunch":false}', httpOptions); while (response.code !== 200) { @@ -1294,24 +1314,6 @@ function startArango(protocol, options, addArgs, name, rootDir) { const startTime = time(); instanceInfo.pid = executeValgrind(ARANGOD_BIN, toArgv(args), options, name).pid; - let count = 0; - while (true) { - wait(0.5, false); - - const reply = download(instanceInfo.url + "/_api/version", "", makeAuthorizationHeaders(options)); - - if (!reply.error && reply.code === 200) { - break; - } - - ++count; - - if (count % 60 === 0) { - if (!checkArangoAlive(instanceInfo, options)) { - throw new Error("startup failed! bailing out!"); - } - } - } if (platform.substr(0, 3) === 'win') { const procdumpArgs = [ '-accepteula', @@ -1371,6 +1373,7 @@ function startInstanceSingleServer(instanceInfo, protocol, options, addArgs, testname, rootDir) { instanceInfo.arangods.push(startArango(protocol, options, addArgs, testname, rootDir)); + instanceInfo.endpoint = instanceInfo.arangods[instanceInfo.arangods.length - 1].endpoint; instanceInfo.url = instanceInfo.arangods[instanceInfo.arangods.length - 1].url; @@ -1380,7 +1383,7 @@ function startInstanceSingleServer(instanceInfo, protocol, options, function startInstance(protocol, options, addArgs, testname, tmpDir) { let rootDir = fs.join(tmpDir || fs.getTempFile(), testname); let instanceInfo = {rootDir, arangods: []}; - + try { if (options.cluster) { startInstanceCluster(instanceInfo, protocol, options, @@ -1390,12 +1393,36 @@ function startInstance(protocol, options, addArgs, testname, tmpDir) { addArgs, testname, rootDir); } else { startInstanceSingleServer(instanceInfo, protocol, options, - addArgs, testname, rootDir); + addArgs, testname, rootDir); + } + + if (!options.cluster) { + let count = 0; + instanceInfo.arangods.forEach(arangod => { + while (true) { + const reply = download(arangod.url + "/_api/version", "", makeAuthorizationHeaders(options)); + + if (!reply.error && reply.code === 200) { + break; + } + + ++count; + + if (count % 60 === 0) { + if (!checkArangoAlive(arangod, options)) { + throw new Error("startup failed! bailing out!"); + } + } + wait(0.5, false); + } + }); } } catch (e) { print(e, e.stack); return false; } + + return instanceInfo; } diff --git a/lib/Endpoint/Endpoint.cpp b/lib/Endpoint/Endpoint.cpp index 092be6acfc..e60c0dec26 100644 --- a/lib/Endpoint/Endpoint.cpp +++ b/lib/Endpoint/Endpoint.cpp @@ -51,6 +51,27 @@ Endpoint::Endpoint(DomainType domainType, EndpointType type, TRI_invalidatesocket(&_socket); } + +std::string Endpoint::uriForm (std::string const& endpoint) { + + std::stringstream url; + size_t const prefix_len = 6; + + if (StringUtils::isPrefix(endpoint, "tcp://")) { + url << "http://"; + } else if (StringUtils::isPrefix(endpoint, "ssl://")) { + url << "https://"; + } else { + throw arangodb::basics::Exception ( + 0, std::string("malformed URL ") + endpoint + + ". Support only for ssl:// and tcp:// endpoints." , __FILE__, __LINE__); + } + + url << endpoint.substr(prefix_len,endpoint.size()+1-(prefix_len+1)); + + return url.str(); + +} //////////////////////////////////////////////////////////////////////////////// /// @brief return the endpoint specification in a unified form //////////////////////////////////////////////////////////////////////////////// @@ -68,9 +89,9 @@ std::string Endpoint::unifiedForm(std::string const& specification) { std::string copy = StringUtils::tolower(specification); StringUtils::trimInPlace(copy); - if (specification[specification.size() - 1] == '/') { + if (specification.back() == '/') { // address ends with a slash => remove - copy = copy.substr(0, copy.size() - 1); + copy.pop_back(); } // read protocol from string diff --git a/lib/Endpoint/Endpoint.h b/lib/Endpoint/Endpoint.h index 3e63d5acf1..d0c73c4f08 100644 --- a/lib/Endpoint/Endpoint.h +++ b/lib/Endpoint/Endpoint.h @@ -52,6 +52,7 @@ class Endpoint { virtual ~Endpoint() {} public: + static std::string uriForm(std::string const&); static std::string unifiedForm(std::string const&); static Endpoint* serverFactory(std::string const&, int, bool reuseAddress); static Endpoint* clientFactory(std::string const&);