diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 49a4fbd48e..8fc1077745 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -65,7 +65,7 @@ const std::vector AgencyTransaction::TypeUrl( // ----------------------------------------------------------------------------- AgencyPrecondition::AgencyPrecondition() - : type(AgencyPrecondition::Type::NONE) {} + : type(AgencyPrecondition::Type::NONE), empty(true) {} AgencyPrecondition::AgencyPrecondition(std::string const& key, Type t, bool e) : key(AgencyCommManager::path(key)), type(t), empty(e) {} diff --git a/arangod/Agency/FailedFollower.cpp b/arangod/Agency/FailedFollower.cpp new file mode 100644 index 0000000000..59e88c6cb8 --- /dev/null +++ b/arangod/Agency/FailedFollower.cpp @@ -0,0 +1,276 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "FailedFollower.h" + +#include "Agency/Agent.h" +#include "Agency/Job.h" + +using namespace arangodb::consensus; + +FailedFollower::FailedFollower(Node const& snapshot, Agent* agent, + std::string const& jobId, + std::string const& creator, + std::string const& agencyPrefix, + std::string const& database, + std::string const& collection, + std::string const& shard, + std::string const& from, + std::string const& to) + : Job(snapshot, agent, jobId, creator, agencyPrefix), + _database(database), + _collection(collection), + _shard(shard), + _from(from), + _to(to) { + try { + JOB_STATUS js = status(); + + if (js == TODO) { + start(); + } else if (js == NOTFOUND) { + if (create()) { + start(); + } + } + } catch (std::exception const& e) { + LOG_TOPIC(DEBUG, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; + finish("Shards/" + _shard, false, e.what()); + } +} + +FailedFollower::~FailedFollower() {} + +bool FailedFollower::create() { + LOG_TOPIC(INFO, Logger::AGENCY) + << "Todo: failed Follower for " + _shard + " from " + _from + " to " + _to; + + std::string path = _agencyPrefix + toDoPrefix + _jobId; + + _jb = std::make_shared(); + _jb->openArray(); + _jb->openObject(); + + // Todo entry + _jb->add(path, VPackValue(VPackValueType::Object)); + _jb->add("creator", VPackValue(_creator)); + _jb->add("type", VPackValue("failedFollower")); + _jb->add("database", VPackValue(_database)); + _jb->add("collection", VPackValue(_collection)); + _jb->add("shard", VPackValue(_shard)); + _jb->add("fromServer", VPackValue(_from)); + _jb->add("toServer", VPackValue(_to)); + _jb->add("isLeader", VPackValue(false)); + _jb->add("jobId", VPackValue(_jobId)); + _jb->add("timeCreated", + VPackValue(timepointToString(std::chrono::system_clock::now()))); + _jb->close(); + + // Add shard to /arango/Target/FailedServers/ array + path = _agencyPrefix + failedServersPrefix + "/" + _from; + _jb->add(path, VPackValue(VPackValueType::Object)); + _jb->add("op", VPackValue("push")); + _jb->add("new", VPackValue(_shard)); + _jb->close(); + + _jb->close(); + _jb->close(); + + write_ret_t res = transact(_agent, *_jb); + + if (res.accepted && res.indices.size() == 1 && res.indices[0]) { + return true; + } + + LOG_TOPIC(INFO, Logger::AGENCY) << "Failed to insert job " + _jobId; + return false; +} + +bool FailedFollower::start() { + // DBservers + std::string planPath = + planColPrefix + _database + "/" + _collection + "/shards/" + _shard; + std::string curPath = + curColPrefix + _database + "/" + _collection + "/" + _shard + "/servers"; + + Node const& current = _snapshot(curPath); + + if (current.slice().length() == 1) { + LOG_TOPIC(ERR, Logger::AGENCY) << "Failed to shift follower role for shard " + + _shard + " from " + _from + " to " + + _to + ". No in-sync followers:" + + current.slice().toJson(); + return false; + } + + // Copy todo to pending + Builder todo, pending; + + // Get todo entry + todo.openArray(); + if (_jb == nullptr) { + try { + _snapshot(toDoPrefix + _jobId).toBuilder(todo); + } catch (std::exception const&) { + LOG_TOPIC(INFO, Logger::AGENCY) + << "Failed to get key " + toDoPrefix + _jobId + " from agency snapshot"; + return false; + } + } else { + todo.add(_jb->slice().get(_agencyPrefix + toDoPrefix + _jobId).valueAt(0)); + } + todo.close(); + + // Transaction + pending.openArray(); + + // Apply + // --- Add pending entry + pending.openObject(); + pending.add(_agencyPrefix + pendingPrefix + _jobId, + VPackValue(VPackValueType::Object)); + pending.add("timeStarted", + VPackValue(timepointToString(std::chrono::system_clock::now()))); + for (auto const& obj : VPackObjectIterator(todo.slice()[0])) { + pending.add(obj.key.copyString(), obj.value); + } + pending.close(); + + // --- Remove todo entry + pending.add(_agencyPrefix + toDoPrefix + _jobId, + VPackValue(VPackValueType::Object)); + pending.add("op", VPackValue("delete")); + pending.close(); + + // --- Add new server to the list + pending.add(_agencyPrefix + planPath, VPackValue(VPackValueType::Array)); + for(const auto& i : VPackArrayIterator(current.slice())) { + if (i.copyString() != _from) { + pending.add(i); + } else { + pending.add(VPackValue(_to)); + } + } + + pending.close(); + + // --- Block shard + pending.add(_agencyPrefix + blockedShardsPrefix + _shard, + VPackValue(VPackValueType::Object)); + pending.add("jobId", VPackValue(_jobId)); + pending.close(); + + // --- Increment Plan/Version + pending.add(_agencyPrefix + planVersion, VPackValue(VPackValueType::Object)); + pending.add("op", VPackValue("increment")); + pending.close(); + + pending.close(); + + // Precondition + // --- Check that Current servers are as we expect + pending.openObject(); + pending.add(_agencyPrefix + curPath, VPackValue(VPackValueType::Object)); + pending.add("old", current.slice()); + pending.close(); + + // --- Check if shard is not blocked + pending.add(_agencyPrefix + blockedShardsPrefix + _shard, + VPackValue(VPackValueType::Object)); + pending.add("oldEmpty", VPackValue(true)); + pending.close(); + + pending.close(); + pending.close(); + + // Transact + write_ret_t res = transact(_agent, pending); + + if (res.accepted && res.indices.size() == 1 && res.indices[0]) { + LOG_TOPIC(INFO, Logger::AGENCY) << "Pending: Change followership " + _shard + + " from " + _from + " to " + _to; + return true; + } + + LOG_TOPIC(INFO, Logger::AGENCY) + << "Precondition failed for starting job " + _jobId; + return false; +} + +JOB_STATUS FailedFollower::status() { + auto status = exists(); + + if (status != NOTFOUND) { // Get job details from agency + + try { + _database = _snapshot(pos[status] + _jobId + "/database").getString(); + _collection = _snapshot(pos[status] + _jobId + "/collection").getString(); + _from = _snapshot(pos[status] + _jobId + "/fromServer").getString(); + _to = _snapshot(pos[status] + _jobId + "/toServer").getString(); + _shard = _snapshot(pos[status] + _jobId + "/shard").getString(); + } catch (std::exception const& e) { + std::stringstream err; + err << "Failed to find job " << _jobId << " in agency: " << e.what(); + LOG_TOPIC(ERR, Logger::AGENCY) << err.str(); + finish("Shards/" + _shard, false, err.str()); + return FAILED; + } + } + + if (status == PENDING) { + Node const& job = _snapshot(pendingPrefix + _jobId); + std::string database = job("database").toJson(), + collection = job("collection").toJson(), + shard = job("shard").toJson(); + + std::string planPath = planColPrefix + database + "/" + collection + + "/shards/" + shard, + curPath = curColPrefix + database + "/" + collection + "/" + + shard + "/servers"; + + Node const& planned = _snapshot(planPath); + Node const& current = _snapshot(curPath); + + if (planned.slice() == current.slice()) { + + // Remove shard from /arango/Target/FailedServers/ array + Builder del; + del.openArray(); + del.openObject(); + std::string path = _agencyPrefix + failedServersPrefix + "/" + _from; + del.add(path, VPackValue(VPackValueType::Object)); + del.add("op", VPackValue("erase")); + del.add("val", VPackValue(_shard)); + del.close(); + del.close(); + del.close(); + write_ret_t res = transact(_agent, del); + + if (finish("Shards/" + shard)) { + return FINISHED; + } + } + } + + return status; +} diff --git a/arangod/Agency/FailedFollower.h b/arangod/Agency/FailedFollower.h new file mode 100644 index 0000000000..c356aabeb0 --- /dev/null +++ b/arangod/Agency/FailedFollower.h @@ -0,0 +1,57 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_CONSENSUS_FAILED_FOLLOWER_H +#define ARANGOD_CONSENSUS_FAILED_FOLLOWER_H 1 + +#include "Job.h" +#include "Supervision.h" + +namespace arangodb { +namespace consensus { + +struct FailedFollower : public Job { + FailedFollower(Node const& snapshot, Agent* agent, std::string const& jobId, + std::string const& creator, std::string const& agencyPrefix, + std::string const& database = std::string(), + std::string const& collection = std::string(), + std::string const& shard = std::string(), + std::string const& from = std::string(), + std::string const& to = std::string()); + + virtual ~FailedFollower(); + + virtual bool create() override; + virtual bool start() override; + virtual JOB_STATUS status() override; + + std::string _database; + std::string _collection; + std::string _shard; + std::string _from; + std::string _to; +}; +} +} // namespaces + +#endif diff --git a/arangod/Agency/FailedServer.cpp b/arangod/Agency/FailedServer.cpp index 394c0b88f2..3cbf815463 100644 --- a/arangod/Agency/FailedServer.cpp +++ b/arangod/Agency/FailedServer.cpp @@ -25,6 +25,7 @@ #include "Agency/Agent.h" #include "Agency/FailedLeader.h" +#include "Agency/FailedFollower.h" #include "Agency/Job.h" #include "Agency/UnassumedLeadership.h" @@ -138,15 +139,18 @@ bool FailedServer::start() { for (auto const& shard : collection("shards").children()) { VPackArrayIterator dbsit(shard.second->slice()); - // Only proceed if leader and create job - if ((*dbsit.begin()).copyString() != _server) { - continue; - } - - FailedLeader( + // Failed leader + if ((*dbsit.begin()).copyString() == _server) { + FailedLeader( _snapshot, _agent, _jobId + "-" + std::to_string(sub++), _jobId, _agencyPrefix, database.first, collptr.first, shard.first, _server, shard.second->slice()[1].copyString()); + } else { + FailedFollower( + _snapshot, _agent, _jobId + "-" + std::to_string(sub++), + _jobId, _agencyPrefix, database.first, collptr.first, + shard.first, _server, shard.second->slice()[1].copyString()); + } } } diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 5bd28cec96..ff063619ac 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -31,6 +31,7 @@ #include "GeneralServer/RestHandlerFactory.h" #include +#include #include #include @@ -239,8 +240,6 @@ bool Inception::restartingActiveAgent() { auto s = std::chrono::system_clock::now(); std::chrono::seconds timeout(60); - long waitInterval(500000); - // Can only be done responcibly, if we are complete if (myConfig.poolComplete()) { @@ -249,6 +248,8 @@ bool Inception::restartingActiveAgent() { CONDITION_LOCKER(guard, _cv); + long waitInterval(500000); + while (!this->isStopping() && !_agent->isStopping()) { active.erase( @@ -361,13 +362,17 @@ bool Inception::estimateRAFTInterval() { using namespace std::chrono; LOG_TOPIC(INFO, Logger::AGENCY) << "Estimating RAFT timeouts ..."; + size_t nrep = 25; + double precision = 1.0e-2; std::string path("/_api/agency/config"); - auto pool = _agent->config().pool(); - auto myid = _agent->id(); + auto config = _agent->config(); - for (size_t i = 0; i < 25; ++i) { - for (auto const& peer : pool) { + auto myid = _agent->id(); + double to = 0.25; + + for (size_t i = 0; i < nrep; ++i) { + for (auto const& peer : config.pool()) { if (peer.first != myid) { std::string clientid = peer.first + std::to_string(i); auto hf = @@ -379,11 +384,12 @@ bool Inception::estimateRAFTInterval() { 2.0, true); } } - std::this_thread::sleep_for(std::chrono::duration(1)); + std::this_thread::sleep_for(std::chrono::duration(to)); + to *= 1.01; } auto s = system_clock::now(); - seconds timeout(3); + seconds timeout(15); CONDITION_LOCKER(guard, _cv); @@ -393,7 +399,7 @@ bool Inception::estimateRAFTInterval() { { MUTEX_LOCKER(lock, _pLock); - if (_pings.size() == 25*(pool.size()-1)) { + if (_pings.size() == nrep*(config.size()-1)) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "All pings are in"; break; } @@ -433,14 +439,14 @@ bool Inception::estimateRAFTInterval() { measurement.add("max", VPackValue(mx)); measurement.close(); std::string measjson = measurement.toJson(); - + path = privApiPrefix + "measure"; - for (auto const& peer : pool) { + for (auto const& peer : config.pool()) { if (peer.first != myid) { auto clientId = "1"; auto comres = arangodb::ClusterComm::instance()->syncRequest( clientId, 1, peer.second, rest::RequestType::POST, path, - measjson, std::unordered_map(), 2.0); + measjson, std::unordered_map(), 5.0); } } @@ -455,7 +461,7 @@ bool Inception::estimateRAFTInterval() { { MUTEX_LOCKER(lock, _mLock); - if (_measurements.size() == pool.size()) { + if (_measurements.size() == config.size()) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "All measurements are in"; break; } @@ -470,25 +476,36 @@ bool Inception::estimateRAFTInterval() { } - double maxmean = .0; - double maxstdev = .0; - for (auto const& meas : _measurements) { - if (maxmean < meas[0]) { - maxmean = meas[0]; - } - if (maxstdev < meas[1]) { - maxstdev = meas[1]; + if (_measurements.size() == config.size()) { + + double maxmean = .0; + double maxstdev = .0; + for (auto const& meas : _measurements) { + if (maxmean < meas[0]) { + maxmean = meas[0]; + } + if (maxstdev < meas[1]) { + maxstdev = meas[1]; + } } + + + mn = precision * + std::ceil((1./precision)*(.25 + precision*(maxmean+3*maxstdev))); + mx = 5. * mn; + + LOG_TOPIC(INFO, Logger::AGENCY) + << "Auto-adapting RAFT bracket to: {" + << std::fixed << std::setprecision(2) << mn << ", " << mx << "} seconds"; + + _agent->resetRAFTTimes(mn, mx); + + } else { + + return false; + } - mn = 1.e-3*std::ceil(1.e3*(.25 + 1.0e-3*(maxmean+3*maxstdev))); - mx = 5. * mn; - - LOG_TOPIC(INFO, Logger::AGENCY) - << "Auto-adapting RAFT bracket to: {" << mn << ", " << mx << "} seconds"; - - _agent->resetRAFTTimes(mn, mx); - } return true; diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 97376a32f3..9be3082c29 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -88,6 +88,7 @@ SET(ARANGOD_SOURCES Agency/AgentConfiguration.cpp Agency/CleanOutServer.cpp Agency/Constituent.cpp + Agency/FailedFollower.cpp Agency/FailedLeader.cpp Agency/FailedServer.cpp Agency/GossipCallback.cpp diff --git a/arangod/Indexes/EdgeIndex.cpp b/arangod/Indexes/EdgeIndex.cpp index 93ef1bd5f5..fdf8df8e45 100644 --- a/arangod/Indexes/EdgeIndex.cpp +++ b/arangod/Indexes/EdgeIndex.cpp @@ -150,7 +150,7 @@ IndexLookupResult EdgeIndexIterator::next() { } else { _lastElement = _buffer.back(); // found something - return IndexLookupResult(_buffer.at(_posInBuffer++).revisionId()); + return IndexLookupResult(_buffer[_posInBuffer++].revisionId()); } // found no result. now go to next lookup value in _keys diff --git a/arangod/Indexes/HashIndex.cpp b/arangod/Indexes/HashIndex.cpp index c8c39efa72..17e7334176 100644 --- a/arangod/Indexes/HashIndex.cpp +++ b/arangod/Indexes/HashIndex.cpp @@ -310,7 +310,7 @@ IndexLookupResult HashIndexIterator::next() { if (!_buffer.empty()) { // found something - return IndexLookupResult(_buffer.at(_posInBuffer++)->revisionId()); + return IndexLookupResult(_buffer[_posInBuffer++]->revisionId()); } } } @@ -346,7 +346,7 @@ void HashIndexIterator::nextBabies(std::vector& result, size_ } for (size_t i = _posInBuffer; i < atMost + _posInBuffer; ++i) { - result.emplace_back(_buffer.at(i)->revisionId()); + result.emplace_back(_buffer[i]->revisionId()); } _posInBuffer += atMost; return; @@ -405,7 +405,7 @@ IndexLookupResult HashIndexIteratorVPack::next() { if (!_buffer.empty()) { // found something - return IndexLookupResult(_buffer.at(_posInBuffer++)->revisionId()); + return IndexLookupResult(_buffer[_posInBuffer++]->revisionId()); } } } diff --git a/arangod/VocBase/datafile.cpp b/arangod/VocBase/datafile.cpp index 026528a9e9..a646f055a9 100644 --- a/arangod/VocBase/datafile.cpp +++ b/arangod/VocBase/datafile.cpp @@ -174,12 +174,14 @@ static int CreateDatafile(std::string const& filename, TRI_voc_size_t maximalSiz return -1; } -#ifdef __linux__ - // try fallocate first - int res = fallocate(fd, FALLOC_FL_ZERO_RANGE, 0, maximalSize); -#else // no fallocate present, or at least pretend it's not there... int res = TRI_ERROR_NOT_IMPLEMENTED; + +#ifdef __linux__ +#ifdef FALLOC_FL_ZERO_RANGE + // try fallocate + res = fallocate(fd, FALLOC_FL_ZERO_RANGE, 0, maximalSize); +#endif #endif if (res != TRI_ERROR_NO_ERROR) {