mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of github.com:arangodb/ArangoDB into devel
commit c81df0e7a3
@@ -17,6 +17,12 @@ The number of documents to skip in the query (optional).
 The maximal amount of documents to return. The *skip*
 is applied before the *limit* restriction. (optional)
+
+@RESTBODYPARAM{batchSize,integer,optional,int64}
+maximum number of result documents to be transferred from
+the server to the client in one roundtrip. If this attribute is
+not set, a server-controlled default value will be used. A *batchSize* value of
+*0* is disallowed.

@RESTDESCRIPTION

This will find all documents matching a given example.
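For context, the *batchSize* attribute documented above travels in the JSON body of a simple-query request. A minimal sketch of building such a body with the velocypack Builder used elsewhere in this codebase; the collection name, example attribute, and the endpoint named in the comment are illustrative, not taken from this commit:

#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
#include <velocypack/Value.h>
#include <iostream>

using arangodb::velocypack::Builder;
using arangodb::velocypack::Value;
using arangodb::velocypack::ValueType;

int main() {
  Builder body;
  body.openObject();
  body.add("collection", Value("users"));    // hypothetical collection
  body.add("example", Value(ValueType::Object));
  body.add("name", Value("Jan"));            // hypothetical match attribute
  body.close();                              // end of "example"
  body.add("skip", Value(0));                // applied before "limit"
  body.add("limit", Value(100));
  body.add("batchSize", Value(10));          // 0 is disallowed; omit for server default
  body.close();
  // Body for a simple query, e.g. PUT /_api/simple/by-example
  std::cout << body.slice().toJson() << std::endl;
}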
@@ -73,6 +73,30 @@ bool AddFollower::create() {
  TRI_ASSERT(current[0].isString());
#endif
+
+  std::string planPath =
+      planColPrefix + _database + "/" + _collection + "/shards";
+
+  auto const& myClones = clones(_snapshot, _database, _collection);
+  if (!myClones.empty()) {
+
+    size_t sub = 0;
+    auto myshards = _snapshot(planPath).children();
+    auto mpos = std::distance(myshards.begin(), myshards.find(_shard));
+
+    // Deal with my clones
+    for (auto const& collection : myClones) {
+      auto othershards = _snapshot(
+          planColPrefix + _database + "/" + collection + "/shards").children();
+      auto opos = othershards.begin();
+      std::advance(opos, mpos);
+      auto const& shard = opos->first;
+
+      AddFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
+                  _jobId, _agencyPrefix, _database, collection, shard,
+                  _newFollower);
+    }
+  }

  _jb = std::make_shared<Builder>();
  _jb->openArray();
  _jb->openObject();
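The clone handling in this hunk relies on an ordering invariant: a collection and its clones keep their shards in identically ordered maps, so a shard's position in one collection identifies its counterpart in another. A standalone sketch of that positional lookup, with invented shard names and std::map standing in for the agency's ordered children:

#include <iostream>
#include <iterator>
#include <map>
#include <string>

int main() {
  // Shards of the original collection and of one clone; std::map keeps
  // both sorted, so matching shards share the same position.
  std::map<std::string, int> myshards    = {{"s1001", 0}, {"s1002", 0}, {"s1003", 0}};
  std::map<std::string, int> othershards = {{"s2001", 0}, {"s2002", 0}, {"s2003", 0}};

  std::string shardToFix = "s1002";

  // Position of the shard in the original collection ...
  auto mpos = std::distance(myshards.begin(), myshards.find(shardToFix));

  // ... identifies the corresponding shard of the clone.
  auto opos = othershards.begin();
  std::advance(opos, mpos);

  std::cout << shardToFix << " corresponds to " << opos->first << std::endl;
  // prints: s1002 corresponds to s2002
}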
@@ -243,7 +243,7 @@ bool CleanOutServer::start() {

 bool CleanOutServer::scheduleMoveShards() {

-  std::vector<std::string> servers = availableServers();
+  std::vector<std::string> servers = availableServers(_snapshot);

   // Minimum 1 DB server must remain
   if (servers.size() == 1) {
@@ -78,7 +78,8 @@ bool FailedFollower::create() {

   // Deal with my clones
   for (auto const& collection : myClones) {
-    auto othershards = _snapshot(planPath).children();
+    auto othershards = _snapshot(
+        planColPrefix + _database + "/" + collection + "/shards").children();
     auto opos = othershards.begin();
     std::advance(opos, mpos);
     auto const& shard = opos->first;
@@ -146,7 +146,7 @@ bool FailedServer::start() {
     }
   } catch (...) {} // Not clone

-  auto available = availableServers();
+  auto available = availableServers(_snapshot);

   for (auto const& shard : collection("shards").children()) {

@@ -143,12 +143,12 @@ bool Job::finish(std::string const& type, bool success,
 }


-std::vector<std::string> Job::availableServers() const {
+std::vector<std::string> Job::availableServers(Node const& snapshot) {

   std::vector<std::string> ret;

   // Get servers from plan
-  Node::Children const& dbservers = _snapshot(plannedServers).children();
+  Node::Children const& dbservers = snapshot(plannedServers).children();
   for (auto const& srv : dbservers) {
     ret.push_back(srv.first);
   }
@@ -156,7 +156,7 @@ std::vector<std::string> Job::availableServers() const {
   // Remove cleaned servers from list
   try {
     for (auto const& srv :
-           VPackArrayIterator(_snapshot(cleanedPrefix).slice())) {
+           VPackArrayIterator(snapshot(cleanedPrefix).slice())) {
       ret.erase(
         std::remove(ret.begin(), ret.end(), srv.copyString()),
         ret.end());
@@ -167,7 +167,7 @@ std::vector<std::string> Job::availableServers() const {
   // Remove failed servers from list
   try {
     for (auto const& srv :
-           VPackArrayIterator(_snapshot(failedServersPrefix).slice())) {
+           VPackArrayIterator(snapshot(failedServersPrefix).slice())) {
       ret.erase(
         std::remove(ret.begin(), ret.end(), srv.copyString()),
         ret.end());
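Summing up the hunks above: the now-static helper builds the server list from the snapshot alone, then strips cleaned and failed servers with the erase-remove idiom. A self-contained sketch of that shape, with plain STL containers standing in for the agency snapshot (all names illustrative):

#include <algorithm>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Stand-ins for the snapshot's planned, cleaned, and failed server lists.
std::vector<std::string> availableServers(
    std::map<std::string, int> const& planned,
    std::vector<std::string> const& cleaned,
    std::vector<std::string> const& failed) {
  std::vector<std::string> ret;

  // Get servers from plan
  for (auto const& srv : planned) {
    ret.push_back(srv.first);
  }

  // Remove cleaned and failed servers from the list
  for (auto const* lst : {&cleaned, &failed}) {
    for (auto const& srv : *lst) {
      ret.erase(std::remove(ret.begin(), ret.end(), srv), ret.end());
    }
  }
  return ret;
}

int main() {
  auto ret = availableServers({{"DB1", 0}, {"DB2", 0}, {"DB3", 0}}, {"DB2"}, {"DB3"});
  for (auto const& s : ret) std::cout << s << std::endl;  // prints: DB1
}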
@@ -109,7 +109,8 @@ struct Job {

   virtual bool start() = 0;

-  virtual std::vector<std::string> availableServers() const;
+  static std::vector<std::string> availableServers(
+    const arangodb::consensus::Node&);

   static std::vector<std::string> clones(
     Node const& snapshot, std::string const& database,
@@ -286,7 +286,7 @@ bool RemoveServer::start() {

 bool RemoveServer::scheduleAddFollowers() {

-  std::vector<std::string> servers = availableServers();
+  std::vector<std::string> servers = availableServers(_snapshot);

   // Minimum 1 DB server must remain
   if (servers.size() == 1) {
@@ -548,6 +548,7 @@ bool Supervision::handleJobs() {
   }

   // Do supervision

   shrinkCluster();
   workJobs();
+  enforceReplication();
@@ -614,18 +615,36 @@ void Supervision::workJobs() {

 void Supervision::enforceReplication() {

   auto const& plannedDBs = _snapshot(planColPrefix).children();
+  auto available = Job::availableServers(_snapshot);

   for (const auto& db_ : plannedDBs) { // Planned databases
     auto const& db = *(db_.second);
     for (const auto& col_ : db.children()) { // Planned collections
       auto const& col = *(col_.second);
       auto const& replicationFactor = col("replicationFactor").slice().getUInt();
-      for (auto const& shard_ : col("shards").children()) { // Pl shards
-        auto const& shard = *(shard_.second);
-        if (replicationFactor != shard.slice().length()) {
-          LOG(WARN) << shard.slice().typeName()
-                    << " target repl(" << replicationFactor
-                    << ") actual repl(" << shard.slice().length() << ")";

+      bool clone = false;
+      try {
+        clone = !col("distributeShardsLike").slice().copyString().empty();
+      } catch (...) {}
+
+      if (!clone) {
+        for (auto const& shard_ : col("shards").children()) { // Pl shards
+          auto const& shard = *(shard_.second);
+
+          // Enough DBServer to
+          if (replicationFactor > shard.slice().length() &&
+              available.size() >= replicationFactor) {
+            for (auto const& i : VPackArrayIterator(shard.slice())) {
+              available.erase(
+                  std::remove(
+                      available.begin(), available.end(), i.copyString()),
+                  available.end());
+            }
+            AddFollower(
+                _snapshot, _agent, std::to_string(_jobId++), "supervision",
+                _agencyPrefix, db_.first, col_.first, shard_.first, available.back());
+          }
+        }
       }
     }
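The decision the new loop makes per shard can be condensed: add a follower only when the shard's server list is shorter than *replicationFactor*, enough DB servers exist overall, and the candidate does not already hold the shard. A minimal sketch of just that predicate, with invented server names:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Decide which server (if any) should be added as a follower of a shard.
// Returns an empty string when no follower is needed or none is available.
std::string pickNewFollower(std::vector<std::string> available,  // by value: filtered below
                            std::vector<std::string> const& holders,
                            std::size_t replicationFactor) {
  if (holders.size() >= replicationFactor ||    // already replicated enough
      available.size() < replicationFactor) {   // not enough DB servers overall
    return "";
  }
  // Servers already holding the shard cannot be added again.
  for (auto const& srv : holders) {
    available.erase(std::remove(available.begin(), available.end(), srv),
                    available.end());
  }
  return available.empty() ? "" : available.back();
}

int main() {
  std::cout << pickNewFollower({"DB1", "DB2", "DB3"}, {"DB1"}, 2) << std::endl;
  // prints: DB3
}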
@@ -308,7 +308,8 @@ void ClusterFeature::prepare() {
       LOG(INFO) << "Waiting for DBservers to show up...";
       ci->loadCurrentDBServers();
       std::vector<ServerID> DBServers = ci->getCurrentDBServers();
-      if (DBServers.size() > 1 || TRI_microtime() - start > 30.0) {
+      if (DBServers.size() >= 1 &&
+          (DBServers.size() > 1 || TRI_microtime() - start > 15.0)) {
         LOG(INFO) << "Found " << DBServers.size() << " DBservers.";
         break;
       }
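The rewritten exit condition is easy to misread, so here it is isolated: succeed immediately with more than one DB server, settle for a single one only after 15 seconds, and with none keep waiting. A tiny sketch; the helper name is made up, and TRI_microtime in the real code supplies the elapsed time:

#include <cstddef>
#include <iostream>

// The new exit condition from ClusterFeature::prepare(), isolated.
bool enoughDBServers(std::size_t count, double secondsWaited) {
  return count >= 1 && (count > 1 || secondsWaited > 15.0);
}

int main() {
  std::cout << enoughDBServers(0, 60.0) << std::endl;  // 0: keep waiting
  std::cout << enoughDBServers(1, 5.0) << std::endl;   // 0: one server, too early
  std::cout << enoughDBServers(1, 20.0) << std::endl;  // 1: settle for a single server
  std::cout << enoughDBServers(2, 0.0) << std::endl;   // 1: more than one, exit at once
}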