mirror of https://gitee.com/bigwinds/arangodb
failed servers are excluded from new shard creation
This commit is contained in:
parent
fc60183490
commit
b3b7d7c907
|
@ -190,7 +190,7 @@ bool FailedServer::create () {
|
||||||
_jb->add("op", VPackValue("push"));
|
_jb->add("op", VPackValue("push"));
|
||||||
_jb->add("new", VPackValue(_server));
|
_jb->add("new", VPackValue(_server));
|
||||||
_jb->close();
|
_jb->close();
|
||||||
|
|
||||||
// Raise plan version
|
// Raise plan version
|
||||||
path = _agencyPrefix + planVersion;
|
path = _agencyPrefix + planVersion;
|
||||||
_jb->add(path, VPackValue(VPackValueType::Object));
|
_jb->add(path, VPackValue(VPackValueType::Object));
|
||||||
|
|
|
@ -45,8 +45,8 @@ static std::string const pendingPrefix = "/Target/Pending/";
|
||||||
static std::string const failedPrefix = "/Target/Failed/";
|
static std::string const failedPrefix = "/Target/Failed/";
|
||||||
static std::string const finishedPrefix = "/Target/Finished/";
|
static std::string const finishedPrefix = "/Target/Finished/";
|
||||||
static std::string const toDoPrefix = "/Target/ToDo/";
|
static std::string const toDoPrefix = "/Target/ToDo/";
|
||||||
static std::string const cleanedPrefix = "Target/CleanedServers";
|
static std::string const cleanedPrefix = "/Target/CleanedServers";
|
||||||
static std::string const failedServersPrefix = "Target/FailedServers";
|
static std::string const failedServersPrefix = "/Target/FailedServers";
|
||||||
static std::string const planColPrefix = "/Plan/Collections/";
|
static std::string const planColPrefix = "/Plan/Collections/";
|
||||||
static std::string const curColPrefix = "/Current/Collections/";
|
static std::string const curColPrefix = "/Current/Collections/";
|
||||||
static std::string const blockedServersPrefix = "/Supervision/DBServers/";
|
static std::string const blockedServersPrefix = "/Supervision/DBServers/";
|
||||||
|
|
|
@ -612,7 +612,7 @@ bool Supervision::updateAgencyPrefix (size_t nTries, int intervalSec) {
|
||||||
while (!this->isStopping()) {
|
while (!this->isStopping()) {
|
||||||
_snapshot = _agent->readDB().get("/");
|
_snapshot = _agent->readDB().get("/");
|
||||||
if (_snapshot.children().size() > 0) {
|
if (_snapshot.children().size() > 0) {
|
||||||
_agencyPrefix = std::string("/") + _snapshot.children().begin()->first;
|
_agencyPrefix = "/arango";//std::string("/") + _snapshot.children().begin()->first;
|
||||||
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Agency prefix is " << _agencyPrefix;
|
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Agency prefix is " << _agencyPrefix;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1103,7 +1103,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
||||||
transaction.operations.push_back(createCollection);
|
transaction.operations.push_back(createCollection);
|
||||||
transaction.operations.push_back(increaseVersion);
|
transaction.operations.push_back(increaseVersion);
|
||||||
transaction.preconditions.push_back(precondition);
|
transaction.preconditions.push_back(precondition);
|
||||||
|
|
||||||
AgencyCommResult res = ac.sendTransactionWithFailover(transaction);
|
AgencyCommResult res = ac.sendTransactionWithFailover(transaction);
|
||||||
|
|
||||||
if (!res.successful()) {
|
if (!res.successful()) {
|
||||||
|
@ -2051,6 +2051,8 @@ void ClusterInfo::loadCurrentCoordinators() {
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
static std::string const prefixCurrentDBServers = "Current/DBServers";
|
static std::string const prefixCurrentDBServers = "Current/DBServers";
|
||||||
|
static std::string const prefixTargetCleaned = "Target/CleanedOutServers";
|
||||||
|
static std::string const prefixTargetFailed = "Target/FailedServers";
|
||||||
|
|
||||||
void ClusterInfo::loadCurrentDBServers() {
|
void ClusterInfo::loadCurrentDBServers() {
|
||||||
uint64_t storedVersion = _DBServersProt.version;
|
uint64_t storedVersion = _DBServersProt.version;
|
||||||
|
@ -2062,17 +2064,50 @@ void ClusterInfo::loadCurrentDBServers() {
|
||||||
|
|
||||||
// Now contact the agency:
|
// Now contact the agency:
|
||||||
AgencyCommResult result = _agency.getValues(prefixCurrentDBServers);
|
AgencyCommResult result = _agency.getValues(prefixCurrentDBServers);
|
||||||
|
AgencyCommResult failed = _agency.getValues(prefixTargetFailed);
|
||||||
|
AgencyCommResult cleaned = _agency.getValues(prefixTargetCleaned);
|
||||||
|
|
||||||
if (result.successful()) {
|
if (result.successful()) {
|
||||||
|
|
||||||
velocypack::Slice currentDBServers =
|
velocypack::Slice currentDBServers =
|
||||||
result.slice()[0].get(std::vector<std::string>(
|
result.slice()[0].get(std::vector<std::string>(
|
||||||
{AgencyComm::prefix(), "Current", "DBServers"}));
|
{AgencyComm::prefix(), "Current", "DBServers"}));
|
||||||
|
velocypack::Slice failedDBServers =
|
||||||
|
failed.slice()[0].get(std::vector<std::string>(
|
||||||
|
{AgencyComm::prefix(), "Target", "FailedServers"}));
|
||||||
|
velocypack::Slice cleanedDBServers =
|
||||||
|
cleaned.slice()[0].get(std::vector<std::string>(
|
||||||
|
{AgencyComm::prefix(), "Target", "CleanedOutServers"}));
|
||||||
|
|
||||||
if (currentDBServers.isObject()) {
|
if (currentDBServers.isObject()) {
|
||||||
decltype(_DBServers) newDBServers;
|
decltype(_DBServers) newDBServers;
|
||||||
|
|
||||||
for (auto const& dbserver : VPackObjectIterator(currentDBServers)) {
|
for (auto const& dbserver : VPackObjectIterator(currentDBServers)) {
|
||||||
|
if (failedDBServers.isArray()) {
|
||||||
|
bool found = false;
|
||||||
|
for (auto const& failedServer : VPackArrayIterator(failedDBServers)) {
|
||||||
|
if (dbserver.key == failedServer) {
|
||||||
|
found = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (found) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (cleanedDBServers.isArray()) {
|
||||||
|
bool found = false;
|
||||||
|
for (auto const& cleanedServer : VPackArrayIterator(cleanedDBServers)) {
|
||||||
|
if (dbserver.key == cleanedServer) {
|
||||||
|
found = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (found) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
newDBServers.emplace(
|
newDBServers.emplace(
|
||||||
std::make_pair(dbserver.key.copyString(), dbserver.value.copyString()));
|
std::make_pair(dbserver.key.copyString(), dbserver.value.copyString()));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1979,6 +1979,7 @@ std::map<std::string, std::vector<std::string>> distributeShards(
|
||||||
std::map<std::string, std::vector<std::string>> shards;
|
std::map<std::string, std::vector<std::string>> shards;
|
||||||
|
|
||||||
ClusterInfo* ci = ClusterInfo::instance();
|
ClusterInfo* ci = ClusterInfo::instance();
|
||||||
|
ci->loadCurrentDBServers();
|
||||||
if (dbServers.size() == 0) {
|
if (dbServers.size() == 0) {
|
||||||
dbServers = ci->getCurrentDBServers();
|
dbServers = ci->getCurrentDBServers();
|
||||||
if (dbServers.empty()) {
|
if (dbServers.empty()) {
|
||||||
|
|
|
@ -83,6 +83,7 @@ for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do
|
||||||
--server.threads 16 \
|
--server.threads 16 \
|
||||||
--agency.compaction-step-size $COMP \
|
--agency.compaction-step-size $COMP \
|
||||||
--log.force-direct true \
|
--log.force-direct true \
|
||||||
|
--log.level agency=debug \
|
||||||
> cluster/$port.stdout 2>&1 &
|
> cluster/$port.stdout 2>&1 &
|
||||||
done
|
done
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue