//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2019 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Tobias Gödderz //////////////////////////////////////////////////////////////////////////////// #include "RebootTracker.h" #include "Scheduler/SchedulerFeature.h" #include "lib/Basics/Exceptions.h" #include "lib/Basics/MutexLocker.h" #include "lib/Basics/ScopeGuard.h" #include "lib/Logger/LogMacros.h" #include "lib/Logger/Logger.h" #include using namespace arangodb; using namespace arangodb::cluster; RebootTracker::RebootTracker(RebootTracker::SchedulerPointer scheduler) : _scheduler(scheduler) { // All the mocked application servers in the catch tests that use the // ClusterFeature, which at some point instantiates this, do not start the // SchedulerFeature. Thus this dies. However, we will be able to fix that at // a central place later, as there is some refactoring going on there. Then // this #ifdef can be removed. #ifndef ARANGODB_USE_GOOGLE_TESTS TRI_ASSERT(_scheduler != nullptr); #endif } void RebootTracker::updateServerState(std::unordered_map const& state) { MUTEX_LOCKER(guard, _mutex); // Call cb for each iterator. auto for_each_iter = [](auto begin, auto end, auto cb) { auto it = begin; decltype(it) next; while (it != end) { // save next iterator now, in case cb invalidates it. next = std::next(it); cb(it); it = next; } }; // For all known servers, look whether they are changed or were removed for_each_iter(_rebootIds.begin(), _rebootIds.end(), [&](auto const curIt) { auto const& serverId = curIt->first; auto& oldRebootId = curIt->second; auto const& newIt = state.find(serverId); if (newIt == state.end()) { // Try to schedule all callbacks for serverId. // If that didn't throw, erase the entry. scheduleAllCallbacksFor(serverId); auto it = _callbacks.find(serverId); if (it != _callbacks.end()) { TRI_ASSERT(it->second.empty()); _callbacks.erase(it); } _rebootIds.erase(curIt); } else { TRI_ASSERT(serverId == newIt->first); auto const& newRebootId = newIt->second; TRI_ASSERT(oldRebootId <= newRebootId); if (oldRebootId < newRebootId) { LOG_TOPIC("88857", INFO, Logger::CLUSTER) << "Server " << serverId << " rebooted, aborting its old jobs now."; // Try to schedule all callbacks for serverId older than newRebootId. // If that didn't throw, erase the entry. scheduleCallbacksFor(serverId, newRebootId); oldRebootId = newRebootId; } } }); // Look whether there are servers that are still unknown // (note: we could shortcut this and return if the sizes are equal, as at // this point, all entries in _rebootIds are also in state) for (auto const& newIt : state) { auto const& serverId = newIt.first; auto const& rebootId = newIt.second; auto rv = _rebootIds.emplace(serverId, rebootId); auto const inserted = rv.second; // If we inserted a new server, we may NOT already have any callbacks for // it! TRI_ASSERT(!inserted || _callbacks.find(serverId) == _callbacks.end()); } } CallbackGuard RebootTracker::callMeOnChange(RebootTracker::PeerState const& peerState, RebootTracker::Callback callback, std::string callbackDescription) { MUTEX_LOCKER(guard, _mutex); auto const rebootIdIt = _rebootIds.find(peerState.serverId()); // We MUST NOT insert something in _callbacks[serverId] unless _rebootIds[serverId] exists! if (rebootIdIt == _rebootIds.end()) { std::string const error = [&]() { std::stringstream strstream; strstream << "When trying to register callback '" << callbackDescription << "': " << "The server " << peerState.serverId() << " is not known. " << "If this server joined the cluster in the last seconds, " "this can happen."; return strstream.str(); }(); LOG_TOPIC("76abc", INFO, Logger::CLUSTER) << error; THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_SERVER_UNKNOWN, error); } auto const currentRebootId = rebootIdIt->second; if (peerState.rebootId() < currentRebootId) { // If this ID is already older, schedule the callback immediately. queueCallback(DescriptedCallback{std::move(callback), std::move(callbackDescription)}); return CallbackGuard{nullptr}; } // For the given server, get the existing rebootId => [callbacks] map, // or create a new one auto& rebootIdMap = _callbacks[peerState.serverId()]; // For the given rebootId, get the existing callbacks map, // or create a new one auto& callbackMapPtr = rebootIdMap[peerState.rebootId()]; if (callbackMapPtr == nullptr) { // We must never leave a nullptr in here! // Try to create a new map, or remove the entry. try { callbackMapPtr = std::make_shared::type::element_type>(); } catch (...) { rebootIdMap.erase(peerState.rebootId()); throw; } } TRI_ASSERT(callbackMapPtr != nullptr); auto& callbackMap = *callbackMapPtr; auto const callbackId = getNextCallbackId(); // The guard constructor might, theoretically, throw, and so can constructing // the std::function. So we need to construct it before emplacing the callback. auto callbackGuard = CallbackGuard([this, peerState, callbackId]() { unregisterCallback(peerState, callbackId); }); auto emplaceRv = callbackMap.emplace(callbackId, DescriptedCallback{std::move(callback), std::move(callbackDescription)}); auto const iterator = emplaceRv.first; bool const inserted = emplaceRv.second; TRI_ASSERT(inserted); TRI_ASSERT(callbackId == iterator->first); return callbackGuard; } void RebootTracker::scheduleAllCallbacksFor(ServerID const& serverId) { scheduleCallbacksFor(serverId, RebootId::max()); // Now the rebootId map of this server, if it exists, must be empty. TRI_ASSERT(_callbacks.find(serverId) == _callbacks.end() || _callbacks.find(serverId)->second.empty()); } // This function may throw. // If (and only if) it returns, it has scheduled all affected callbacks, and // removed them from the registry. // Otherwise the state is unchanged. void RebootTracker::scheduleCallbacksFor(ServerID const& serverId, RebootId rebootId) { _mutex.assertLockedByCurrentThread(); auto serverIt = _callbacks.find(serverId); if (serverIt != _callbacks.end()) { auto& rebootMap = serverIt->second; auto const begin = rebootMap.begin(); // lower_bounds returns the first iterator that is *not less than* rebootId auto const end = rebootMap.lower_bound(rebootId); std::vectorsecond)> callbackSets; callbackSets.reserve(std::distance(begin, end)); std::for_each(begin, end, [&callbackSets](auto it) { callbackSets.emplace_back(it.second); }); // could throw queueCallbacks(std::move(callbackSets)); // If and only if we successfully scheduled all callbacks, we erase them // from the registry. rebootMap.erase(begin, end); } } RebootTracker::Callback RebootTracker::createSchedulerCallback( std::vector>> callbacks) { TRI_ASSERT(!callbacks.empty()); TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(), [](auto it) { return it == nullptr; })); TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(), [](auto it) { return it->empty(); })); return [callbacks = std::move(callbacks)]() { LOG_TOPIC("80dfe", DEBUG, Logger::CLUSTER) << "Executing scheduled reboot callbacks"; TRI_ASSERT(!callbacks.empty()); for (auto const& callbacksPtr : callbacks) { TRI_ASSERT(callbacksPtr != nullptr); TRI_ASSERT(!callbacksPtr->empty()); for (auto const& it : *callbacksPtr) { auto const& cb = it.second.callback; auto const& descr = it.second.description; LOG_TOPIC("afdfd", DEBUG, Logger::CLUSTER) << "Executing callback " << it.second.description; try { cb(); } catch (arangodb::basics::Exception const& ex) { LOG_TOPIC("88a63", INFO, Logger::CLUSTER) << "Failed to execute reboot callback: " << descr << ": " << "[" << ex.code() << "] " << ex.what(); } catch (std::exception const& ex) { LOG_TOPIC("3d935", INFO, Logger::CLUSTER) << "Failed to execute reboot callback: " << descr << ": " << ex.what(); } catch (...) { LOG_TOPIC("f7427", INFO, Logger::CLUSTER) << "Failed to execute reboot callback: " << descr << ": " << "Unknown error."; } } } }; } void RebootTracker::queueCallbacks( std::vector>> callbacks) { if (callbacks.empty()) { return; } TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(), [](auto it) { return it == nullptr; })); TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(), [](auto it) { return it->empty(); })); auto cb = createSchedulerCallback(std::move(callbacks)); if (!_scheduler->queue(RequestLane::CLUSTER_INTERNAL, std::move(cb))) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_QUEUE_FULL, "No available threads when trying to queue cleanup " "callbacks due to a server reboot"); } } void RebootTracker::unregisterCallback(PeerState const& peerState, RebootTracker::CallbackId callbackId) { MUTEX_LOCKER(guard, _mutex); auto const cbIt = _callbacks.find(peerState.serverId()); if (cbIt != _callbacks.end()) { auto& rebootMap = cbIt->second; auto const rbIt = rebootMap.find(peerState.rebootId()); if (rbIt != rebootMap.end()) { auto& callbackSetPtr = rbIt->second; TRI_ASSERT(callbackSetPtr != nullptr); callbackSetPtr->erase(callbackId); if (callbackSetPtr->empty()) { rebootMap.erase(rbIt); } } } } RebootTracker::CallbackId RebootTracker::getNextCallbackId() noexcept { _mutex.assertLockedByCurrentThread(); CallbackId nextId = _nextCallbackId; ++_nextCallbackId; return nextId; } void RebootTracker::queueCallback(DescriptedCallback callback) { queueCallbacks({std::make_shared>( std::unordered_map{ std::make_pair(getNextCallbackId(), std::move(callback))})}); } CallbackGuard::CallbackGuard() : _callback(nullptr) {} CallbackGuard::CallbackGuard(std::function callback) : _callback(std::move(callback)) {} // NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor) CallbackGuard::CallbackGuard(CallbackGuard&& other) : _callback(std::move(other._callback)) { other._callback = nullptr; } // NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor) CallbackGuard& CallbackGuard::operator=(CallbackGuard&& other) { call(); _callback = std::move(other._callback); other._callback = nullptr; return *this; } CallbackGuard::~CallbackGuard() { call(); } void CallbackGuard::callAndClear() { call(); _callback = nullptr; } void CallbackGuard::call() { if (_callback) { _callback(); } }