mirror of https://gitee.com/bigwinds/arangodb
307 lines
12 KiB
C++
307 lines
12 KiB
C++
////////////////////////////////////////////////////////////////////////////////
|
|
/// DISCLAIMER
|
|
///
|
|
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
|
|
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
|
///
|
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|
/// you may not use this file except in compliance with the License.
|
|
/// You may obtain a copy of the License at
|
|
///
|
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
|
///
|
|
/// Unless required by applicable law or agreed to in writing, software
|
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
/// See the License for the specific language governing permissions and
|
|
/// limitations under the License.
|
|
///
|
|
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
|
///
|
|
/// @author Kaveh Vahedipour
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
#ifndef ARANGOD_CONSENSUS_JOB_H
|
|
#define ARANGOD_CONSENSUS_JOB_H 1
|
|
|
|
#include "AgentInterface.h"
|
|
#include "Node.h"
|
|
#include "Supervision.h"
|
|
|
|
#include "Basics/Result.h"
|
|
|
|
#include <velocypack/Iterator.h>
|
|
#include <velocypack/Slice.h>
|
|
#include <velocypack/velocypack-aliases.h>
|
|
|
|
#include <string>
|
|
|
|
namespace arangodb {
|
|
namespace consensus {
|
|
|
|
enum JOB_STATUS { TODO, PENDING, FINISHED, FAILED, NOTFOUND };
|
|
const std::vector<std::string> pos({"/Target/ToDo/", "/Target/Pending/",
|
|
"/Target/Finished/", "/Target/Failed/"});
|
|
extern std::string const mapUniqueToShortID;
|
|
extern std::string const pendingPrefix;
|
|
extern std::string const failedPrefix;
|
|
extern std::string const finishedPrefix;
|
|
extern std::string const toDoPrefix;
|
|
extern std::string const cleanedPrefix;
|
|
extern std::string const toBeCleanedPrefix;
|
|
extern std::string const failedServersPrefix;
|
|
extern std::string const planColPrefix;
|
|
extern std::string const curColPrefix;
|
|
extern std::string const planDBPrefix;
|
|
extern std::string const curServersKnown;
|
|
extern std::string const blockedServersPrefix;
|
|
extern std::string const blockedShardsPrefix;
|
|
extern std::string const planVersion;
|
|
extern std::string const plannedServers;
|
|
extern std::string const healthPrefix;
|
|
extern std::string const asyncReplLeader;
|
|
extern std::string const asyncReplTransientPrefix;
|
|
|
|
struct Job {
|
|
struct shard_t {
|
|
std::string collection;
|
|
std::string shard;
|
|
shard_t(std::string const& c, std::string const& s)
|
|
: collection(c), shard(s) {}
|
|
};
|
|
|
|
Job(JOB_STATUS status, Node const& snapshot, AgentInterface* agent,
|
|
std::string const& jobId, std::string const& creator = std::string());
|
|
|
|
virtual ~Job();
|
|
|
|
virtual void run(bool& aborts) = 0;
|
|
|
|
void runHelper(std::string const& server, std::string const& shard, bool& aborts) {
|
|
if (_status == FAILED) { // happens when the constructor did not work
|
|
return;
|
|
}
|
|
try {
|
|
status(); // This runs everything to to with state PENDING if needed!
|
|
} catch (std::exception const& e) {
|
|
LOG_TOPIC("e2d06", WARN, Logger::AGENCY)
|
|
<< "Exception caught in status() method: " << e.what();
|
|
finish(server, shard, false, e.what());
|
|
}
|
|
try {
|
|
if (_status == TODO) {
|
|
start(aborts);
|
|
} else if (_status == NOTFOUND) {
|
|
if (create(nullptr)) {
|
|
start(aborts);
|
|
}
|
|
}
|
|
} catch (std::exception const& e) {
|
|
LOG_TOPIC("5ac04", WARN, Logger::AGENCY) << "Exception caught in create() or "
|
|
"start() method: "
|
|
<< e.what();
|
|
finish("", "", false, e.what());
|
|
}
|
|
}
|
|
|
|
virtual Result abort(std::string const& reason) = 0;
|
|
|
|
virtual bool finish(std::string const& server, std::string const& shard,
|
|
bool success = true, std::string const& reason = std::string(),
|
|
query_t const payload = nullptr);
|
|
|
|
virtual JOB_STATUS status() = 0;
|
|
|
|
virtual bool create(std::shared_ptr<VPackBuilder> b) = 0;
|
|
|
|
// Returns if job was actually started (i.e. false if directly failed!)
|
|
virtual bool start(bool& aborts) = 0;
|
|
|
|
static bool abortable(Node const& snapshot, std::string const& jobId);
|
|
|
|
std::string id(std::string const& idOrShortName);
|
|
std::string uuidLookup(std::string const& shortID);
|
|
|
|
/// @brief Get a random server, which is not blocked, in good condition and
|
|
/// excluding "exclude" vector
|
|
static std::string randomIdleAvailableServer(Node const& snap,
|
|
std::vector<std::string> const& exclude);
|
|
static std::string randomIdleAvailableServer(Node const& snap, VPackSlice const& exclude);
|
|
static size_t countGoodOrBadServersInList(Node const& snap, VPackSlice const& serverList);
|
|
static size_t countGoodOrBadServersInList(Node const& snap, std::vector<std::string> const& serverList);
|
|
static bool isInServerList(Node const& snap, std::string const& prefix, std::string const& server, bool isArray);
|
|
|
|
/// @brief Get servers from plan, which are not failed or cleaned out
|
|
static std::vector<std::string> availableServers(const arangodb::consensus::Node&);
|
|
|
|
/// @brief Get servers from Supervision with health status GOOD
|
|
static std::vector<std::string> healthyServers(arangodb::consensus::Node const&);
|
|
|
|
static std::vector<shard_t> clones(Node const& snap, std::string const& db,
|
|
std::string const& col, std::string const& shrd);
|
|
|
|
static std::string findNonblockedCommonHealthyInSyncFollower(Node const& snap,
|
|
std::string const& db,
|
|
std::string const& col,
|
|
std::string const& shrd);
|
|
|
|
JOB_STATUS _status;
|
|
Node const& _snapshot;
|
|
AgentInterface* _agent;
|
|
std::string _jobId;
|
|
std::string _creator;
|
|
|
|
static std::string agencyPrefix; // will be initialized in AgencyFeature
|
|
|
|
std::shared_ptr<Builder> _jb;
|
|
|
|
static void doForAllShards(
|
|
Node const& snapshot, std::string& database, std::vector<shard_t>& shards,
|
|
std::function<void(Slice plan, Slice current, std::string& planPath, std::string& curPath)> worker);
|
|
|
|
// The following methods adds an operation to a transaction object or
|
|
// a condition to a precondition object. In all cases, the builder trx
|
|
// or pre must be in the state that an object has been opened, this
|
|
// method adds some attribute/value pairs and leaves the object open:
|
|
static void addIncreasePlanVersion(Builder& trx);
|
|
static void addRemoveJobFromSomewhere(Builder& trx, std::string const& where,
|
|
std::string const& jobId);
|
|
static void addPutJobIntoSomewhere(Builder& trx, std::string const& where,
|
|
Slice job, std::string const& reason = "");
|
|
static void addPreconditionCollectionStillThere(Builder& pre, std::string const& database,
|
|
std::string const& collection);
|
|
static void addBlockServer(Builder& trx, std::string const& server,
|
|
std::string const& jobId);
|
|
static void addBlockShard(Builder& trx, std::string const& shard, std::string const& jobId);
|
|
static void addReleaseServer(Builder& trx, std::string const& server);
|
|
static void addReleaseShard(Builder& trx, std::string const& shard);
|
|
static void addPreconditionServerNotBlocked(Builder& pre, std::string const& server);
|
|
static void addPreconditionServerHealth(Builder& pre, std::string const& server,
|
|
std::string const& health);
|
|
static void addPreconditionShardNotBlocked(Builder& pre, std::string const& shard);
|
|
static void addPreconditionUnchanged(Builder& pre, std::string const& key, Slice value);
|
|
static void addPreconditionJobStillInPending(Builder& pre, std::string const& jobId);
|
|
static std::string checkServerHealth(Node const& snapshot, std::string const& server);
|
|
};
|
|
|
|
inline arangodb::consensus::write_ret_t singleWriteTransaction(AgentInterface* _agent,
|
|
Builder const& transaction,
|
|
bool waitForCommit = true) {
|
|
query_t envelope = std::make_shared<Builder>();
|
|
|
|
Slice trx = transaction.slice();
|
|
try {
|
|
{
|
|
VPackArrayBuilder listOfTrxs(envelope.get());
|
|
VPackArrayBuilder onePair(envelope.get());
|
|
{
|
|
VPackObjectBuilder mutationPart(envelope.get());
|
|
for (auto const& pair : VPackObjectIterator(trx[0])) {
|
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
|
}
|
|
}
|
|
if (trx.length() > 1) {
|
|
VPackObjectBuilder preconditionPart(envelope.get());
|
|
for (auto const& pair : VPackObjectIterator(trx[1])) {
|
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
|
}
|
|
}
|
|
}
|
|
} catch (std::exception const& e) {
|
|
LOG_TOPIC("5be90", ERR, Logger::SUPERVISION)
|
|
<< "Supervision failed to build single-write transaction: " << e.what();
|
|
}
|
|
|
|
auto ret = _agent->write(envelope);
|
|
if (waitForCommit && !ret.indices.empty()) {
|
|
auto maximum = *std::max_element(ret.indices.begin(), ret.indices.end());
|
|
if (maximum > 0) { // some baby has worked
|
|
_agent->waitFor(maximum);
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
inline arangodb::consensus::trans_ret_t generalTransaction(AgentInterface* _agent,
|
|
Builder const& transaction) {
|
|
query_t envelope = std::make_shared<Builder>();
|
|
Slice trx = transaction.slice();
|
|
|
|
try {
|
|
{
|
|
VPackArrayBuilder listOfTrxs(envelope.get());
|
|
for (auto const& singleTrans : VPackArrayIterator(trx)) {
|
|
TRI_ASSERT(singleTrans.isArray() && singleTrans.length() > 0);
|
|
if (singleTrans[0].isObject()) {
|
|
VPackArrayBuilder onePair(envelope.get());
|
|
{
|
|
VPackObjectBuilder mutationPart(envelope.get());
|
|
for (auto const& pair : VPackObjectIterator(singleTrans[0])) {
|
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
|
}
|
|
}
|
|
if (singleTrans.length() > 1) {
|
|
VPackObjectBuilder preconditionPart(envelope.get());
|
|
for (auto const& pair : VPackObjectIterator(singleTrans[1])) {
|
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
|
}
|
|
}
|
|
} else if (singleTrans[0].isString()) {
|
|
VPackArrayBuilder reads(envelope.get());
|
|
for (auto const& path : VPackArrayIterator(singleTrans)) {
|
|
envelope->add(VPackValue("/" + Job::agencyPrefix + path.copyString()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (std::exception const& e) {
|
|
LOG_TOPIC("aae99", ERR, Logger::SUPERVISION)
|
|
<< "Supervision failed to build transaction: " << e.what();
|
|
}
|
|
|
|
auto ret = _agent->transact(envelope);
|
|
|
|
// This is for now disabled to speed up things. We wait after a full
|
|
// Supervision run, which is good enough.
|
|
//if (ret.maxind > 0) {
|
|
// _agent->waitFor(ret.maxind);
|
|
//
|
|
|
|
return ret;
|
|
}
|
|
|
|
inline arangodb::consensus::trans_ret_t transient(AgentInterface* _agent,
|
|
Builder const& transaction) {
|
|
query_t envelope = std::make_shared<Builder>();
|
|
|
|
Slice trx = transaction.slice();
|
|
try {
|
|
{
|
|
VPackArrayBuilder listOfTrxs(envelope.get());
|
|
VPackArrayBuilder onePair(envelope.get());
|
|
{
|
|
VPackObjectBuilder mutationPart(envelope.get());
|
|
for (auto const& pair : VPackObjectIterator(trx[0])) {
|
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
|
}
|
|
}
|
|
if (trx.length() > 1) {
|
|
VPackObjectBuilder preconditionPart(envelope.get());
|
|
for (auto const& pair : VPackObjectIterator(trx[1])) {
|
|
envelope->add("/" + Job::agencyPrefix + pair.key.copyString(), pair.value);
|
|
}
|
|
}
|
|
}
|
|
} catch (std::exception const& e) {
|
|
LOG_TOPIC("d03d5", ERR, Logger::SUPERVISION)
|
|
<< "Supervision failed to build transaction for transient: " << e.what();
|
|
}
|
|
|
|
return _agent->transient(envelope);
|
|
}
|
|
|
|
} // namespace consensus
|
|
} // namespace arangodb
|
|
|
|
#endif
|