mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'agency' of https://github.com/arangodb/arangodb into devel
This commit is contained in:
commit
c40a13f262
|
@ -103,11 +103,13 @@ struct AgentConfiguration {
|
||||||
std::string end_point_persist;
|
std::string end_point_persist;
|
||||||
bool notify;
|
bool notify;
|
||||||
bool sanity_check;
|
bool sanity_check;
|
||||||
|
bool wait_for_sync;
|
||||||
AgentConfiguration () : id(0), min_ping(.15), max_ping(.3f), notify(false) {};
|
AgentConfiguration () : id(0), min_ping(.15), max_ping(.3f), notify(false) {};
|
||||||
AgentConfiguration (uint32_t i, double min_p, double max_p, std::string ep,
|
AgentConfiguration (uint32_t i, double min_p, double max_p, std::string ep,
|
||||||
std::vector<std::string> const& eps, bool n = false, bool s = false) :
|
std::vector<std::string> const& eps, bool n = false,
|
||||||
|
bool s = false, bool w = true) :
|
||||||
id(i), min_ping(min_p), max_ping(max_p), end_point(ep), end_points(eps),
|
id(i), min_ping(min_p), max_ping(max_p), end_point(ep), end_points(eps),
|
||||||
notify(n), sanity_check(s) {
|
notify(n), sanity_check(s), wait_for_sync(w) {
|
||||||
end_point_persist = end_points[id];
|
end_point_persist = end_points[id];
|
||||||
}
|
}
|
||||||
inline size_t size() const {return end_points.size();}
|
inline size_t size() const {return end_points.size();}
|
||||||
|
|
|
@ -299,7 +299,8 @@ bool Agent::load () {
|
||||||
_vocbase = vocbase;
|
_vocbase = vocbase;
|
||||||
|
|
||||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state.";
|
LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state.";
|
||||||
if (!_state.loadCollections(_vocbase, _applicationV8, _queryRegistry)) {
|
if (!_state.loadCollections(_vocbase, _applicationV8, _queryRegistry,
|
||||||
|
_config.wait_for_sync)) {
|
||||||
LOG_TOPIC(WARN, Logger::AGENCY)
|
LOG_TOPIC(WARN, Logger::AGENCY)
|
||||||
<< "Failed to load persistent state on statup.";
|
<< "Failed to load persistent state on statup.";
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,8 @@ ApplicationAgency::ApplicationAgency(
|
||||||
_agent_id((std::numeric_limits<uint32_t>::max)()),
|
_agent_id((std::numeric_limits<uint32_t>::max)()),
|
||||||
_endpointServer(aes),
|
_endpointServer(aes),
|
||||||
_applicationV8(applicationV8),
|
_applicationV8(applicationV8),
|
||||||
_queryRegistry(queryRegistry) {
|
_queryRegistry(queryRegistry),
|
||||||
|
_wait_for_sync(true) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -75,7 +76,9 @@ void ApplicationAgency::setupOptions(
|
||||||
"to the minumum election timeout")
|
"to the minumum election timeout")
|
||||||
("agency.notify", &_notify, "Notify others")
|
("agency.notify", &_notify, "Notify others")
|
||||||
("agency.sanity-check", &_sanity_check,
|
("agency.sanity-check", &_sanity_check,
|
||||||
"Perform arangodb cluster sanity checking");
|
"Perform arangodb cluster sanity checking")
|
||||||
|
("agency.wait-for-sync", &_wait_for_sync,
|
||||||
|
"Wait for hard disk syncs on every persistence call (Must for production)");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,8 +155,8 @@ bool ApplicationAgency::prepare() {
|
||||||
new agent_t(
|
new agent_t(
|
||||||
_server, arangodb::consensus::config_t(
|
_server, arangodb::consensus::config_t(
|
||||||
_agent_id, _min_election_timeout, _max_election_timeout,
|
_agent_id, _min_election_timeout, _max_election_timeout,
|
||||||
endpoint, _agency_endpoints, _notify, _sanity_check), _applicationV8,
|
endpoint, _agency_endpoints, _notify, _sanity_check, _wait_for_sync),
|
||||||
_queryRegistry));
|
_applicationV8, _queryRegistry));
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
|
|
|
@ -117,6 +117,8 @@ class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature {
|
||||||
ApplicationV8* _applicationV8;
|
ApplicationV8* _applicationV8;
|
||||||
aql::QueryRegistry* _queryRegistry;
|
aql::QueryRegistry* _queryRegistry;
|
||||||
|
|
||||||
|
bool _wait_for_sync;
|
||||||
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,6 +90,11 @@ config_t const& Constituent::config () const {
|
||||||
return _agent->config();
|
return _agent->config();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for sync
|
||||||
|
bool Constituent::waitForSync() const {
|
||||||
|
return _agent->config().wait_for_sync;
|
||||||
|
}
|
||||||
|
|
||||||
// Random sleep times in election process
|
// Random sleep times in election process
|
||||||
duration_t Constituent::sleepFor (double min_t, double max_t) {
|
duration_t Constituent::sleepFor (double min_t, double max_t) {
|
||||||
dist_t dis(min_t, max_t);
|
dist_t dis(min_t, max_t);
|
||||||
|
@ -132,7 +137,7 @@ void Constituent::term(term_t t) {
|
||||||
}
|
}
|
||||||
|
|
||||||
OperationOptions options;
|
OperationOptions options;
|
||||||
options.waitForSync = true;
|
options.waitForSync = waitForSync();
|
||||||
options.silent = true;
|
options.silent = true;
|
||||||
|
|
||||||
OperationResult result = trx.insert("election", body.slice(), options);
|
OperationResult result = trx.insert("election", body.slice(), options);
|
||||||
|
|
|
@ -129,6 +129,9 @@ private:
|
||||||
/// @brief Count my votes
|
/// @brief Count my votes
|
||||||
void countVotes();
|
void countVotes();
|
||||||
|
|
||||||
|
/// @brief Wait for sync
|
||||||
|
bool waitForSync () const;
|
||||||
|
|
||||||
/// @brief Notify everyone, that we are good to go.
|
/// @brief Notify everyone, that we are good to go.
|
||||||
/// This is the task of the last process starting up.
|
/// This is the task of the last process starting up.
|
||||||
/// Will be taken care of by gossip
|
/// Will be taken care of by gossip
|
||||||
|
|
|
@ -45,6 +45,10 @@ using namespace arangodb::velocypack;
|
||||||
using namespace arangodb::rest;
|
using namespace arangodb::rest;
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
|
|
||||||
|
bool State::waitForSync () const {
|
||||||
|
return _wait_for_sync;
|
||||||
|
}
|
||||||
|
|
||||||
State::State(std::string const& end_point)
|
State::State(std::string const& end_point)
|
||||||
: _vocbase(nullptr),
|
: _vocbase(nullptr),
|
||||||
_applicationV8(nullptr),
|
_applicationV8(nullptr),
|
||||||
|
@ -87,7 +91,7 @@ bool State::persist(index_t index, term_t term, id_t lid,
|
||||||
}
|
}
|
||||||
|
|
||||||
OperationOptions options;
|
OperationOptions options;
|
||||||
options.waitForSync = true;
|
options.waitForSync = waitForSync();
|
||||||
options.silent = true;
|
options.silent = true;
|
||||||
|
|
||||||
OperationResult result = trx.insert("log", body.slice(), options);
|
OperationResult result = trx.insert("log", body.slice(), options);
|
||||||
|
@ -221,10 +225,11 @@ bool State::createCollection(std::string const& name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool State::loadCollections(TRI_vocbase_t* vocbase, ApplicationV8* applicationV8,
|
bool State::loadCollections(TRI_vocbase_t* vocbase, ApplicationV8* applicationV8,
|
||||||
aql::QueryRegistry* queryRegistry) {
|
aql::QueryRegistry* queryRegistry, bool waitForSync) {
|
||||||
_vocbase = vocbase;
|
_vocbase = vocbase;
|
||||||
_applicationV8 = applicationV8;
|
_applicationV8 = applicationV8;
|
||||||
_queryRegistry = queryRegistry;
|
_queryRegistry = queryRegistry;
|
||||||
|
_wait_for_sync = waitForSync;
|
||||||
return loadCollection("log");
|
return loadCollection("log");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -98,12 +98,17 @@ public:
|
||||||
log_t const& lastLog () const;
|
log_t const& lastLog () const;
|
||||||
|
|
||||||
|
|
||||||
|
/// @brief Wait for sync?
|
||||||
|
bool waitForSync () const;
|
||||||
|
|
||||||
|
|
||||||
/// @brief Set endpoint
|
/// @brief Set endpoint
|
||||||
bool setEndPoint (std::string const&);
|
bool setEndPoint (std::string const&);
|
||||||
|
|
||||||
|
|
||||||
/// @brief Load persisted data from above or start with empty log
|
/// @brief Load persisted data from above or start with empty log
|
||||||
bool loadCollections (TRI_vocbase_t*, ApplicationV8*, aql::QueryRegistry*);
|
bool loadCollections (TRI_vocbase_t*, ApplicationV8*,
|
||||||
|
aql::QueryRegistry*, bool);
|
||||||
|
|
||||||
/// @brief Pipe to ostream
|
/// @brief Pipe to ostream
|
||||||
friend std::ostream& operator<< (std::ostream& os, State const& s) {
|
friend std::ostream& operator<< (std::ostream& os, State const& s) {
|
||||||
|
@ -149,6 +154,7 @@ private:
|
||||||
std::string _end_point; /**< @brief persistence end point */
|
std::string _end_point; /**< @brief persistence end point */
|
||||||
bool _collections_checked; /**< @brief Collections checked */
|
bool _collections_checked; /**< @brief Collections checked */
|
||||||
bool _collections_loaded;
|
bool _collections_loaded;
|
||||||
|
bool _wait_for_sync;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,7 @@ build/bin/arangod -c etc/relative/arangod.conf \
|
||||||
--agency.size 1 \
|
--agency.size 1 \
|
||||||
--server.endpoint tcp://127.0.0.1:4001 \
|
--server.endpoint tcp://127.0.0.1:4001 \
|
||||||
--agency.endpoint tcp://127.0.0.1:4001 \
|
--agency.endpoint tcp://127.0.0.1:4001 \
|
||||||
|
--agency.wait-for-sync false \
|
||||||
--database.directory cluster/data4001 \
|
--database.directory cluster/data4001 \
|
||||||
--agency.id 0 \
|
--agency.id 0 \
|
||||||
--log.file cluster/4001.log \
|
--log.file cluster/4001.log \
|
||||||
|
|
Loading…
Reference in New Issue