diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 5000ef0712..99131cde8d 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -316,31 +316,33 @@ bool Agent::load() { // Write new entries to replicated state and store write_ret_t Agent::write(query_t const& query) { - if (_constituent.leading()) { // Only working as leader - + + if (_constituent.leading()) { // Only leader + std::vector applied; std::vector indices; index_t maxind = 0; - + { MUTEX_LOCKER(mutexLocker, _ioLock); applied = _spearhead.apply(query); // Apply to spearhead indices = _state.log(query, applied, term(), id()); // Log w/ indicies } - + if (!indices.empty()) { maxind = *std::max_element(indices.begin(), indices.end()); } // _appendCV.signal(); // Wake up run - + reportIn(id(), maxind); - + return write_ret_t(true, id(), applied, indices); // Indices to wait for to rest - + } else { // Else we redirect return write_ret_t(false, _constituent.leaderID()); } + } // Read from store diff --git a/arangod/Agency/CleanOutServer.cpp b/arangod/Agency/CleanOutServer.cpp index e48cf22971..7609075fac 100644 --- a/arangod/Agency/CleanOutServer.cpp +++ b/arangod/Agency/CleanOutServer.cpp @@ -25,6 +25,7 @@ #include "Agent.h" #include "Job.h" +#include "MoveShard.h" using namespace arangodb::consensus; @@ -35,6 +36,9 @@ CleanOutServer::CleanOutServer ( Job(snapshot, agent, jobId, creator, prefix), _server(server) { if (exists()) { + if (_server == "") { + _server = _snapshot(pendingPrefix + _jobId + "/server").getString(); + } if (status() == TODO) { start(); } @@ -108,52 +112,67 @@ bool CleanOutServer::start() const { // Transact to agency write_ret_t res = transact(_agent, pending); - + if (res.accepted && res.indices.size()==1 && res.indices[0]) { - - LOG_TOPIC(INFO, Logger::AGENCY) << "Pending: Clean out server " + _server; - - Node::Children const& databases = - _snapshot("/Plan/Collections").children(); - - size_t sub = 0; - for (auto const& database : databases) { - for (auto const& collptr : database.second->children()) { - Node const& collection = *(collptr.second); - Node const& replicationFactor = collection("replicationFactor"); - if (replicationFactor.slice().getUInt() > 1) { - for (auto const& shard : collection("shards").children()) { - VPackArrayIterator dbsit(shard.second->slice()); - - // Only proceed if leader and create job - if ((*dbsit.begin()).copyString() != _server) { -/* MoveShardFromLeader ( - _snapshot, _agent, _jobId + "-" + std::to_string(sub++), - _jobId, _agencyPrefix, database.first, collptr.first, - shard.first, _server, shard.second->slice()[1].copyString());*/ - sub++; - } else { -/* MoveShardFromFollower ( - _snapshot, _agent, _jobId + "-" + std::to_string(sub++), - _jobId, _agencyPrefix, database.first, collptr.first, - shard.first, _server, shard.second->slice()[1].copyString());*/ - sub++; - } + LOG_TOPIC(INFO, Logger::AGENCY) << "Pending: Clean out server " + _server; - } - } - } + // Check if we can get things done in the first place + if (!checkFeasibility()) { + finish("DBServers/" + _server); + return false; } - - return true; - } + // Schedule shard relocations + scheduleMoveShards(); + + return true; + + } + LOG_TOPIC(INFO, Logger::AGENCY) << "Precondition failed for starting job " + _jobId; return false; - } +bool CleanOutServer::scheduleMoveShards() const { + return true; +} + +bool CleanOutServer::checkFeasibility () const { + + // Check if server is already in cleaned servers: fail! + Node::Children const& cleanedServers = + _snapshot("/Target/CleanedServers").children(); + for (auto const cleaned : cleanedServers) { + if (cleaned.first == _server) { + LOG_TOPIC(ERR, Logger::AGENCY) << _server << + " has been cleaned out already!"; + return false; + } + } + + // Determine number of available servers + Node::Children const& dbservers = _snapshot("/Plan/DBServers").children(); + uint64_t nservers = dbservers.size() - cleanedServers.size() - 1; + + // See if available servers after cleanout satisfy all replication factors + Node::Children const& databases = _snapshot("/Plan/Collections").children(); + for (auto const& database : databases) { + for (auto const& collptr : database.second->children()) { + try { + uint64_t replFactor = (*collptr.second)("replicationFactor").getUInt(); + if (replFactor > nservers) { + LOG_TOPIC(ERR, Logger::AGENCY) << + "Cannot house all shard replics after cleaning out " << _server; + return false; + } + } catch (...) {} + } + } + + return true; + +} diff --git a/arangod/Agency/CleanOutServer.h b/arangod/Agency/CleanOutServer.h index 5b4b48e633..cc95671b36 100644 --- a/arangod/Agency/CleanOutServer.h +++ b/arangod/Agency/CleanOutServer.h @@ -32,17 +32,21 @@ namespace consensus { struct CleanOutServer : public Job { - CleanOutServer (Node const& snapshot, Agent* agent, std::string const& jobId, - std::string const& creator, std::string const& prefix, - std::string const& server); - - virtual ~CleanOutServer (); - - virtual unsigned status () const override; - virtual bool create () const override; + CleanOutServer(Node const& snapshot, Agent* agent, std::string const& jobId, + std::string const& creator, std::string const& prefix, + std::string const& server); + + virtual ~CleanOutServer(); + + virtual unsigned status() const override; + virtual bool create() const override; virtual bool start() const override; + + // Check if all shards' replication factors can be satisfied after clean out. + bool checkFeasibility() const; + bool scheduleMoveShards() const; - std::string const& _server; + std::string _server; }; diff --git a/arangod/Agency/FailedLeader.cpp b/arangod/Agency/FailedLeader.cpp index 6549c1dcf0..72ec29c797 100644 --- a/arangod/Agency/FailedLeader.cpp +++ b/arangod/Agency/FailedLeader.cpp @@ -36,7 +36,7 @@ FailedLeader::FailedLeader( Job(snapshot, agent, jobId, creator, agencyPrefix), _database(database), _collection(collection), _shard(shard), _from(from), _to(to) { - try{ + try { if (exists()) { if (!status()) { start(); @@ -46,12 +46,10 @@ FailedLeader::FailedLeader( start(); } } catch (...) { - std::string tmp = shard; - if (tmp == "") { - Node const& job = _snapshot(pendingPrefix + _jobId); - tmp = job("shard").toJson(); + if (_shard == "") { + _shard = _snapshot(pendingPrefix + _jobId + "/shard").getString(); } - finish("Shards/" + tmp); + finish("Shards/" + _shard, false); } } diff --git a/arangod/Agency/FailedLeader.h b/arangod/Agency/FailedLeader.h index d67a4be82e..1c938019e5 100644 --- a/arangod/Agency/FailedLeader.h +++ b/arangod/Agency/FailedLeader.h @@ -32,8 +32,11 @@ namespace consensus { struct FailedLeader : public Job { - FailedLeader(Node const& snapshot, Agent* agent, std::string const& jobId, - std::string const& creator, std::string const& agencyPrefix, + FailedLeader(Node const& snapshot, + Agent* agent, + std::string const& jobId, + std::string const& creator, + std::string const& agencyPrefix, std::string const& database = std::string(), std::string const& collection = std::string(), std::string const& shard = std::string(), @@ -46,11 +49,11 @@ struct FailedLeader : public Job { virtual bool start() const override; virtual unsigned status () const override; - std::string const& _database; - std::string const& _collection; - std::string const& _shard; - std::string const& _from; - std::string const& _to; + std::string _database; + std::string _collection; + std::string _shard; + std::string _from; + std::string _to; }; diff --git a/arangod/Agency/FailedServer.cpp b/arangod/Agency/FailedServer.cpp index 4f06daccd7..b16617d5a2 100644 --- a/arangod/Agency/FailedServer.cpp +++ b/arangod/Agency/FailedServer.cpp @@ -32,8 +32,8 @@ using namespace arangodb::consensus; FailedServer::FailedServer(Node const& snapshot, Agent* agent, std::string const& jobId, std::string const& creator, std::string const& agencyPrefix, std::string const& failed) : - Job(snapshot, agent, jobId, creator, agencyPrefix), _failed(failed) { - + Job(snapshot, agent, jobId, creator, agencyPrefix), _server(failed) { + try { if (exists()) { if (status() == TODO) { @@ -44,9 +44,13 @@ FailedServer::FailedServer(Node const& snapshot, Agent* agent, std::string const start(); } } catch (...) { - finish("DBServers/" + _failed); + if (_server == "") { + _server = _snapshot(pendingPrefix + _jobId + "/server").getString(); + } + + finish("DBServers/" + _server, false); } - + } FailedServer::~FailedServer () {} @@ -88,7 +92,7 @@ bool FailedServer::start() const { pending.close(); // --- Block toServer - pending.add(_agencyPrefix + blockedServersPrefix + _failed, + pending.add(_agencyPrefix + blockedServersPrefix + _server, VPackValue(VPackValueType::Object)); pending.add("jobId", VPackValue(_jobId)); pending.close(); @@ -98,7 +102,7 @@ bool FailedServer::start() const { // Preconditions // --- Check that toServer not blocked pending.openObject(); - pending.add(_agencyPrefix + blockedServersPrefix + _failed, + pending.add(_agencyPrefix + blockedServersPrefix + _server, VPackValue(VPackValueType::Object)); pending.add("oldEmpty", VPackValue(true)); pending.close(); @@ -111,7 +115,7 @@ bool FailedServer::start() const { if (res.accepted && res.indices.size()==1 && res.indices[0]) { LOG_TOPIC(INFO, Logger::AGENCY) << - "Pending: DB Server " + _failed + " failed."; + "Pending: DB Server " + _server + " failed."; Node::Children const& databases = _snapshot("/Plan/Collections").children(); @@ -127,14 +131,14 @@ bool FailedServer::start() const { VPackArrayIterator dbsit(shard.second->slice()); // Only proceed if leader and create job - if ((*dbsit.begin()).copyString() != _failed) { + if ((*dbsit.begin()).copyString() != _server) { continue; } FailedLeader( _snapshot, _agent, _jobId + "-" + std::to_string(sub++), _jobId, _agencyPrefix, database.first, collptr.first, shard.first, - _failed, shard.second->slice()[1].copyString()); + _server, shard.second->slice()[1].copyString()); } } @@ -154,7 +158,7 @@ bool FailedServer::start() const { bool FailedServer::create () const { LOG_TOPIC(INFO, Logger::AGENCY) - << "Todo: DB Server " + _failed + " failed."; + << "Todo: DB Server " + _server + " failed."; std::string path = _agencyPrefix + toDoPrefix + _jobId; @@ -163,7 +167,7 @@ bool FailedServer::create () const { todo.openObject(); todo.add(path, VPackValue(VPackValueType::Object)); todo.add("type", VPackValue("failedServer")); - todo.add("server", VPackValue(_failed)); + todo.add("server", VPackValue(_server)); todo.add("jobId", VPackValue(_jobId)); todo.add("creator", VPackValue(_creator)); todo.add("timeCreated", @@ -206,7 +210,7 @@ unsigned FailedServer::status () const { } if (!found) { - if (finish("DBServers/" + _failed)) { + if (finish("DBServers/" + _server)) { return FINISHED; } } diff --git a/arangod/Agency/FailedServer.h b/arangod/Agency/FailedServer.h index bdabdab17f..b4f2d9515d 100644 --- a/arangod/Agency/FailedServer.h +++ b/arangod/Agency/FailedServer.h @@ -42,7 +42,7 @@ struct FailedServer : public Job { virtual bool create () const override; virtual unsigned status () const override; - std::string const& _failed; + std::string _server; }; diff --git a/arangod/Agency/MoveShard.cpp b/arangod/Agency/MoveShard.cpp new file mode 100644 index 0000000000..28c52db150 --- /dev/null +++ b/arangod/Agency/MoveShard.cpp @@ -0,0 +1,217 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "MoveShard.h" + +#include "Agent.h" +#include "Job.h" + +using namespace arangodb::consensus; + +MoveShard::MoveShard (Node const& snapshot, Agent* agent, + std::string const& jobId, std::string const& creator, + std::string const& prefix, std::string const& database, + std::string const& collection, std::string const& shard, + std::string const& from, std::string const& to) : + Job(snapshot, agent, jobId, creator, prefix), _database(database), + _collection(collection), _shard(shard), _from(from), _to(to) { + + try { + + if (exists()) { + + if (_shard == "") { + _shard = _snapshot(pendingPrefix + _jobId + "/shard").getString(); + } + if (_database == "") { + _database = _snapshot(pendingPrefix + _jobId + "/database").getString(); + } + if (_collection == "") { + _collection = + _snapshot(pendingPrefix + _jobId + "/collection").getString(); + } + if (_from == "") { + _from = _snapshot(pendingPrefix + _jobId + "/fromServer").getString(); + } + if (_to == "") { + _to = _snapshot(pendingPrefix + _jobId + "/toServer").getString(); + } + + if (!status()) { + start(); + } + + } else { + + create(); + start(); + + } + + } catch (...) { + finish("Shards/" + _shard, false); + } + + +} + +MoveShard::~MoveShard () {} + +bool MoveShard::create () const { + + LOG_TOPIC(INFO, Logger::AGENCY) + << "Todo: Move shard " + _shard + " from " + _from + " to " << _to; + + std::string path = _agencyPrefix + toDoPrefix + _jobId; + + Builder todo; + todo.openArray(); + todo.openObject(); + todo.add(path, VPackValue(VPackValueType::Object)); + todo.add("creator", VPackValue(_creator)); + todo.add("type", VPackValue("failedLeader")); + todo.add("database", VPackValue(_database)); + todo.add("collection", VPackValue(_collection)); + todo.add("shard", VPackValue(_shard)); + todo.add("fromServer", VPackValue(_from)); + todo.add("toServer", VPackValue(_to)); + + todo.add("isLeader", VPackValue(true)); + todo.add("jobId", VPackValue(_jobId)); + todo.add("timeCreated", + VPackValue(timepointToString(std::chrono::system_clock::now()))); + todo.close(); todo.close(); todo.close(); + + write_ret_t res = transact(_agent, todo); + + if (res.accepted && res.indices.size()==1 && res.indices[0]) { + return true; + } + + LOG_TOPIC(INFO, Logger::AGENCY) << "Failed to insert job " + _jobId; + return false; + +} + + +bool MoveShard::start() const { + + LOG_TOPIC(INFO, Logger::AGENCY) + << "Pending: Move shard " + _shard + " from " + _from + " to " << _to; + + // Copy todo to pending +/* Builder todo, pending; + + // Get todo entry + todo.openArray(); + _snapshot(toDoPrefix + _jobId).toBuilder(todo); + todo.close(); + + // Enter peding, remove todo, block toserver + pending.openArray(); + + // --- Add pending + pending.openObject(); + pending.add(_agencyPrefix + pendingPrefix + _jobId, + VPackValue(VPackValueType::Object)); + pending.add("timeStarted", + VPackValue(timepointToString(std::chrono::system_clock::now()))); + for (auto const& obj : VPackObjectIterator(todo.slice()[0])) { + pending.add(obj.key.copyString(), obj.value); + } + pending.close(); + + // --- Delete todo + pending.add(_agencyPrefix + toDoPrefix + _jobId, + VPackValue(VPackValueType::Object)); + pending.add("op", VPackValue("delete")); + pending.close(); + + // --- Block toServer + pending.add(_agencyPrefix + blockedServersPrefix + _server, + VPackValue(VPackValueType::Object)); + pending.add("jobId", VPackValue(_jobId)); + pending.close(); + + // --- Announce in Sync that server is cleaning out + pending.add(_agencyPrefix + serverStatePrefix + _server, + VPackValue(VPackValueType::Object)); + pending.add("cleaning", VPackValue(true)); + pending.close(); + + pending.close(); + + // Preconditions + // --- Check that toServer not blocked + pending.openObject(); + pending.add(_agencyPrefix + blockedServersPrefix + _server, + VPackValue(VPackValueType::Object)); + pending.add("oldEmpty", VPackValue(true)); + pending.close(); + + pending.close(); pending.close(); + + // Transact to agency + write_ret_t res = transact(_agent, pending); + + if (res.accepted && res.indices.size()==1 && res.indices[0]) { + + LOG_TOPIC(INFO, Logger::AGENCY) << "Pending: Clean out server " + _server; + + Node::Children const& databases = + _snapshot("/Plan/Collections").children(); + + size_t sub = 0; + + for (auto const& database : databases) { + for (auto const& collptr : database.second->children()) { + Node const& collection = *(collptr.second); + Node const& replicationFactor = collection("replicationFactor"); + if (replicationFactor.slice().getUInt() > 1) { + for (auto const& shard : collection("shards").children()) { + VPackArrayIterator dbsit(shard.second->slice()); + + MoveShard ( + _snapshot, _agent, _jobId + "-" + std::to_string(sub++), + _jobId, _agencyPrefix, database.first, collptr.first, + shard.first, _server, _server); + + } + } + } + } + + return true; + } + + LOG_TOPIC(INFO, Logger::AGENCY) << + "Precondition failed for starting job " + _jobId; +*/ + return false; + +} + +unsigned MoveShard::status () const { + return 0; +} + diff --git a/arangod/Agency/MoveShard.h b/arangod/Agency/MoveShard.h new file mode 100644 index 0000000000..7e1b05cc64 --- /dev/null +++ b/arangod/Agency/MoveShard.h @@ -0,0 +1,62 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_CONSENSUS_MOVE_SHARD_FROM_FOLLOWER_H +#define ARANGOD_CONSENSUS_MOVE_SHARD_FROM_FOLLOWER_H 1 + +#include "Job.h" +#include "Supervision.h" + +namespace arangodb { +namespace consensus { + +struct MoveShard : public Job { + + MoveShard (Node const& snapshot, + Agent* agent, + std::string const& jobId, + std::string const& creator, + std::string const& prefix, + std::string const& database = std::string(), + std::string const& collection = std::string(), + std::string const& shard = std::string(), + std::string const& from = std::string(), + std::string const& to = std::string()); + + virtual ~MoveShard (); + + virtual unsigned status () const override; + virtual bool create () const override; + virtual bool start() const override; + + std::string _database; + std::string _collection; + std::string _shard; + std::string _from; + std::string _to; + +}; + +}} + +#endif diff --git a/arangod/Agency/Node.cpp b/arangod/Agency/Node.cpp index 04de6be61f..e4536d41c0 100644 --- a/arangod/Agency/Node.cpp +++ b/arangod/Agency/Node.cpp @@ -611,3 +611,39 @@ std::vector Node::exists(std::vector const& rel) const std::vector Node::exists(std::string const& rel) const { return exists(split(rel, '/')); } + +int Node::getInt() const { + + if (type() == NODE) { + throw StoreException("Must not convert NODE type to int"); + } + return slice().getInt(); + +} + +uint64_t Node::getUInt() const { + + if (type() == NODE) { + throw StoreException("Must not convert NODE type to unsigned int"); + } + return slice().getUInt(); + +} + +double Node::getDouble() const { + + if (type() == NODE) { + throw StoreException("Must not convert NODE type to int"); + } + return slice().getDouble(); + +} + +std::string Node::getString() const { + + if (type() == NODE) { + throw StoreException("Must not convert NODE type to string"); + } + return slice().copyString(); + +} diff --git a/arangod/Agency/Node.h b/arangod/Agency/Node.h index e1c40f3165..57ff24bb8a 100644 --- a/arangod/Agency/Node.h +++ b/arangod/Agency/Node.h @@ -202,6 +202,18 @@ class Node { /// @brief Part of relative path which exists std::vector exists(std::string const&) const; + /// @brief Get integer value (throws if type NODE or if conversion fails) + int getInt() const; + + /// @brief Get insigned value (throws if type NODE or if conversion fails) + uint64_t getUInt() const; + + /// @brief Get double value (throws if type NODE or if conversion fails) + double getDouble() const; + + /// @brief Get string value (throws if type NODE or if conversion fails) + std::string getString() const; + protected: /// @brief Add time to live entry virtual bool addTimeToLive(long millis); diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 74f17f9288..3268760f70 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -91,7 +91,13 @@ bool State::persist(arangodb::consensus::index_t index, term_t term, if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } - OperationResult result = trx.insert("log", body.slice(), _options); + OperationResult result; + try { + result = trx.insert("log", body.slice(), _options); + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) << + "Failed to persist log entry:" << e.what(); + } res = trx.finish(result.code); return (res == TRI_ERROR_NO_ERROR); @@ -113,7 +119,7 @@ std::vector State::log( buf->append((char const*)i[0].begin(), i[0].byteSize()); idx[j] = _log.back().index + 1; _log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM - persist(idx[j], term, lid, i[0]); // log to disk + persist(idx[j], term, lid, i[0]); // log to disk ++j; } } diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index f5d929da5e..e43b53b2b8 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -87,6 +87,7 @@ add_executable(${BIN_ARANGOD} Agency/CleanOutServer.cpp Agency/FailedLeader.cpp Agency/FailedServer.cpp + Agency/MoveShard.cpp Agency/NotifierThread.cpp Agency/NotifyCallback.cpp Agency/Node.cpp