mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of github.com:arangodb/arangodb into devel
This commit is contained in:
commit
2ea71da526
|
@ -105,6 +105,16 @@ std::string Node::uri() const {
|
||||||
return path.str();
|
return path.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Node::Node(Node&& other) :
|
||||||
|
_node_name(std::move(other._node_name)),
|
||||||
|
_children(std::move(other._children)),
|
||||||
|
_value(std::move(other._value)) {}
|
||||||
|
|
||||||
|
Node::Node(Node const& other) :
|
||||||
|
_node_name(other._node_name),
|
||||||
|
_children(other._children),
|
||||||
|
_value(other._value) {}
|
||||||
|
|
||||||
// Assignment of rhs slice
|
// Assignment of rhs slice
|
||||||
Node& Node::operator=(VPackSlice const& slice) {
|
Node& Node::operator=(VPackSlice const& slice) {
|
||||||
// 1. remove any existing time to live entry
|
// 1. remove any existing time to live entry
|
||||||
|
@ -119,17 +129,29 @@ Node& Node::operator=(VPackSlice const& slice) {
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Assignment of rhs node
|
||||||
|
Node& Node::operator=(Node&& rhs) {
|
||||||
|
// 1. remove any existing time to live entry
|
||||||
|
// 2. copy children map
|
||||||
|
// 3. copy from rhs to buffer pointer
|
||||||
|
// Must not copy rhs's _parent, _ttl, _observers
|
||||||
|
removeTimeToLive();
|
||||||
|
_node_name = std::move(rhs._node_name);
|
||||||
|
_children = std::move(rhs._children);
|
||||||
|
_value = std::move(rhs._value);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
// Assignment of rhs node
|
// Assignment of rhs node
|
||||||
Node& Node::operator=(Node const& rhs) {
|
Node& Node::operator=(Node const& rhs) {
|
||||||
// 1. remove any existing time to live entry
|
// 1. remove any existing time to live entry
|
||||||
// 2. clear children map
|
// 2. clear children map
|
||||||
// 3. copy from rhs to buffer pointer
|
// 3. move from rhs to buffer pointer
|
||||||
// 4. inform all observers here and above
|
// Must not move rhs's _parent, _ttl, _observers
|
||||||
// Must not copy rhs's _parent, _ttl, _observers
|
|
||||||
removeTimeToLive();
|
removeTimeToLive();
|
||||||
_node_name = rhs._node_name;
|
_node_name = rhs._node_name;
|
||||||
_value = rhs._value;
|
|
||||||
_children = rhs._children;
|
_children = rhs._children;
|
||||||
|
_value = rhs._value;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,6 +88,9 @@ class Node {
|
||||||
/// @brief Construct with name
|
/// @brief Construct with name
|
||||||
explicit Node(std::string const& name);
|
explicit Node(std::string const& name);
|
||||||
|
|
||||||
|
Node(Node const& other);
|
||||||
|
Node(Node&& other);
|
||||||
|
|
||||||
/// @brief Construct with name and introduce to tree under parent
|
/// @brief Construct with name and introduce to tree under parent
|
||||||
Node(std::string const& name, Node* parent);
|
Node(std::string const& name, Node* parent);
|
||||||
|
|
||||||
|
@ -105,6 +108,7 @@ class Node {
|
||||||
|
|
||||||
/// @brief Apply rhs to this node (deep copy of rhs)
|
/// @brief Apply rhs to this node (deep copy of rhs)
|
||||||
Node& operator=(Node const& node);
|
Node& operator=(Node const& node);
|
||||||
|
Node& operator=(Node&& node);
|
||||||
|
|
||||||
/// @brief Apply value slice to this node
|
/// @brief Apply value slice to this node
|
||||||
Node& operator=(arangodb::velocypack::Slice const&);
|
Node& operator=(arangodb::velocypack::Slice const&);
|
||||||
|
|
|
@ -99,6 +99,36 @@ inline static bool endpointPathFromUrl(std::string const& url,
|
||||||
// Create with name
|
// Create with name
|
||||||
Store::Store(std::string const& name) : Thread(name), _node(name, this) {}
|
Store::Store(std::string const& name) : Thread(name), _node(name, this) {}
|
||||||
|
|
||||||
|
Store::Store(Store const& other) :
|
||||||
|
Thread(other._node.name()), _agent(other._agent), _timeTable(other._timeTable),
|
||||||
|
_observerTable(other._observerTable), _observedTable(other._observedTable),
|
||||||
|
_node(other._node) {}
|
||||||
|
|
||||||
|
Store::Store(Store&& other) :
|
||||||
|
Thread(other._node.name()), _agent(std::move(other._agent)),
|
||||||
|
_timeTable(std::move(other._timeTable)),
|
||||||
|
_observerTable(std::move(other._observerTable)),
|
||||||
|
_observedTable(std::move(other._observedTable)),
|
||||||
|
_node(std::move(other._node)) {}
|
||||||
|
|
||||||
|
Store& Store::operator=(Store const& rhs) {
|
||||||
|
_agent = rhs._agent;
|
||||||
|
_timeTable = rhs._timeTable;
|
||||||
|
_observerTable = rhs._observerTable;
|
||||||
|
_observedTable = rhs._observedTable;
|
||||||
|
_node = rhs._node;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
Store& Store::operator=(Store&& rhs) {
|
||||||
|
_agent = std::move(rhs._agent);
|
||||||
|
_timeTable = std::move(rhs._timeTable);
|
||||||
|
_observerTable = std::move(rhs._observerTable);
|
||||||
|
_observedTable = std::move(rhs._observedTable);
|
||||||
|
_node = std::move(rhs._node);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
// Default ctor
|
// Default ctor
|
||||||
Store::~Store() {}
|
Store::~Store() {}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,18 @@ class Store : public arangodb::Thread {
|
||||||
/// @brief Destruct
|
/// @brief Destruct
|
||||||
virtual ~Store();
|
virtual ~Store();
|
||||||
|
|
||||||
|
/// @brief Copy constructor
|
||||||
|
Store (Store const& other);
|
||||||
|
|
||||||
|
/// @brief Move constructor
|
||||||
|
Store (Store&& other);
|
||||||
|
|
||||||
|
// @brief Copy assignent
|
||||||
|
Store& operator= (Store const& rhs);
|
||||||
|
|
||||||
|
// @brief Move assigment
|
||||||
|
Store& operator= (Store&& rhs);
|
||||||
|
|
||||||
/// @brief Apply entry in query
|
/// @brief Apply entry in query
|
||||||
std::vector<bool> apply(query_t const& query);
|
std::vector<bool> apply(query_t const& query);
|
||||||
|
|
||||||
|
|
|
@ -29,11 +29,23 @@
|
||||||
#include "Basics/ConditionLocker.h"
|
#include "Basics/ConditionLocker.h"
|
||||||
#include "VocBase/server.h"
|
#include "VocBase/server.h"
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
|
|
||||||
namespace arangodb {
|
namespace arangodb {
|
||||||
namespace consensus {
|
namespace consensus {
|
||||||
|
|
||||||
|
std::string printTimestamp(Supervision::TimePoint const& t) {
|
||||||
|
time_t tt = std::chrono::system_clock::to_time_t(t);
|
||||||
|
struct tm tb;
|
||||||
|
size_t const len (21);
|
||||||
|
char buffer[len];
|
||||||
|
TRI_gmtime(tt, &tb);
|
||||||
|
::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb);
|
||||||
|
return std::string(buffer, len);
|
||||||
|
}
|
||||||
|
|
||||||
inline arangodb::consensus::write_ret_t makeReport(Agent* _agent,
|
inline arangodb::consensus::write_ret_t makeReport(Agent* _agent,
|
||||||
Builder const& report) {
|
Builder const& report) {
|
||||||
query_t envelope = std::make_shared<Builder>();
|
query_t envelope = std::make_shared<Builder>();
|
||||||
|
@ -48,17 +60,58 @@ inline arangodb::consensus::write_ret_t makeReport(Agent* _agent,
|
||||||
return _agent->write(envelope);
|
return _agent->write(envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::string const pendingPrefix = "/arango/Supervision/Jobs/Pending/";
|
static std::string const pendingPrefix = "/Supervision/Jobs/Pending/";
|
||||||
static std::string const collectionsPrefix = "/arango/Plan/Collections/";
|
static std::string const collectionsPrefix = "/Plan/Collections/";
|
||||||
|
static std::string const toDoPrefix = "/Target/ToDo";
|
||||||
|
|
||||||
struct FailedServerJob {
|
struct MoveShard : public Job {
|
||||||
FailedServerJob(Node const& snapshot, Agent* agent, uint64_t jobId,
|
|
||||||
std::string const& failed) {
|
MoveShard (std::string const& creator, std::string const& database,
|
||||||
|
std::string const& collection, std::string const& shard,
|
||||||
|
std::string const& fromServer, std::string const& toServer,
|
||||||
|
uint64_t const& jobId, std::string const& agencyPrefix,
|
||||||
|
Agent* agent) {
|
||||||
|
|
||||||
|
todoEntry (creator, database, collection, shard, fromServer, toServer,
|
||||||
|
jobId, agencyPrefix, agent);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void todoEntry (std::string const& creator, std::string const& database,
|
||||||
|
std::string const& collection, std::string const& shard,
|
||||||
|
std::string const& fromServer, std::string const& toServer,
|
||||||
|
uint64_t const& jobId, std::string const& agencyPrefix,
|
||||||
|
Agent* agent) {
|
||||||
|
Builder todo;
|
||||||
|
todo.openArray(); todo.openObject();
|
||||||
|
todo.add(VPackValue(agencyPrefix + toDoPrefix + "/"
|
||||||
|
+ std::to_string(jobId)));
|
||||||
|
{
|
||||||
|
VPackObjectBuilder entry(&todo);
|
||||||
|
todo.add("creator", VPackValue(creator));
|
||||||
|
todo.add("type", VPackValue("moveShard"));
|
||||||
|
todo.add("database", VPackValue(database));
|
||||||
|
todo.add("collection", VPackValue(collection));
|
||||||
|
todo.add("shard", VPackValue(shard));
|
||||||
|
todo.add("fromServer", VPackValue(fromServer));
|
||||||
|
todo.add("toServer", VPackValue(toServer));
|
||||||
|
}
|
||||||
|
todo.close(); todo.close();
|
||||||
|
write_ret_t ret = makeReport(agent, todo);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
struct FailedServer : public Job {
|
||||||
|
FailedServer(Node const& snapshot, Agent* agent, uint64_t jobId,
|
||||||
|
std::string const& failed, std::string agencyPrefix) {
|
||||||
// 1. find all shards in plan, where failed was leader.
|
// 1. find all shards in plan, where failed was leader.
|
||||||
// 2. swap positions in plan between failed and a random in sync follower
|
// 2. swap positions in plan between failed and a random in sync follower
|
||||||
|
|
||||||
Node::Children const& databases =
|
Node::Children const& databases =
|
||||||
snapshot("/arango/Plan/Collections").children();
|
snapshot("/Plan/Collections").children();
|
||||||
|
|
||||||
for (auto const& database : databases) {
|
for (auto const& database : databases) {
|
||||||
for (auto const& collptr : database.second->children()) {
|
for (auto const& collptr : database.second->children()) {
|
||||||
|
@ -72,22 +125,25 @@ struct FailedServerJob {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
reportJobInSupervision(jobId, shard, failed);
|
//MoveShard ()
|
||||||
planChanges(collptr, database, shard);
|
reportJobInSupervision(jobId, shard, failed, agencyPrefix);
|
||||||
|
planChanges(collptr, database, shard, agencyPrefix);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void reportJobInSupervision(
|
void reportJobInSupervision(uint64_t jobId,
|
||||||
uint64_t jobId,
|
std::pair<std::string,
|
||||||
std::pair<std::string, std::shared_ptr<Node>> const& shard,
|
std::shared_ptr<Node>> const& shard,
|
||||||
std::string const& serverID) {
|
std::string const& serverID,
|
||||||
|
std::string const& agencyPrefix) {
|
||||||
|
|
||||||
std::string const& shardId = shard.first;
|
std::string const& shardId = shard.first;
|
||||||
VPackSlice const& dbservers = shard.second->slice();
|
VPackSlice const& dbservers = shard.second->slice();
|
||||||
std::string path =
|
std::string path = agencyPrefix + pendingPrefix
|
||||||
pendingPrefix + arangodb::basics::StringUtils::itoa(jobId);
|
+ arangodb::basics::StringUtils::itoa(jobId);
|
||||||
query_t envelope = std::make_shared<Builder>();
|
query_t envelope = std::make_shared<Builder>();
|
||||||
|
|
||||||
Builder report;
|
Builder report;
|
||||||
|
@ -113,17 +169,16 @@ struct FailedServerJob {
|
||||||
report.close();
|
report.close();
|
||||||
// makeReport(envelope, report);
|
// makeReport(envelope, report);
|
||||||
|
|
||||||
LOG(WARN) << report.toJson();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void planChanges(
|
void planChanges(
|
||||||
std::pair<std::string, std::shared_ptr<Node>> const& database,
|
std::pair<std::string, std::shared_ptr<Node>> const& database,
|
||||||
std::pair<std::string, std::shared_ptr<Node>> const& collection,
|
std::pair<std::string, std::shared_ptr<Node>> const& collection,
|
||||||
std::pair<std::string, std::shared_ptr<Node>> const& shard) {
|
std::pair<std::string, std::shared_ptr<Node>> const& shard,
|
||||||
std::string path = collectionsPrefix + database.first + "/" +
|
std::string const& agencyPrefix) {
|
||||||
|
std::string path = agencyPrefix + collectionsPrefix + database.first + "/" +
|
||||||
collection.first + "/shards/" + shard.first;
|
collection.first + "/shards/" + shard.first;
|
||||||
|
|
||||||
LOG(WARN) << path;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -131,6 +186,8 @@ struct FailedServerJob {
|
||||||
|
|
||||||
using namespace arangodb::consensus;
|
using namespace arangodb::consensus;
|
||||||
|
|
||||||
|
std::string Supervision::_agencyPrefix = "/arango";
|
||||||
|
|
||||||
Supervision::Supervision()
|
Supervision::Supervision()
|
||||||
: arangodb::Thread("Supervision"),
|
: arangodb::Thread("Supervision"),
|
||||||
_agent(nullptr),
|
_agent(nullptr),
|
||||||
|
@ -144,22 +201,14 @@ Supervision::~Supervision() { shutdown(); };
|
||||||
|
|
||||||
void Supervision::wakeUp() {
|
void Supervision::wakeUp() {
|
||||||
TRI_ASSERT(_agent != nullptr);
|
TRI_ASSERT(_agent != nullptr);
|
||||||
_snapshot = _agent->readDB().get("/");
|
_snapshot = _agent->readDB().get(_agencyPrefix);
|
||||||
_cv.signal();
|
_cv.signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string printTimestamp(Supervision::TimePoint const& t) {
|
static std::string const syncPrefix = "/Sync/ServerStates/";
|
||||||
time_t tt = std::chrono::system_clock::to_time_t(t);
|
static std::string const supervisionPrefix = "/Supervision/Health/";
|
||||||
struct tm tb;
|
static std::string const planDBServersPrefix = "/Plan/DBServers";
|
||||||
char buffer[21];
|
|
||||||
TRI_gmtime(tt, &tb);
|
|
||||||
size_t len = ::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb);
|
|
||||||
return std::string(buffer, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
static std::string const syncPrefix = "/arango/Sync/ServerStates/";
|
|
||||||
static std::string const supervisionPrefix = "/arango/Supervision/Health/";
|
|
||||||
static std::string const planDBServersPrefix = "/arango/Plan/DBServers";
|
|
||||||
std::vector<check_t> Supervision::checkDBServers() {
|
std::vector<check_t> Supervision::checkDBServers() {
|
||||||
std::vector<check_t> ret;
|
std::vector<check_t> ret;
|
||||||
Node::Children const& machinesPlanned =
|
Node::Children const& machinesPlanned =
|
||||||
|
@ -179,7 +228,7 @@ std::vector<check_t> Supervision::checkDBServers() {
|
||||||
report->openArray();
|
report->openArray();
|
||||||
report->openArray();
|
report->openArray();
|
||||||
report->openObject();
|
report->openObject();
|
||||||
report->add(supervisionPrefix + serverID,
|
report->add(_agencyPrefix + supervisionPrefix + serverID,
|
||||||
VPackValue(VPackValueType::Object));
|
VPackValue(VPackValueType::Object));
|
||||||
report->add("LastHearbeatReceived",
|
report->add("LastHearbeatReceived",
|
||||||
VPackValue(printTimestamp(it->second->myTimestamp)));
|
VPackValue(printTimestamp(it->second->myTimestamp)));
|
||||||
|
@ -194,8 +243,8 @@ std::vector<check_t> Supervision::checkDBServers() {
|
||||||
if (t.count() > _gracePeriod) { // Failure
|
if (t.count() > _gracePeriod) { // Failure
|
||||||
if (it->second->maintenance() == 0) {
|
if (it->second->maintenance() == 0) {
|
||||||
it->second->maintenance(TRI_NewTickServer());
|
it->second->maintenance(TRI_NewTickServer());
|
||||||
FailedServerJob fsj(_snapshot, _agent, it->second->maintenance(),
|
FailedServer fsj(_snapshot, _agent, it->second->maintenance(),
|
||||||
serverID);
|
serverID, _agencyPrefix);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,7 +268,6 @@ std::vector<check_t> Supervision::checkDBServers() {
|
||||||
auto itr = _vitalSigns.begin();
|
auto itr = _vitalSigns.begin();
|
||||||
while (itr != _vitalSigns.end()) {
|
while (itr != _vitalSigns.end()) {
|
||||||
if (machinesPlanned.find(itr->first) == machinesPlanned.end()) {
|
if (machinesPlanned.find(itr->first) == machinesPlanned.end()) {
|
||||||
LOG(WARN) << itr->first << " shut down!";
|
|
||||||
itr = _vitalSigns.erase(itr);
|
itr = _vitalSigns.erase(itr);
|
||||||
} else {
|
} else {
|
||||||
++itr;
|
++itr;
|
||||||
|
@ -229,16 +277,12 @@ std::vector<check_t> Supervision::checkDBServers() {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Supervision::moveShard(std::string const& from, std::string const& to) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Supervision::doChecks(bool timedout) {
|
bool Supervision::doChecks(bool timedout) {
|
||||||
if (_agent == nullptr) {
|
if (_agent == nullptr) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
_snapshot = _agent->readDB().get("/");
|
_snapshot = _agent->readDB().get(_agencyPrefix);
|
||||||
|
|
||||||
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sanity checks";
|
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sanity checks";
|
||||||
/*std::vector<check_t> ret = */checkDBServers();
|
/*std::vector<check_t> ret = */checkDBServers();
|
||||||
|
@ -247,27 +291,49 @@ bool Supervision::doChecks(bool timedout) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Supervision::run() {
|
void Supervision::run() {
|
||||||
|
|
||||||
CONDITION_LOCKER(guard, _cv);
|
CONDITION_LOCKER(guard, _cv);
|
||||||
TRI_ASSERT(_agent != nullptr);
|
TRI_ASSERT(_agent != nullptr);
|
||||||
bool timedout = false;
|
bool timedout = false;
|
||||||
|
|
||||||
while (!this->isStopping()) {
|
while (!this->isStopping()) {
|
||||||
|
|
||||||
|
// Get agency prefix after cluster init
|
||||||
|
if (_jobId == 0) {
|
||||||
|
if (!updateAgencyPrefix(10)) {
|
||||||
|
LOG_TOPIC(ERR, Logger::AGENCY)
|
||||||
|
<< "Cannot get prefix from Agency. Stopping supervision for good.";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get bunch of job IDs from agency for future jobs
|
||||||
|
if (_jobId == 0 || _jobId == _jobIdMax) {
|
||||||
|
if (!getUniqueIds()) {
|
||||||
|
LOG_TOPIC(ERR, Logger::AGENCY)
|
||||||
|
<< "Cannot get unique IDs from Agency. Stopping supervision for good.";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
MoveShard ("coordinator1", "_system", "41", "s42", "DBServer1",
|
||||||
|
"DBServer2", _jobId++, _agencyPrefix, _agent);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait unless leader
|
||||||
if (_agent->leading()) {
|
if (_agent->leading()) {
|
||||||
timedout = _cv.wait(_frequency * 1000000); // quarter second
|
timedout = _cv.wait(_frequency * 1000000); // quarter second
|
||||||
} else {
|
} else {
|
||||||
_cv.wait();
|
_cv.wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_jobId == 0 || _jobId == _jobIdMax) {
|
// Do supervision
|
||||||
if (!getUniqueIds()) {
|
doChecks(timedout);
|
||||||
LOG_TOPIC(ERR, Logger::AGENCY) << "Cannot get unique IDs from Agency. "
|
|
||||||
"Stopping supervision for good.";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
doChecks(timedout);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start thread
|
// Start thread
|
||||||
|
@ -280,20 +346,37 @@ bool Supervision::start() {
|
||||||
bool Supervision::start(Agent* agent) {
|
bool Supervision::start(Agent* agent) {
|
||||||
_agent = agent;
|
_agent = agent;
|
||||||
_frequency = static_cast<long>(_agent->config().supervisionFrequency);
|
_frequency = static_cast<long>(_agent->config().supervisionFrequency);
|
||||||
_snapshot = _agent->readDB().get("/");
|
|
||||||
|
|
||||||
updateFromAgency();
|
|
||||||
|
|
||||||
return start();
|
return start();
|
||||||
}
|
}
|
||||||
|
|
||||||
#include <iostream>
|
// Get agency prefix fron agency
|
||||||
|
bool Supervision::updateAgencyPrefix (size_t nTries, int intervalSec) {
|
||||||
|
|
||||||
|
// Try nTries to get agency's prefix in intervals
|
||||||
|
for (size_t i = 0; i < nTries; i++) {
|
||||||
|
_snapshot = _agent->readDB().get("/");
|
||||||
|
if (_snapshot.children().size() > 0) {
|
||||||
|
_agencyPrefix = _snapshot.children().begin()->first;
|
||||||
|
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Agency prefix is " << _agencyPrefix;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
std::this_thread::sleep_for (std::chrono::seconds(intervalSec));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stand-alone agency
|
||||||
|
return false;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static std::string const syncLatest = "/Sync/LatestID";
|
||||||
|
// Get bunch of cluster's unique ids from agency
|
||||||
bool Supervision::getUniqueIds() {
|
bool Supervision::getUniqueIds() {
|
||||||
uint64_t latestId;
|
uint64_t latestId;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
latestId = std::stoul(
|
latestId = std::stoul(
|
||||||
_agent->readDB().get("/arango/Sync/LatestID").slice().toJson());
|
_agent->readDB().get(_agencyPrefix + "/Sync/LatestID").slice().toJson());
|
||||||
} catch (std::exception const& e) {
|
} catch (std::exception const& e) {
|
||||||
LOG(WARN) << e.what();
|
LOG(WARN) << e.what();
|
||||||
return false;
|
return false;
|
||||||
|
@ -304,11 +387,10 @@ bool Supervision::getUniqueIds() {
|
||||||
Builder uniq;
|
Builder uniq;
|
||||||
uniq.openArray();
|
uniq.openArray();
|
||||||
uniq.openObject();
|
uniq.openObject();
|
||||||
uniq.add("/arango/Sync/LatestID",
|
uniq.add(_agencyPrefix + syncLatest, VPackValue(latestId + 100000)); // new
|
||||||
VPackValue(latestId + 100000)); // new val
|
|
||||||
uniq.close();
|
uniq.close();
|
||||||
uniq.openObject();
|
uniq.openObject();
|
||||||
uniq.add("/arango/Sync/LatestID", VPackValue(latestId)); // precond
|
uniq.add(_agencyPrefix + syncLatest, VPackValue(latestId)); // precond
|
||||||
uniq.close();
|
uniq.close();
|
||||||
uniq.close();
|
uniq.close();
|
||||||
|
|
||||||
|
@ -321,7 +403,7 @@ bool Supervision::getUniqueIds() {
|
||||||
}
|
}
|
||||||
|
|
||||||
latestId = std::stoul(
|
latestId = std::stoul(
|
||||||
_agent->readDB().get("/arango/Sync/LatestID").slice().toJson());
|
_agent->readDB().get(_agencyPrefix + "/Sync/LatestID").slice().toJson());
|
||||||
}
|
}
|
||||||
|
|
||||||
return success;
|
return success;
|
||||||
|
@ -329,12 +411,13 @@ bool Supervision::getUniqueIds() {
|
||||||
|
|
||||||
void Supervision::updateFromAgency() {
|
void Supervision::updateFromAgency() {
|
||||||
auto const& jobsPending =
|
auto const& jobsPending =
|
||||||
_snapshot("/arango/Supervision/Jobs/Pending").children();
|
_snapshot("/Supervision/Jobs/Pending").children();
|
||||||
|
|
||||||
for (auto const& jobent : jobsPending) {
|
for (auto const& jobent : jobsPending) {
|
||||||
auto const& job = *(jobent.second);
|
auto const& job = *(jobent.second);
|
||||||
|
|
||||||
LOG(WARN) << job.name() << " " << job("failed").toJson() << job("");
|
LOG_TOPIC(WARN, Logger::AGENCY)
|
||||||
|
<< job.name() << " " << job("failed").toJson() << job("");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,4 +426,6 @@ void Supervision::beginShutdown() {
|
||||||
Thread::beginShutdown();
|
Thread::beginShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
Store const& Supervision::store() const { return _agent->readDB(); }
|
Store const& Supervision::store() const {
|
||||||
|
return _agent->readDB();
|
||||||
|
}
|
||||||
|
|
|
@ -50,10 +50,6 @@ struct Job {
|
||||||
Job() {}
|
Job() {}
|
||||||
~Job() {}
|
~Job() {}
|
||||||
};
|
};
|
||||||
struct FailedServersJob : public Job {
|
|
||||||
FailedServersJob();
|
|
||||||
~FailedServersJob();
|
|
||||||
};
|
|
||||||
|
|
||||||
struct check_t {
|
struct check_t {
|
||||||
bool good;
|
bool good;
|
||||||
|
@ -128,6 +124,10 @@ class Supervision : public arangodb::Thread {
|
||||||
void wakeUp();
|
void wakeUp();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
/// @brief Update agency prefix from agency itself
|
||||||
|
bool updateAgencyPrefix (size_t nTries = 10, int intervalSec = 1);
|
||||||
|
|
||||||
/// @brief Move shard from one db server to other db server
|
/// @brief Move shard from one db server to other db server
|
||||||
bool moveShard(std::string const& from, std::string const& to);
|
bool moveShard(std::string const& from, std::string const& to);
|
||||||
|
|
||||||
|
@ -167,6 +167,8 @@ class Supervision : public arangodb::Thread {
|
||||||
long _gracePeriod;
|
long _gracePeriod;
|
||||||
long _jobId;
|
long _jobId;
|
||||||
long _jobIdMax;
|
long _jobIdMax;
|
||||||
|
|
||||||
|
static std::string _agencyPrefix;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -579,14 +579,13 @@ bool AgencyComm::tryInitializeStructure() {
|
||||||
builder.add(VPackValue("Target"));
|
builder.add(VPackValue("Target"));
|
||||||
{
|
{
|
||||||
VPackObjectBuilder c(&builder);
|
VPackObjectBuilder c(&builder);
|
||||||
addEmptyVPackObject("Coordinators", builder);
|
|
||||||
builder.add(VPackValue("Collections"));
|
builder.add(VPackValue("Collections"));
|
||||||
{
|
{
|
||||||
VPackObjectBuilder d(&builder);
|
VPackObjectBuilder d(&builder);
|
||||||
addEmptyVPackObject("_system", builder);
|
addEmptyVPackObject("_system", builder);
|
||||||
}
|
}
|
||||||
builder.add("Version", VPackValue(1));
|
addEmptyVPackObject("Coordinators", builder);
|
||||||
addEmptyVPackObject("MapLocalToID", builder);
|
addEmptyVPackObject("DBServers", builder);
|
||||||
builder.add(VPackValue("Databases"));
|
builder.add(VPackValue("Databases"));
|
||||||
{
|
{
|
||||||
VPackObjectBuilder d(&builder);
|
VPackObjectBuilder d(&builder);
|
||||||
|
@ -597,19 +596,20 @@ bool AgencyComm::tryInitializeStructure() {
|
||||||
builder.add("id", VPackValue("1"));
|
builder.add("id", VPackValue("1"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
addEmptyVPackObject("DBServers", builder);
|
|
||||||
builder.add("Lock", VPackValue("UNLOCKED"));
|
builder.add("Lock", VPackValue("UNLOCKED"));
|
||||||
|
addEmptyVPackObject("MapLocalToID", builder);
|
||||||
|
addEmptyVPackObject("Failed", builder);
|
||||||
|
addEmptyVPackObject("Finished", builder);
|
||||||
|
addEmptyVPackObject("Pending", builder);
|
||||||
|
addEmptyVPackObject("ToDo", builder);
|
||||||
|
builder.add("Version", VPackValue(1));
|
||||||
}
|
}
|
||||||
builder.add(VPackValue("Supervision"));
|
builder.add(VPackValue("Supervision"));
|
||||||
{
|
{
|
||||||
VPackObjectBuilder c(&builder);
|
VPackObjectBuilder c(&builder);
|
||||||
builder.add(VPackValue("Jobs"));
|
addEmptyVPackObject("Health", builder);
|
||||||
{
|
addEmptyVPackObject("Shards", builder);
|
||||||
VPackObjectBuilder d(&builder);
|
addEmptyVPackObject("DBServers", builder);
|
||||||
addEmptyVPackObject("Pending", builder);
|
|
||||||
addEmptyVPackObject("Finished", builder);
|
|
||||||
addEmptyVPackObject("Failed", builder);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
builder.add("InitDone", VPackValue(true));
|
builder.add("InitDone", VPackValue(true));
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
|
|
@ -55,55 +55,25 @@ Dispatcher::~Dispatcher() {
|
||||||
/// @brief adds the standard queue
|
/// @brief adds the standard queue
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void Dispatcher::addStandardQueue(size_t nrThreads, size_t maxSize) {
|
void Dispatcher::addStandardQueue(size_t nrThreads, size_t nrExtraThreads,
|
||||||
|
size_t maxSize) {
|
||||||
TRI_ASSERT(_queues[STANDARD_QUEUE] == nullptr);
|
TRI_ASSERT(_queues[STANDARD_QUEUE] == nullptr);
|
||||||
|
|
||||||
_queues[STANDARD_QUEUE] =
|
_queues[STANDARD_QUEUE] =
|
||||||
new DispatcherQueue(_scheduler, this, STANDARD_QUEUE,
|
new DispatcherQueue(_scheduler, this, STANDARD_QUEUE,
|
||||||
CreateDispatcherThread, nrThreads, maxSize);
|
CreateDispatcherThread, nrThreads, nrExtraThreads, maxSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief adds the AQL queue (used for the cluster)
|
/// @brief adds the AQL queue (used for the cluster)
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void Dispatcher::addAQLQueue(size_t nrThreads, size_t maxSize) {
|
void Dispatcher::addAQLQueue(size_t nrThreads, size_t nrExtraThreads,
|
||||||
|
size_t maxSize) {
|
||||||
TRI_ASSERT(_queues[AQL_QUEUE] == nullptr);
|
TRI_ASSERT(_queues[AQL_QUEUE] == nullptr);
|
||||||
|
|
||||||
_queues[AQL_QUEUE] = new DispatcherQueue(
|
_queues[AQL_QUEUE] = new DispatcherQueue(
|
||||||
_scheduler, this, AQL_QUEUE, CreateDispatcherThread, nrThreads, maxSize);
|
_scheduler, this, AQL_QUEUE, CreateDispatcherThread, nrThreads, nrExtraThreads, maxSize);
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief starts a new named queue
|
|
||||||
///
|
|
||||||
/// This is not thread safe. Only used during initialization.
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int Dispatcher::addExtraQueue(size_t identifier, size_t nrThreads,
|
|
||||||
size_t maxSize) {
|
|
||||||
if (identifier == 0) {
|
|
||||||
return TRI_ERROR_QUEUE_ALREADY_EXISTS;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t n = identifier + (SYSTEM_QUEUE_SIZE - 1);
|
|
||||||
|
|
||||||
if (_queues.size() <= n) {
|
|
||||||
_queues.resize(n + 1, nullptr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_queues[n] != nullptr) {
|
|
||||||
return TRI_ERROR_QUEUE_ALREADY_EXISTS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_stopping != 0) {
|
|
||||||
return TRI_ERROR_DISPATCHER_IS_STOPPING;
|
|
||||||
}
|
|
||||||
|
|
||||||
_queues[n] = new DispatcherQueue(_scheduler, this, n, CreateDispatcherThread,
|
|
||||||
nrThreads, maxSize);
|
|
||||||
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -78,19 +78,13 @@ class Dispatcher {
|
||||||
/// @brief adds a new queue
|
/// @brief adds a new queue
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void addStandardQueue(size_t nrThreads, size_t maxSize);
|
void addStandardQueue(size_t nrThreads, size_t nrExtraThreads, size_t maxSize);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief adds a new AQL queue
|
/// @brief adds a new AQL queue
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void addAQLQueue(size_t nrThreads, size_t maxSize);
|
void addAQLQueue(size_t nrThreads, size_t nrExtraThreads, size_t maxSize);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief starts a new named queue
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int addExtraQueue(size_t identifier, size_t nrThreads, size_t maxSize);
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief adds a new job
|
/// @brief adds a new job
|
||||||
|
|
|
@ -43,6 +43,7 @@ DispatcherFeature::DispatcherFeature(
|
||||||
application_features::ApplicationServer* server)
|
application_features::ApplicationServer* server)
|
||||||
: ApplicationFeature(server, "Dispatcher"),
|
: ApplicationFeature(server, "Dispatcher"),
|
||||||
_nrStandardThreads(0),
|
_nrStandardThreads(0),
|
||||||
|
_nrExtraThreads(0),
|
||||||
_nrAqlThreads(0),
|
_nrAqlThreads(0),
|
||||||
_queueSize(16384),
|
_queueSize(16384),
|
||||||
_dispatcher(nullptr) {
|
_dispatcher(nullptr) {
|
||||||
|
@ -64,11 +65,15 @@ void DispatcherFeature::collectOptions(
|
||||||
options->addSection("server", "Server features");
|
options->addSection("server", "Server features");
|
||||||
|
|
||||||
options->addOption("--server.threads",
|
options->addOption("--server.threads",
|
||||||
"number of threads for basic operations",
|
"number of threads for basic operations (0 = automatic)",
|
||||||
new UInt64Parameter(&_nrStandardThreads));
|
new UInt64Parameter(&_nrStandardThreads));
|
||||||
|
|
||||||
|
options->addHiddenOption("--server.extra-threads",
|
||||||
|
"number of extra threads that can additionally be created when all regular threads are blocked and the client requests thread creation",
|
||||||
|
new UInt64Parameter(&_nrExtraThreads));
|
||||||
|
|
||||||
options->addHiddenOption("--server.aql-threads",
|
options->addHiddenOption("--server.aql-threads",
|
||||||
"number of threads for basic operations",
|
"number of threads for basic operations (0 = automatic)",
|
||||||
new UInt64Parameter(&_nrAqlThreads));
|
new UInt64Parameter(&_nrAqlThreads));
|
||||||
|
|
||||||
options->addHiddenOption("--server.maximal-queue-size",
|
options->addHiddenOption("--server.maximal-queue-size",
|
||||||
|
@ -95,6 +100,10 @@ void DispatcherFeature::validateOptions(std::shared_ptr<ProgramOptions>) {
|
||||||
|
|
||||||
TRI_ASSERT(_nrAqlThreads >= 1);
|
TRI_ASSERT(_nrAqlThreads >= 1);
|
||||||
|
|
||||||
|
if (_nrExtraThreads == 0) {
|
||||||
|
_nrExtraThreads = _nrStandardThreads;
|
||||||
|
}
|
||||||
|
|
||||||
if (_queueSize <= 128) {
|
if (_queueSize <= 128) {
|
||||||
LOG(FATAL)
|
LOG(FATAL)
|
||||||
<< "invalid value for `--server.maximal-queue-size', need at least 128";
|
<< "invalid value for `--server.maximal-queue-size', need at least 128";
|
||||||
|
@ -161,14 +170,18 @@ void DispatcherFeature::buildStandardQueue() {
|
||||||
LOG_TOPIC(DEBUG, Logger::STARTUP) << "setting up a standard queue with "
|
LOG_TOPIC(DEBUG, Logger::STARTUP) << "setting up a standard queue with "
|
||||||
<< _nrStandardThreads << " threads";
|
<< _nrStandardThreads << " threads";
|
||||||
|
|
||||||
_dispatcher->addStandardQueue(static_cast<size_t>(_nrStandardThreads), static_cast<size_t>(_queueSize));
|
_dispatcher->addStandardQueue(static_cast<size_t>(_nrStandardThreads),
|
||||||
|
static_cast<size_t>(_nrExtraThreads),
|
||||||
|
static_cast<size_t>(_queueSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
void DispatcherFeature::buildAqlQueue() {
|
void DispatcherFeature::buildAqlQueue() {
|
||||||
LOG_TOPIC(DEBUG, Logger::STARTUP) << "setting up the AQL standard queue with "
|
LOG_TOPIC(DEBUG, Logger::STARTUP) << "setting up the AQL standard queue with "
|
||||||
<< _nrAqlThreads << " threads";
|
<< _nrAqlThreads << " threads";
|
||||||
|
|
||||||
_dispatcher->addAQLQueue(static_cast<size_t>(_nrAqlThreads), static_cast<size_t>(_queueSize));
|
_dispatcher->addAQLQueue(static_cast<size_t>(_nrAqlThreads),
|
||||||
|
static_cast<size_t>(_nrExtraThreads),
|
||||||
|
static_cast<size_t>(_queueSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
void DispatcherFeature::setProcessorAffinity(std::vector<size_t> const& cores) {
|
void DispatcherFeature::setProcessorAffinity(std::vector<size_t> const& cores) {
|
||||||
|
|
|
@ -52,6 +52,7 @@ class DispatcherFeature final
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint64_t _nrStandardThreads;
|
uint64_t _nrStandardThreads;
|
||||||
|
uint64_t _nrExtraThreads;
|
||||||
uint64_t _nrAqlThreads;
|
uint64_t _nrAqlThreads;
|
||||||
uint64_t _queueSize;
|
uint64_t _queueSize;
|
||||||
|
|
||||||
|
|
|
@ -38,9 +38,10 @@ using namespace arangodb::rest;
|
||||||
DispatcherQueue::DispatcherQueue(Scheduler* scheduler, Dispatcher* dispatcher,
|
DispatcherQueue::DispatcherQueue(Scheduler* scheduler, Dispatcher* dispatcher,
|
||||||
size_t id,
|
size_t id,
|
||||||
Dispatcher::newDispatcherThread_fptr creator,
|
Dispatcher::newDispatcherThread_fptr creator,
|
||||||
size_t nrThreads, size_t maxSize)
|
size_t nrThreads, size_t nrExtra, size_t maxSize)
|
||||||
: _id(id),
|
: _id(id),
|
||||||
_nrThreads(nrThreads),
|
_nrThreads(nrThreads),
|
||||||
|
_nrExtra(nrExtra),
|
||||||
_maxSize(maxSize),
|
_maxSize(maxSize),
|
||||||
_waitLock(),
|
_waitLock(),
|
||||||
_readyJobs(maxSize),
|
_readyJobs(maxSize),
|
||||||
|
@ -125,7 +126,7 @@ int DispatcherQueue::addJob(std::unique_ptr<Job>& job, bool startThread) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// if all threads are blocked, start a new one - we ignore race conditions
|
// if all threads are blocked, start a new one - we ignore race conditions
|
||||||
else if (startThread || notEnoughThreads()) {
|
else if (notEnoughThreads()) {
|
||||||
startQueueThread(startThread);
|
startQueueThread(startThread);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,7 +444,12 @@ bool DispatcherQueue::notEnoughThreads() {
|
||||||
size_t nrRunning = _nrRunning.load(std::memory_order_relaxed);
|
size_t nrRunning = _nrRunning.load(std::memory_order_relaxed);
|
||||||
size_t nrBlocked = (size_t)_nrBlocked.load(std::memory_order_relaxed);
|
size_t nrBlocked = (size_t)_nrBlocked.load(std::memory_order_relaxed);
|
||||||
|
|
||||||
return nrRunning <= _nrThreads - 1 || nrRunning <= nrBlocked;
|
if (nrRunning + nrBlocked >= _nrThreads + _nrExtra) {
|
||||||
|
// we have reached the absolute maximum capacity
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nrRunning <= (_nrThreads + _nrExtra - 1) || nrRunning <= nrBlocked;
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -56,7 +56,7 @@ class DispatcherQueue {
|
||||||
|
|
||||||
DispatcherQueue(Scheduler*, Dispatcher*, size_t id,
|
DispatcherQueue(Scheduler*, Dispatcher*, size_t id,
|
||||||
Dispatcher::newDispatcherThread_fptr, size_t nrThreads,
|
Dispatcher::newDispatcherThread_fptr, size_t nrThreads,
|
||||||
size_t maxSize);
|
size_t nrExtra, size_t maxSize);
|
||||||
|
|
||||||
~DispatcherQueue();
|
~DispatcherQueue();
|
||||||
|
|
||||||
|
@ -154,12 +154,22 @@ class DispatcherQueue {
|
||||||
/// @brief total number of threads
|
/// @brief total number of threads
|
||||||
///
|
///
|
||||||
/// This number is fixed. It is the number of pre-configured threads from the
|
/// This number is fixed. It is the number of pre-configured threads from the
|
||||||
/// configuration file and is the initial number of threads started. The
|
/// configuration file and is the average number of threads running under
|
||||||
/// dispatcher queues will try to have at least this many running threads.
|
/// normal condition. Note that at server start not all threads will be
|
||||||
|
/// started instantly, as threads will be created on demand.
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
size_t const _nrThreads;
|
size_t const _nrThreads;
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief total number of extra/overhead threads
|
||||||
|
///
|
||||||
|
/// This number is fixed. It is the maximum number of extra threads that can
|
||||||
|
/// be created if _nrThreads threads have already been created
|
||||||
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
size_t const _nrExtra;
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief maximum queue size (number of jobs)
|
/// @brief maximum queue size (number of jobs)
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -38,12 +38,7 @@ var cluster = require("@arangodb/cluster");
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
function databasePrefix (req, url) {
|
function databasePrefix (req, url) {
|
||||||
if (req.hasOwnProperty('compatibility') && req.compatibility < 10400) {
|
// location response (e.g. /_db/dbname/_api/collection/xyz)
|
||||||
// pre 1.4-style location response (e.g. /_api/collection/xyz)
|
|
||||||
return url;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1.4-style location response (e.g. /_db/dbname/_api/collection/xyz)
|
|
||||||
return "/_db/" + encodeURIComponent(arangodb.db._name()) + url;
|
return "/_db/" + encodeURIComponent(arangodb.db._name()) + url;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -162,8 +162,7 @@ function post_api_database (req, res) {
|
||||||
|
|
||||||
var result = arangodb.db._createDatabase(json.name || "", options, users);
|
var result = arangodb.db._createDatabase(json.name || "", options, users);
|
||||||
|
|
||||||
var returnCode = (req.compatibility <= 10400 ? actions.HTTP_OK : actions.HTTP_CREATED);
|
actions.resultOk(req, res, actions.HTTP_CREATED, { result : result });
|
||||||
actions.resultOk(req, res, returnCode, { result : result });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -75,9 +75,8 @@ function startReadingQuery (endpoint, collName, timeout) {
|
||||||
}
|
}
|
||||||
var count = 0;
|
var count = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
count += 1;
|
if (++count > 5) {
|
||||||
if (count > 500) {
|
console.error("startReadingQuery: Read transaction did not begin. Giving up after 5 tries");
|
||||||
console.error("startReadingQuery: Read transaction did not begin. Giving up");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
require("internal").wait(0.2);
|
require("internal").wait(0.2);
|
||||||
|
@ -106,7 +105,8 @@ function startReadingQuery (endpoint, collName, timeout) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
console.error("startReadingQuery: Did not find query.", r);
|
console.info("startReadingQuery: Did not find query.", r);
|
||||||
|
require("internal").wait(0.5, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*jshint globalstrict:false, strict:false */
|
/*jshint globalstrict:false, strict:false */
|
||||||
/*global assertTrue, assertEqual */
|
/*global assertTrue, assertEqual, fail */
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief test synchronous replication in the cluster
|
/// @brief test synchronous replication in the cluster
|
||||||
|
@ -28,13 +28,16 @@
|
||||||
/// @author Copyright 2016, ArangoDB GmbH, Cologne, Germany
|
/// @author Copyright 2016, ArangoDB GmbH, Cologne, Germany
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
var jsunity = require("jsunity");
|
const jsunity = require("jsunity");
|
||||||
|
|
||||||
var arangodb = require("@arangodb");
|
const arangodb = require("@arangodb");
|
||||||
var db = arangodb.db;
|
const db = arangodb.db;
|
||||||
var _ = require("lodash");
|
const ERRORS = arangodb.errors;
|
||||||
var print = require("internal").print;
|
const _ = require("lodash");
|
||||||
var wait = require("internal").wait;
|
const print = require("internal").print;
|
||||||
|
const wait = require("internal").wait;
|
||||||
|
const suspendExternal = require("internal").suspendExternal;
|
||||||
|
const continueExternal = require("internal").continueExternal;
|
||||||
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -62,7 +65,6 @@ function SynchronousReplicationSuite () {
|
||||||
s => global.ArangoClusterInfo.getCollectionInfoCurrent("_system", cn, s)
|
s => global.ArangoClusterInfo.getCollectionInfoCurrent("_system", cn, s)
|
||||||
);
|
);
|
||||||
let replicas = ccinfo.map(s => s.servers.length);
|
let replicas = ccinfo.map(s => s.servers.length);
|
||||||
print("Replicas:", replicas);
|
|
||||||
if (_.all(replicas, x => x === 2)) {
|
if (_.all(replicas, x => x === 2)) {
|
||||||
print("Replication up and running!");
|
print("Replication up and running!");
|
||||||
return true;
|
return true;
|
||||||
|
@ -72,6 +74,195 @@ function SynchronousReplicationSuite () {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail the follower
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
function failFollower() {
|
||||||
|
var follower = cinfo.shards[shards[0]][1];
|
||||||
|
var endpoint = global.ArangoClusterInfo.getServerEndpoint(follower);
|
||||||
|
// Now look for instanceInfo:
|
||||||
|
var pos = _.findIndex(global.instanceInfo.arangods,
|
||||||
|
x => x.endpoint === endpoint);
|
||||||
|
assertTrue(pos >= 0);
|
||||||
|
assertTrue(suspendExternal(global.instanceInfo.arangods[pos].pid));
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief heal the follower
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
function healFollower() {
|
||||||
|
var follower = cinfo.shards[shards[0]][1];
|
||||||
|
var endpoint = global.ArangoClusterInfo.getServerEndpoint(follower);
|
||||||
|
// Now look for instanceInfo:
|
||||||
|
var pos = _.findIndex(global.instanceInfo.arangods,
|
||||||
|
x => x.endpoint === endpoint);
|
||||||
|
assertTrue(pos >= 0);
|
||||||
|
assertTrue(continueExternal(global.instanceInfo.arangods[pos].pid));
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief produce failure
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
function makeFailure(failure) {
|
||||||
|
if (failure.follower) {
|
||||||
|
failFollower();
|
||||||
|
/* } else {
|
||||||
|
failLeader(); // TODO: function does not exist
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief heal failure
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
function healFailure(failure) {
|
||||||
|
if (failure.follower) {
|
||||||
|
healFollower();
|
||||||
|
/* } else {
|
||||||
|
healLeader(); // TODO: function does not exist
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief basic operations, with various failure modes:
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
function runBasicOperations(failure, healing) {
|
||||||
|
if (failure.place === 1) { makeFailure(failure); }
|
||||||
|
|
||||||
|
// Insert with check:
|
||||||
|
var id = c.insert({Hallo:12});
|
||||||
|
assertEqual(1, c.count());
|
||||||
|
|
||||||
|
if (healing.place === 1) { healFailure(healing); }
|
||||||
|
if (failure.place === 2) { makeFailure(failure); }
|
||||||
|
|
||||||
|
var doc = c.document(id._key);
|
||||||
|
assertEqual(12, doc.Hallo);
|
||||||
|
|
||||||
|
if (healing.place === 2) { healFailure(healing); }
|
||||||
|
if (failure.place === 3) { makeFailure(failure); }
|
||||||
|
|
||||||
|
var ids = c.insert([{Hallo:13}, {Hallo:14}]);
|
||||||
|
assertEqual(3, c.count());
|
||||||
|
assertEqual(2, ids.length);
|
||||||
|
|
||||||
|
if (healing.place === 3) { healFailure(healing); }
|
||||||
|
if (failure.place === 4) { makeFailure(failure); }
|
||||||
|
|
||||||
|
var docs = c.document([ids[0]._key, ids[1]._key]);
|
||||||
|
assertEqual(2, docs.length);
|
||||||
|
assertEqual(13, docs[0].Hallo);
|
||||||
|
assertEqual(14, docs[1].Hallo);
|
||||||
|
|
||||||
|
if (healing.place === 4) { healFailure(healing); }
|
||||||
|
if (failure.place === 5) { makeFailure(failure); }
|
||||||
|
|
||||||
|
// Replace with check:
|
||||||
|
c.replace(id._key, {"Hallo": 100});
|
||||||
|
|
||||||
|
if (healing.place === 5) { healFailure(healing); }
|
||||||
|
if (failure.place === 6) { makeFailure(failure); }
|
||||||
|
|
||||||
|
doc = c.document(id._key);
|
||||||
|
assertEqual(100, doc.Hallo);
|
||||||
|
|
||||||
|
if (healing.place === 6) { healFailure(healing); }
|
||||||
|
if (failure.place === 7) { makeFailure(failure); }
|
||||||
|
|
||||||
|
c.replace([ids[0]._key, ids[1]._key], [{Hallo:101}, {Hallo:102}]);
|
||||||
|
|
||||||
|
if (healing.place === 7) { healFailure(healing); }
|
||||||
|
if (failure.place === 8) { makeFailure(failure); }
|
||||||
|
|
||||||
|
docs = c.document([ids[0]._key, ids[1]._key]);
|
||||||
|
assertEqual(2, docs.length);
|
||||||
|
assertEqual(101, docs[0].Hallo);
|
||||||
|
assertEqual(102, docs[1].Hallo);
|
||||||
|
|
||||||
|
if (healing.place === 8) { healFailure(healing); }
|
||||||
|
if (failure.place === 9) { makeFailure(failure); }
|
||||||
|
|
||||||
|
// Update with check:
|
||||||
|
c.update(id._key, {"Hallox": 105});
|
||||||
|
|
||||||
|
if (healing.place === 9) { healFailure(healing); }
|
||||||
|
if (failure.place === 10) { makeFailure(failure); }
|
||||||
|
|
||||||
|
doc = c.document(id._key);
|
||||||
|
assertEqual(100, doc.Hallo);
|
||||||
|
assertEqual(105, doc.Hallox);
|
||||||
|
|
||||||
|
if (healing.place === 10) { healFailure(healing); }
|
||||||
|
if (failure.place === 11) { makeFailure(failure); }
|
||||||
|
|
||||||
|
c.update([ids[0]._key, ids[1]._key], [{Hallox:106}, {Hallox:107}]);
|
||||||
|
|
||||||
|
if (healing.place === 11) { healFailure(healing); }
|
||||||
|
if (failure.place === 12) { makeFailure(failure); }
|
||||||
|
|
||||||
|
docs = c.document([ids[0]._key, ids[1]._key]);
|
||||||
|
assertEqual(2, docs.length);
|
||||||
|
assertEqual(101, docs[0].Hallo);
|
||||||
|
assertEqual(102, docs[1].Hallo);
|
||||||
|
assertEqual(106, docs[0].Hallox);
|
||||||
|
assertEqual(107, docs[1].Hallox);
|
||||||
|
|
||||||
|
if (healing.place === 12) { healFailure(healing); }
|
||||||
|
if (failure.place === 13) { makeFailure(failure); }
|
||||||
|
|
||||||
|
// AQL:
|
||||||
|
var q = db._query(`FOR x IN @@cn
|
||||||
|
FILTER x.Hallo > 0
|
||||||
|
SORT x.Hallo
|
||||||
|
RETURN {"Hallo": x.Hallo}`, {"@cn": cn});
|
||||||
|
docs = q.toArray();
|
||||||
|
assertEqual(3, docs.length);
|
||||||
|
assertEqual([{Hallo:100}, {Hallo:101}, {Hallo:102}], docs);
|
||||||
|
|
||||||
|
if (healing.place === 13) { healFailure(healing); }
|
||||||
|
if (failure.place === 14) { makeFailure(failure); }
|
||||||
|
|
||||||
|
// Remove with check:
|
||||||
|
c.remove(id._key);
|
||||||
|
|
||||||
|
if (healing.place === 14) { healFailure(healing); }
|
||||||
|
if (failure.place === 15) { makeFailure(failure); }
|
||||||
|
|
||||||
|
try {
|
||||||
|
doc = c.document(id._key);
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
catch (e1) {
|
||||||
|
assertEqual(ERRORS.ERROR_ARANGO_DOCUMENT_NOT_FOUND.code, e1.errorNum);
|
||||||
|
}
|
||||||
|
assertEqual(2, c.count());
|
||||||
|
|
||||||
|
if (healing.place === 15) { healFailure(healing); }
|
||||||
|
if (failure.place === 16) { makeFailure(failure); }
|
||||||
|
|
||||||
|
c.remove([ids[0]._key, ids[1]._key]);
|
||||||
|
|
||||||
|
if (healing.place === 16) { healFailure(healing); }
|
||||||
|
if (failure.place === 17) { makeFailure(failure); }
|
||||||
|
|
||||||
|
docs = c.document([ids[0]._key, ids[1]._key]);
|
||||||
|
assertEqual(2, docs.length);
|
||||||
|
assertTrue(docs[0].error);
|
||||||
|
assertTrue(docs[1].error);
|
||||||
|
|
||||||
|
if (healing.place === 17) { healFailure(healing); }
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief the actual tests
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -80,7 +271,7 @@ function SynchronousReplicationSuite () {
|
||||||
|
|
||||||
setUp : function () {
|
setUp : function () {
|
||||||
db._drop(cn);
|
db._drop(cn);
|
||||||
c = db._create(cn, {numberOfShards: 2, replicationFactor: 2});
|
c = db._create(cn, {numberOfShards: 1, replicationFactor: 2});
|
||||||
},
|
},
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -96,7 +287,6 @@ function SynchronousReplicationSuite () {
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
testCheckInstanceInfo : function () {
|
testCheckInstanceInfo : function () {
|
||||||
require("internal").print("InstanceInfo is:", global.instanceInfo);
|
|
||||||
assertTrue(global.instanceInfo !== undefined);
|
assertTrue(global.instanceInfo !== undefined);
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -106,7 +296,197 @@ function SynchronousReplicationSuite () {
|
||||||
|
|
||||||
testSetup : function () {
|
testSetup : function () {
|
||||||
assertTrue(waitForSynchronousReplication());
|
assertTrue(waitForSynchronousReplication());
|
||||||
assertEqual(12, 12);
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief run a standard check without failures:
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperations : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({}, {});
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief run a standard check with failures:
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFailureFollower : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
failFollower();
|
||||||
|
runBasicOperations({}, {});
|
||||||
|
healFollower();
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 1
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail1 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:1, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 2
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail2 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:2, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 3
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail3 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:3, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 4
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail4 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:4, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 5
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail5 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:5, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 6
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail6 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:6, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 7
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail7 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:7, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 8
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail8 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:8, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 9
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail9 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:9, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 10
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail10 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:10, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 11
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail11 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:11, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 12
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail12 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:12, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 13
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail13 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:13, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 14
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail14 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:14, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 15
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail15 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:15, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 16
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail16 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:16, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief fail in place 17
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
testBasicOperationsFollowerFail17 : function () {
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
|
runBasicOperations({place:17, follower:true}, {place:17, follower: true});
|
||||||
|
assertTrue(waitForSynchronousReplication());
|
||||||
},
|
},
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -114,6 +494,8 @@ function SynchronousReplicationSuite () {
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
testDummy : function () {
|
testDummy : function () {
|
||||||
|
assertEqual(12, 12);
|
||||||
|
wait(15);
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue