mirror of https://gitee.com/bigwinds/arangodb
Moving Job classes out of Supervision
This commit is contained in:
parent
644714c8de
commit
7b440f94dc
|
@ -180,6 +180,26 @@ class Agent : public arangodb::Thread {
|
|||
/// @brief Next compaction after
|
||||
arangodb::consensus::index_t _nextCompationAfter;
|
||||
};
|
||||
|
||||
inline arangodb::consensus::write_ret_t transact (
|
||||
Agent* _agent, Builder const& transaction) {
|
||||
|
||||
query_t envelope = std::make_shared<Builder>();
|
||||
|
||||
try {
|
||||
envelope->openArray();
|
||||
envelope->add(transaction.slice());
|
||||
envelope->close();
|
||||
} catch (std::exception const& e) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY) << "Supervision failed to build transaction.";
|
||||
LOG_TOPIC(ERR, Logger::AGENCY) << e.what();
|
||||
}
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::AGENCY) << envelope->toJson();
|
||||
return _agent->write(envelope);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,233 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Kaveh Vahedipour
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "FailedLeader.h"
|
||||
|
||||
#include "Agent.h"
|
||||
#include "Job.h"
|
||||
|
||||
using namespace arangodb::consensus;
|
||||
|
||||
FailedLeader::FailedLeader(
|
||||
Node const& snapshot, Agent* agent, std::string const& jobId,
|
||||
std::string const& creator, std::string const& agencyPrefix,
|
||||
std::string const& database, std::string const& collection,
|
||||
std::string const& shard, std::string const& from, std::string const& to) :
|
||||
Job(snapshot, agent, jobId, creator, agencyPrefix), _database(database),
|
||||
_collection(collection), _shard(shard), _from(from), _to(to) {
|
||||
|
||||
if (exists()) {
|
||||
if (!status()) {
|
||||
start();
|
||||
}
|
||||
} else {
|
||||
create();
|
||||
start();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
FailedLeader::~FailedLeader() {}
|
||||
|
||||
bool FailedLeader::create () const {
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Todo: Move shard " + _shard
|
||||
+ " from " + _from + " to " + _to;
|
||||
|
||||
std::string path = _agencyPrefix + toDoPrefix + _jobId;
|
||||
|
||||
Builder todo;
|
||||
todo.openArray();
|
||||
todo.openObject();
|
||||
todo.add(path, VPackValue(VPackValueType::Object));
|
||||
todo.add("creator", VPackValue(_creator));
|
||||
todo.add("type", VPackValue("failedLeader"));
|
||||
todo.add("database", VPackValue(_database));
|
||||
todo.add("collection", VPackValue(_collection));
|
||||
todo.add("shard", VPackValue(_shard));
|
||||
todo.add("fromServer", VPackValue(_from));
|
||||
todo.add("toServer", VPackValue(_to));
|
||||
todo.add("isLeader", VPackValue(true));
|
||||
todo.add("jobId", VPackValue(_jobId));
|
||||
todo.add("timeCreated",
|
||||
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||
todo.close(); todo.close(); todo.close();
|
||||
|
||||
write_ret_t res = transact(_agent, todo);
|
||||
|
||||
if (res.accepted && res.indices.size()==1 && res.indices[0]) {
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Failed to insert job " + _jobId;
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
bool FailedLeader::start() const {
|
||||
|
||||
// DBservers
|
||||
std::string planPath =
|
||||
planColPrefix + _database + "/" + _collection + "/shards/" + _shard;
|
||||
std::string curPath =
|
||||
curColPrefix + _database + "/" + _collection + "/" + _shard + "/servers";
|
||||
|
||||
Node const& current = _snapshot(curPath);
|
||||
|
||||
if (current.slice().length() == 1) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY) << "Failed to move shard from " + _from
|
||||
+ " to " + _to + ". No in-sync followers:" + current.slice().toJson();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Copy todo to pending
|
||||
Builder todo, pending;
|
||||
|
||||
// Get todo entry
|
||||
todo.openArray();
|
||||
_snapshot(toDoPrefix + _jobId).toBuilder(todo);
|
||||
todo.close();
|
||||
|
||||
// Transaction
|
||||
pending.openArray();
|
||||
|
||||
// Apply
|
||||
// --- Add pending entry
|
||||
pending.openObject();
|
||||
pending.add(_agencyPrefix + pendingPrefix + _jobId,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("timeStarted",
|
||||
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||
for (auto const& obj : VPackObjectIterator(todo.slice()[0])) {
|
||||
pending.add(obj.key.copyString(), obj.value);
|
||||
}
|
||||
pending.close();
|
||||
|
||||
// --- Remove todo entry
|
||||
pending.add(_agencyPrefix + toDoPrefix + _jobId,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("op", VPackValue("delete"));
|
||||
pending.close();
|
||||
|
||||
// --- Cyclic shift in sync servers
|
||||
pending.add(_agencyPrefix + planPath, VPackValue(VPackValueType::Array));
|
||||
for (size_t i = 1; i < current.slice().length(); ++i) {
|
||||
pending.add(current.slice()[i]);
|
||||
}
|
||||
pending.add(current.slice()[0]);
|
||||
pending.close();
|
||||
|
||||
// --- Block shard
|
||||
pending.add(_agencyPrefix + blockedShardsPrefix + _shard,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("jobId", VPackValue(_jobId));
|
||||
pending.add("jobId", VPackValue(_jobId));
|
||||
pending.close();
|
||||
|
||||
// --- Increment Plan/Version
|
||||
pending.add(_agencyPrefix + planVersion,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("op", VPackValue("increment"));
|
||||
pending.close();
|
||||
|
||||
pending.close();
|
||||
|
||||
// Precondition
|
||||
// --- Check that Current servers are as we expect
|
||||
pending.openObject();
|
||||
pending.add(_agencyPrefix + curPath, VPackValue(VPackValueType::Object));
|
||||
pending.add("old", current.slice());
|
||||
pending.close();
|
||||
|
||||
// --- Check if shard is not blocked
|
||||
pending.add(_agencyPrefix + blockedShardsPrefix + _shard,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("oldEmpty", VPackValue(true));
|
||||
pending.close();
|
||||
|
||||
pending.close(); pending.close();
|
||||
|
||||
// Transact
|
||||
write_ret_t res = transact(_agent, pending);
|
||||
|
||||
if (res.accepted && res.indices.size()==1 && res.indices[0]) {
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Pending: Move shard " + _shard
|
||||
+ " from " + _from + " to " + _to;
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) <<
|
||||
"Precondition failed for starting job " + _jobId;
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
|
||||
unsigned FailedLeader::status () const {
|
||||
|
||||
Node const& target = _snapshot("/Target");
|
||||
|
||||
if (target.exists(std::string("/ToDo/") + _jobId).size() == 2) {
|
||||
|
||||
return TODO;
|
||||
|
||||
} else if (target.exists(std::string("/Pending/") + _jobId).size() == 2) {
|
||||
|
||||
Node const& job = _snapshot(pendingPrefix + _jobId);
|
||||
std::string database = job("database").toJson(),
|
||||
collection = job("collection").toJson(),
|
||||
shard = job("shard").toJson(),
|
||||
planPath = planColPrefix + database + "/" + collection + "/shards/"
|
||||
+ shard,
|
||||
curPath = curColPrefix + database + "/" + collection + "/" + shard
|
||||
+ "/servers";
|
||||
|
||||
Node const& planned = _snapshot(planPath);
|
||||
Node const& current = _snapshot(curPath);
|
||||
|
||||
if (planned.slice()[0] == current.slice()[0]) {
|
||||
|
||||
if (finish("Shards/" + shard)) {
|
||||
return FINISHED;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return PENDING;
|
||||
|
||||
} else if (target.exists(std::string("/Finished/") + _jobId).size() == 2) {
|
||||
|
||||
return FINISHED;
|
||||
|
||||
} else if (target.exists(std::string("/Failed/") + _jobId).size() == 2) {
|
||||
|
||||
return FAILED;
|
||||
|
||||
}
|
||||
|
||||
return NOTFOUND;
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Kaveh Vahedipour
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_CONSENSUS_FAILED_LEADER_H
|
||||
#define ARANGOD_CONSENSUS_FAILED_LEADER_H 1
|
||||
|
||||
#include "Job.h"
|
||||
#include "Supervision.h"
|
||||
|
||||
namespace arangodb {
|
||||
namespace consensus {
|
||||
|
||||
struct FailedLeader : public Job {
|
||||
|
||||
FailedLeader(Node const& snapshot, Agent* agent, std::string const& jobId,
|
||||
std::string const& creator, std::string const& agencyPrefix,
|
||||
std::string const& database = std::string(),
|
||||
std::string const& collection = std::string(),
|
||||
std::string const& shard = std::string(),
|
||||
std::string const& from = std::string(),
|
||||
std::string const& to = std::string());
|
||||
|
||||
virtual ~FailedLeader();
|
||||
|
||||
virtual bool create () const override;
|
||||
virtual bool start() const override;
|
||||
virtual unsigned status () const override;
|
||||
|
||||
std::string const& _database;
|
||||
std::string const& _collection;
|
||||
std::string const& _shard;
|
||||
std::string const& _from;
|
||||
std::string const& _to;
|
||||
|
||||
};
|
||||
|
||||
}} //namespaces
|
||||
|
||||
#endif
|
||||
|
||||
|
|
@ -0,0 +1,155 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Kaveh Vahedipour
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_CONSENSUS_JOB_H
|
||||
#define ARANGOD_CONSENSUS_JOB_H 1
|
||||
|
||||
#include "Agent.h"
|
||||
#include "Node.h"
|
||||
#include "Supervision.h"
|
||||
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/Slice.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
#include <string>
|
||||
|
||||
namespace arangodb {
|
||||
namespace consensus {
|
||||
|
||||
enum JOB_STATUS {TODO, PENDING, FINISHED, FAILED, NOTFOUND};
|
||||
|
||||
static std::string const pendingPrefix = "/Target/Pending/";
|
||||
static std::string const finishedPrefix = "/Target/Finished/";
|
||||
static std::string const failedPrefix = "/Target/Failed/";
|
||||
static std::string const planColPrefix = "/Plan/Collections/";
|
||||
static std::string const curColPrefix = "/Current/Collections/";
|
||||
static std::string const toDoPrefix = "/Target/ToDo/";
|
||||
static std::string const blockedServersPrefix = "/Supervision/DBServers/";
|
||||
static std::string const blockedShardsPrefix = "/Supervision/Shards/";
|
||||
static std::string const serverStatePrefix = "/Sync/ServerStates/";
|
||||
static std::string const planVersion = "/Plan/Version";
|
||||
|
||||
struct JobResult {
|
||||
JobResult() {}
|
||||
};
|
||||
|
||||
struct JobCallback {
|
||||
JobCallback() {}
|
||||
virtual ~JobCallback(){};
|
||||
virtual bool operator()(JobResult*) = 0;
|
||||
};
|
||||
|
||||
struct Job {
|
||||
|
||||
Job(Node const& snapshot, Agent* agent, std::string const& jobId,
|
||||
std::string const& creator, std::string const& agencyPrefix) :
|
||||
_snapshot(snapshot), _agent(agent), _jobId(jobId), _creator(creator),
|
||||
_agencyPrefix(agencyPrefix) {}
|
||||
|
||||
virtual ~Job() {}
|
||||
|
||||
virtual bool exists() const {
|
||||
|
||||
Node const& target = _snapshot("/Target");
|
||||
unsigned res = 4;
|
||||
|
||||
if (target.exists(std::string("/ToDo/") + _jobId).size() == 2) {
|
||||
res = 0;
|
||||
} else if (target.exists(std::string("/Pending/") + _jobId).size() == 2) {
|
||||
res = 1;
|
||||
} else if (target.exists(std::string("/Finished/") + _jobId).size() == 2) {
|
||||
res = 2;
|
||||
} else if (target.exists(std::string("/Failed/") + _jobId).size() == 2) {
|
||||
res = 3;
|
||||
}
|
||||
|
||||
return (res < 4);
|
||||
|
||||
}
|
||||
|
||||
virtual bool finish(std::string const& type, bool success = true) const {
|
||||
|
||||
Builder pending, finished;
|
||||
|
||||
// Get todo entry
|
||||
pending.openArray();
|
||||
_snapshot(pendingPrefix + _jobId).toBuilder(pending);
|
||||
pending.close();
|
||||
|
||||
// Prepare peding entry, block toserver
|
||||
finished.openArray();
|
||||
|
||||
// --- Add finished
|
||||
finished.openObject();
|
||||
finished.add(_agencyPrefix + (success ? finishedPrefix : failedPrefix)
|
||||
+ _jobId, VPackValue(VPackValueType::Object));
|
||||
finished.add("timeFinished",
|
||||
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||
for (auto const& obj : VPackObjectIterator(pending.slice()[0])) {
|
||||
finished.add(obj.key.copyString(), obj.value);
|
||||
}
|
||||
finished.close();
|
||||
|
||||
// --- Delete pending
|
||||
finished.add(_agencyPrefix + pendingPrefix + _jobId,
|
||||
VPackValue(VPackValueType::Object));
|
||||
finished.add("op", VPackValue("delete"));
|
||||
finished.close();
|
||||
|
||||
// --- Remove block
|
||||
finished.add(_agencyPrefix + "/Supervision/" + type,
|
||||
VPackValue(VPackValueType::Object));
|
||||
finished.add("op", VPackValue("delete"));
|
||||
finished.close();
|
||||
|
||||
// --- Need precond?
|
||||
finished.close(); finished.close();
|
||||
|
||||
write_ret_t res = transact(_agent, finished);
|
||||
|
||||
if (res.accepted && res.indices.size()==1 && res.indices[0]) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
virtual unsigned status () const = 0;
|
||||
|
||||
virtual bool create () const = 0;
|
||||
|
||||
virtual bool start() const = 0;
|
||||
|
||||
Node const& _snapshot;
|
||||
Agent* _agent;
|
||||
std::string const _jobId;
|
||||
std::string const& _creator;
|
||||
std::string const& _agencyPrefix;
|
||||
|
||||
};
|
||||
|
||||
}}
|
||||
|
||||
#endif
|
|
@ -24,6 +24,8 @@
|
|||
#include "Supervision.h"
|
||||
|
||||
#include "Agent.h"
|
||||
#include "FailedLeader.h"
|
||||
#include "Job.h"
|
||||
#include "Store.h"
|
||||
|
||||
#include "Basics/ConditionLocker.h"
|
||||
|
@ -34,354 +36,6 @@ using namespace arangodb;
|
|||
|
||||
using namespace arangodb::consensus;
|
||||
|
||||
std::string timepointToString(Supervision::TimePoint const& t) {
|
||||
time_t tt = std::chrono::system_clock::to_time_t(t);
|
||||
struct tm tb;
|
||||
size_t const len (21);
|
||||
char buffer[len];
|
||||
TRI_localtime(tt, &tb);
|
||||
::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb);
|
||||
return std::string(buffer, len-1);
|
||||
}
|
||||
|
||||
Supervision::TimePoint stringToTimepoint(std::string const& str) {
|
||||
std::tm tt;
|
||||
tt.tm_year = std::stoi(str.substr(0,4)) - 1900;
|
||||
tt.tm_mon = std::stoi(str.substr(5,2)) - 1;
|
||||
tt.tm_mday = std::stoi(str.substr(8,2));
|
||||
tt.tm_hour = std::stoi(str.substr(11,2));
|
||||
tt.tm_min = std::stoi(str.substr(14,2));
|
||||
tt.tm_sec = std::stoi(str.substr(17,2));
|
||||
tt.tm_isdst = -1;
|
||||
auto time_c = ::mktime(&tt);
|
||||
return std::chrono::system_clock::from_time_t(time_c);
|
||||
}
|
||||
|
||||
|
||||
inline arangodb::consensus::write_ret_t transact (
|
||||
Agent* _agent, Builder const& transaction) {
|
||||
|
||||
query_t envelope = std::make_shared<Builder>();
|
||||
|
||||
try {
|
||||
envelope->openArray();
|
||||
envelope->add(transaction.slice());
|
||||
envelope->close();
|
||||
} catch (std::exception const& e) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY) << "Supervision failed to build transaction.";
|
||||
LOG_TOPIC(ERR, Logger::AGENCY) << e.what();
|
||||
}
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::AGENCY) << envelope->toJson();
|
||||
return _agent->write(envelope);
|
||||
|
||||
}
|
||||
|
||||
enum JOB_STATUS {TODO, PENDING, FINISHED, FAILED, NOTFOUND};
|
||||
|
||||
static std::string const pendingPrefix = "/Target/Pending/";
|
||||
static std::string const finishedPrefix = "/Target/Finished/";
|
||||
static std::string const failedPrefix = "/Target/Failed/";
|
||||
static std::string const planColPrefix = "/Plan/Collections/";
|
||||
static std::string const curColPrefix = "/Current/Collections/";
|
||||
static std::string const toDoPrefix = "/Target/ToDo/";
|
||||
static std::string const blockedServersPrefix = "/Supervision/DBServers/";
|
||||
static std::string const blockedShardsPrefix = "/Supervision/Shards/";
|
||||
static std::string const serverStatePrefix = "/Sync/ServerStates/";
|
||||
static std::string const planVersion = "/Plan/Version";
|
||||
|
||||
Job::Job(Node const& snapshot, Agent* agent, std::string const& jobId,
|
||||
std::string const& creator, std::string const& agencyPrefix) :
|
||||
_snapshot(snapshot), _agent(agent), _jobId(jobId), _creator(creator),
|
||||
_agencyPrefix(agencyPrefix) {}
|
||||
|
||||
Job::~Job() {}
|
||||
|
||||
bool Job::exists() const {
|
||||
|
||||
Node const& target = _snapshot("/Target");
|
||||
unsigned res = 4;
|
||||
|
||||
if (target.exists(std::string("/ToDo/") + _jobId).size() == 2) {
|
||||
res = 0;
|
||||
} else if (target.exists(std::string("/Pending/") + _jobId).size() == 2) {
|
||||
res = 1;
|
||||
} else if (target.exists(std::string("/Finished/") + _jobId).size() == 2) {
|
||||
res = 2;
|
||||
} else if (target.exists(std::string("/Failed/") + _jobId).size() == 2) {
|
||||
res = 3;
|
||||
}
|
||||
|
||||
return (res < 4);
|
||||
|
||||
}
|
||||
|
||||
|
||||
bool Job::finish(std::string const& type, bool success = true) const {
|
||||
|
||||
Builder pending, finished;
|
||||
|
||||
// Get todo entry
|
||||
pending.openArray();
|
||||
_snapshot(pendingPrefix + _jobId).toBuilder(pending);
|
||||
pending.close();
|
||||
|
||||
// Prepare peding entry, block toserver
|
||||
finished.openArray();
|
||||
|
||||
// --- Add finished
|
||||
finished.openObject();
|
||||
finished.add(_agencyPrefix + (success ? finishedPrefix : failedPrefix)
|
||||
+ _jobId, VPackValue(VPackValueType::Object));
|
||||
finished.add("timeFinished",
|
||||
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||
for (auto const& obj : VPackObjectIterator(pending.slice()[0])) {
|
||||
finished.add(obj.key.copyString(), obj.value);
|
||||
}
|
||||
finished.close();
|
||||
|
||||
// --- Delete pending
|
||||
finished.add(_agencyPrefix + pendingPrefix + _jobId,
|
||||
VPackValue(VPackValueType::Object));
|
||||
finished.add("op", VPackValue("delete"));
|
||||
finished.close();
|
||||
|
||||
// --- Remove block
|
||||
finished.add(_agencyPrefix + "/Supervision/" + type,
|
||||
VPackValue(VPackValueType::Object));
|
||||
finished.add("op", VPackValue("delete"));
|
||||
finished.close();
|
||||
|
||||
// --- Need precond?
|
||||
finished.close(); finished.close();
|
||||
|
||||
write_ret_t res = transact(_agent, finished);
|
||||
|
||||
if (res.accepted && res.indices.size()==1 && res.indices[0]) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
|
||||
struct FailedLeader : public Job {
|
||||
|
||||
FailedLeader(Node const& snapshot, Agent* agent, std::string const& jobId,
|
||||
std::string const& creator, std::string const& agencyPrefix,
|
||||
std::string const& database = std::string(),
|
||||
std::string const& collection = std::string(),
|
||||
std::string const& shard = std::string(),
|
||||
std::string const& from = std::string(),
|
||||
std::string const& to = std::string()) :
|
||||
Job(snapshot, agent, jobId, creator, agencyPrefix), _database(database),
|
||||
_collection(collection), _shard(shard), _from(from), _to(to) {
|
||||
|
||||
if (exists()) {
|
||||
if (!status()) {
|
||||
start();
|
||||
}
|
||||
} else {
|
||||
create();
|
||||
start();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
virtual ~FailedLeader() {}
|
||||
|
||||
virtual bool create () const {
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Todo: Move shard " + _shard
|
||||
+ " from " + _from + " to " + _to;
|
||||
|
||||
std::string path = _agencyPrefix + toDoPrefix + _jobId;
|
||||
|
||||
Builder todo;
|
||||
todo.openArray();
|
||||
todo.openObject();
|
||||
todo.add(path, VPackValue(VPackValueType::Object));
|
||||
todo.add("creator", VPackValue(_creator));
|
||||
todo.add("type", VPackValue("failedLeader"));
|
||||
todo.add("database", VPackValue(_database));
|
||||
todo.add("collection", VPackValue(_collection));
|
||||
todo.add("shard", VPackValue(_shard));
|
||||
todo.add("fromServer", VPackValue(_from));
|
||||
todo.add("toServer", VPackValue(_to));
|
||||
todo.add("isLeader", VPackValue(true));
|
||||
todo.add("jobId", VPackValue(_jobId));
|
||||
todo.add("timeCreated",
|
||||
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||
todo.close(); todo.close(); todo.close();
|
||||
|
||||
write_ret_t res = transact(_agent, todo);
|
||||
|
||||
if (res.accepted && res.indices.size()==1 && res.indices[0]) {
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Failed to insert job " + _jobId;
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
virtual bool start() const {
|
||||
|
||||
// DBservers
|
||||
std::string planPath =
|
||||
planColPrefix + _database + "/" + _collection + "/shards/" + _shard;
|
||||
std::string curPath =
|
||||
curColPrefix + _database + "/" + _collection + "/" + _shard + "/servers";
|
||||
|
||||
Node const& current = _snapshot(curPath);
|
||||
|
||||
if (current.slice().length() == 1) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY) << "Failed to move shard from " + _from
|
||||
+ " to " + _to + ". No in-sync followers:" + current.slice().toJson();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Copy todo to pending
|
||||
Builder todo, pending;
|
||||
|
||||
// Get todo entry
|
||||
todo.openArray();
|
||||
_snapshot(toDoPrefix + _jobId).toBuilder(todo);
|
||||
todo.close();
|
||||
|
||||
// Transaction
|
||||
pending.openArray();
|
||||
|
||||
// Apply
|
||||
// --- Add pending entry
|
||||
pending.openObject();
|
||||
pending.add(_agencyPrefix + pendingPrefix + _jobId,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("timeStarted",
|
||||
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||
for (auto const& obj : VPackObjectIterator(todo.slice()[0])) {
|
||||
pending.add(obj.key.copyString(), obj.value);
|
||||
}
|
||||
pending.close();
|
||||
|
||||
// --- Remove todo entry
|
||||
pending.add(_agencyPrefix + toDoPrefix + _jobId,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("op", VPackValue("delete"));
|
||||
pending.close();
|
||||
|
||||
// --- Cyclic shift in sync servers
|
||||
pending.add(_agencyPrefix + planPath, VPackValue(VPackValueType::Array));
|
||||
for (size_t i = 1; i < current.slice().length(); ++i) {
|
||||
pending.add(current.slice()[i]);
|
||||
}
|
||||
pending.add(current.slice()[0]);
|
||||
pending.close();
|
||||
|
||||
// --- Block shard
|
||||
pending.add(_agencyPrefix + blockedShardsPrefix + _shard,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("jobId", VPackValue(_jobId));
|
||||
pending.add("jobId", VPackValue(_jobId));
|
||||
pending.close();
|
||||
|
||||
// --- Increment Plan/Version
|
||||
pending.add(_agencyPrefix + planVersion,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("op", VPackValue("increment"));
|
||||
pending.close();
|
||||
|
||||
pending.close();
|
||||
|
||||
// Precondition
|
||||
// --- Check that Current servers are as we expect
|
||||
pending.openObject();
|
||||
pending.add(_agencyPrefix + curPath, VPackValue(VPackValueType::Object));
|
||||
pending.add("old", current.slice());
|
||||
pending.close();
|
||||
|
||||
// --- Check if shard is not blocked
|
||||
pending.add(_agencyPrefix + blockedShardsPrefix + _shard,
|
||||
VPackValue(VPackValueType::Object));
|
||||
pending.add("oldEmpty", VPackValue(true));
|
||||
pending.close();
|
||||
|
||||
pending.close(); pending.close();
|
||||
|
||||
// Transact
|
||||
write_ret_t res = transact(_agent, pending);
|
||||
|
||||
if (res.accepted && res.indices.size()==1 && res.indices[0]) {
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Pending: Move shard " + _shard
|
||||
+ " from " + _from + " to " + _to;
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG_TOPIC(INFO, Logger::AGENCY) <<
|
||||
"Precondition failed for starting job " + _jobId;
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
|
||||
virtual unsigned status () const {
|
||||
|
||||
Node const& target = _snapshot("/Target");
|
||||
|
||||
if (target.exists(std::string("/ToDo/") + _jobId).size() == 2) {
|
||||
|
||||
return TODO;
|
||||
|
||||
} else if (target.exists(std::string("/Pending/") + _jobId).size() == 2) {
|
||||
|
||||
Node const& job = _snapshot(pendingPrefix + _jobId);
|
||||
std::string database = job("database").toJson(),
|
||||
collection = job("collection").toJson(),
|
||||
shard = job("shard").toJson(),
|
||||
planPath = planColPrefix + database + "/" + collection + "/shards/"
|
||||
+ shard,
|
||||
curPath = curColPrefix + database + "/" + collection + "/" + shard
|
||||
+ "/servers";
|
||||
|
||||
Node const& planned = _snapshot(planPath);
|
||||
Node const& current = _snapshot(curPath);
|
||||
|
||||
if (planned.slice()[0] == current.slice()[0]) {
|
||||
|
||||
if (finish("Shards/" + shard)) {
|
||||
return FINISHED;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return PENDING;
|
||||
|
||||
} else if (target.exists(std::string("/Finished/") + _jobId).size() == 2) {
|
||||
|
||||
return FINISHED;
|
||||
|
||||
} else if (target.exists(std::string("/Failed/") + _jobId).size() == 2) {
|
||||
|
||||
return FAILED;
|
||||
|
||||
}
|
||||
|
||||
return NOTFOUND;
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
std::string const& _database;
|
||||
std::string const& _collection;
|
||||
std::string const& _shard;
|
||||
std::string const& _from;
|
||||
std::string const& _to;
|
||||
|
||||
};
|
||||
|
||||
struct FailedServer : public Job {
|
||||
|
||||
FailedServer(Node const& snapshot, Agent* agent, std::string const& jobId,
|
||||
|
@ -583,7 +237,13 @@ struct CleanOutServer : public Job {
|
|||
std::string const& creator, std::string const& prefix,
|
||||
std::string const& server) :
|
||||
Job(snapshot, agent, jobId, creator, prefix), _server(server) {
|
||||
|
||||
|
||||
if (exists()) {
|
||||
if (status() == TODO) {
|
||||
start();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
virtual ~CleanOutServer () {}
|
||||
|
@ -672,14 +332,19 @@ struct CleanOutServer : public Job {
|
|||
|
||||
// Only proceed if leader and create job
|
||||
if ((*dbsit.begin()).copyString() != _server) {
|
||||
continue;
|
||||
/* MoveShardFromLeader (
|
||||
_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
|
||||
_jobId, _agencyPrefix, database.first, collptr.first,
|
||||
shard.first, _server, shard.second->slice()[1].copyString());*/
|
||||
sub++;
|
||||
} else {
|
||||
/* MoveShardFromFollower (
|
||||
_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
|
||||
_jobId, _agencyPrefix, database.first, collptr.first,
|
||||
shard.first, _server, shard.second->slice()[1].copyString());*/
|
||||
sub++;
|
||||
}
|
||||
|
||||
FailedLeader(
|
||||
_snapshot, _agent, _jobId + "-" + std::to_string(sub++), _jobId,
|
||||
_agencyPrefix, database.first, collptr.first, shard.first,
|
||||
_server, shard.second->slice()[1].copyString());
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,32 +36,6 @@ namespace consensus {
|
|||
class Agent;
|
||||
class Store;
|
||||
|
||||
struct JobResult {
|
||||
JobResult() {}
|
||||
};
|
||||
|
||||
struct JobCallback {
|
||||
JobCallback() {}
|
||||
virtual ~JobCallback(){};
|
||||
virtual bool operator()(JobResult*) = 0;
|
||||
};
|
||||
|
||||
struct Job {
|
||||
Job(Node const&, Agent*, std::string const& jobId,
|
||||
std::string const& creator, std::string const& agencyPrefix);
|
||||
virtual ~Job();
|
||||
virtual bool exists () const;
|
||||
virtual bool finish (std::string const&, bool) const;
|
||||
virtual unsigned status () const = 0;
|
||||
virtual bool create () const = 0;
|
||||
virtual bool start() const = 0;
|
||||
Node const& _snapshot;
|
||||
Agent* _agent;
|
||||
std::string const _jobId;
|
||||
std::string const& _creator;
|
||||
std::string const& _agencyPrefix;
|
||||
};
|
||||
|
||||
struct check_t {
|
||||
bool good;
|
||||
std::string name;
|
||||
|
@ -183,7 +157,30 @@ class Supervision : public arangodb::Thread {
|
|||
|
||||
static std::string _agencyPrefix;
|
||||
};
|
||||
}
|
||||
|
||||
inline std::string timepointToString(Supervision::TimePoint const& t) {
|
||||
time_t tt = std::chrono::system_clock::to_time_t(t);
|
||||
struct tm tb;
|
||||
size_t const len (21);
|
||||
char buffer[len];
|
||||
TRI_localtime(tt, &tb);
|
||||
::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb);
|
||||
return std::string(buffer, len-1);
|
||||
}
|
||||
|
||||
inline Supervision::TimePoint stringToTimepoint(std::string const& s) {
|
||||
std::tm tt;
|
||||
tt.tm_year = std::stoi(s.substr(0,4)) - 1900;
|
||||
tt.tm_mon = std::stoi(s.substr(5,2)) - 1;
|
||||
tt.tm_mday = std::stoi(s.substr(8,2));
|
||||
tt.tm_hour = std::stoi(s.substr(11,2));
|
||||
tt.tm_min = std::stoi(s.substr(14,2));
|
||||
tt.tm_sec = std::stoi(s.substr(17,2));
|
||||
tt.tm_isdst = -1;
|
||||
auto time_c = ::mktime(&tt);
|
||||
return std::chrono::system_clock::from_time_t(time_c);
|
||||
}
|
||||
|
||||
}} // Name spaces
|
||||
|
||||
#endif
|
||||
|
|
|
@ -84,6 +84,7 @@ add_executable(${BIN_ARANGOD}
|
|||
Agency/Agent.cpp
|
||||
Agency/AgentCallback.cpp
|
||||
Agency/Constituent.cpp
|
||||
Agency/FailedLeader.cpp
|
||||
Agency/NotifierThread.cpp
|
||||
Agency/NotifyCallback.cpp
|
||||
Agency/Node.cpp
|
||||
|
|
|
@ -258,8 +258,9 @@
|
|||
"ERROR_INVALID_APPLICATION_MANIFEST" : { "code" : 3001, "message" : "manifest file is invalid" },
|
||||
"ERROR_INVALID_FOXX_OPTIONS" : { "code" : 3004, "message" : "invalid foxx options" },
|
||||
"ERROR_INVALID_MOUNTPOINT" : { "code" : 3007, "message" : "mountpoint is invalid" },
|
||||
"ERROR_APP_NOT_FOUND" : { "code" : 3009, "message" : "App not found" },
|
||||
"ERROR_APP_NEEDS_CONFIGURATION" : { "code" : 3010, "message" : "App not configured" },
|
||||
"ERROR_APP_NOT_FOUND" : { "code" : 3009, "message" : "Service not found" },
|
||||
"ERROR_APP_NEEDS_CONFIGURATION" : { "code" : 3010, "message" : "Service not configured" },
|
||||
"ERROR_APP_MOUNTPOINT_CONFLICT" : { "code" : 3011, "message" : "mountpoint already in use" },
|
||||
"ERROR_MODULE_NOT_FOUND" : { "code" : 3100, "message" : "cannot locate module" },
|
||||
"ERROR_MODULE_FAILURE" : { "code" : 3103, "message" : "failed to invoke module" },
|
||||
"RESULT_ELEMENT_EXISTS" : { "code" : 10000, "message" : "element not inserted into structure, because it already exists" },
|
||||
|
|
|
@ -254,8 +254,9 @@ void TRI_InitializeErrorMessages () {
|
|||
REG_ERROR(ERROR_INVALID_APPLICATION_MANIFEST, "manifest file is invalid");
|
||||
REG_ERROR(ERROR_INVALID_FOXX_OPTIONS, "invalid foxx options");
|
||||
REG_ERROR(ERROR_INVALID_MOUNTPOINT, "mountpoint is invalid");
|
||||
REG_ERROR(ERROR_APP_NOT_FOUND, "App not found");
|
||||
REG_ERROR(ERROR_APP_NEEDS_CONFIGURATION, "App not configured");
|
||||
REG_ERROR(ERROR_APP_NOT_FOUND, "Service not found");
|
||||
REG_ERROR(ERROR_APP_NEEDS_CONFIGURATION, "Service not configured");
|
||||
REG_ERROR(ERROR_APP_MOUNTPOINT_CONFLICT, "mountpoint already in use");
|
||||
REG_ERROR(ERROR_MODULE_NOT_FOUND, "cannot locate module");
|
||||
REG_ERROR(ERROR_MODULE_FAILURE, "failed to invoke module");
|
||||
REG_ERROR(RESULT_ELEMENT_EXISTS, "element not inserted into structure, because it already exists");
|
||||
|
|
|
@ -606,10 +606,12 @@
|
|||
/// The options used to configure the foxx are invalid.
|
||||
/// - 3007: @LIT{mountpoint is invalid}
|
||||
/// mountpoint is invalid
|
||||
/// - 3009: @LIT{App not found}
|
||||
/// No app found at this mountpoint
|
||||
/// - 3010: @LIT{App not configured}
|
||||
/// The app has to be configured before it can be used
|
||||
/// - 3009: @LIT{Service not found}
|
||||
/// No service found at this mountpoint
|
||||
/// - 3010: @LIT{Service not configured}
|
||||
/// The service has to be configured before it can be used
|
||||
/// - 3011: @LIT{mountpoint already in use}
|
||||
/// A service has already been installed at this mountpoint
|
||||
/// - 3100: @LIT{cannot locate module}
|
||||
/// The module path could not be resolved.
|
||||
/// - 3103: @LIT{failed to invoke module}
|
||||
|
@ -3215,9 +3217,9 @@ void TRI_InitializeErrorMessages ();
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief 3009: ERROR_APP_NOT_FOUND
|
||||
///
|
||||
/// App not found
|
||||
/// Service not found
|
||||
///
|
||||
/// No app found at this mountpoint
|
||||
/// No service found at this mountpoint
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#define TRI_ERROR_APP_NOT_FOUND (3009)
|
||||
|
@ -3225,13 +3227,23 @@ void TRI_InitializeErrorMessages ();
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief 3010: ERROR_APP_NEEDS_CONFIGURATION
|
||||
///
|
||||
/// App not configured
|
||||
/// Service not configured
|
||||
///
|
||||
/// The app has to be configured before it can be used
|
||||
/// The service has to be configured before it can be used
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#define TRI_ERROR_APP_NEEDS_CONFIGURATION (3010)
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief 3011: ERROR_APP_MOUNTPOINT_CONFLICT
|
||||
///
|
||||
/// mountpoint already in use
|
||||
///
|
||||
/// A service has already been installed at this mountpoint
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#define TRI_ERROR_APP_MOUNTPOINT_CONFLICT (3011)
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief 3100: ERROR_MODULE_NOT_FOUND
|
||||
///
|
||||
|
|
Loading…
Reference in New Issue