mirror of https://gitee.com/bigwinds/arangodb
341 lines
12 KiB
C++
341 lines
12 KiB
C++
////////////////////////////////////////////////////////////////////////////////
|
|
/// DISCLAIMER
|
|
///
|
|
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
|
|
///
|
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|
/// you may not use this file except in compliance with the License.
|
|
/// You may obtain a copy of the License at
|
|
///
|
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
|
///
|
|
/// Unless required by applicable law or agreed to in writing, software
|
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
/// See the License for the specific language governing permissions and
|
|
/// limitations under the License.
|
|
///
|
|
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
|
///
|
|
/// @author Tobias Gödderz
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
#include "RebootTracker.h"
|
|
|
|
#include "Scheduler/SchedulerFeature.h"
|
|
#include "lib/Basics/Exceptions.h"
|
|
#include "lib/Basics/MutexLocker.h"
|
|
#include "lib/Basics/ScopeGuard.h"
|
|
#include "lib/Logger/LogMacros.h"
|
|
#include "lib/Logger/Logger.h"
|
|
|
|
#include <algorithm>
|
|
|
|
using namespace arangodb;
|
|
using namespace arangodb::cluster;
|
|
|
|
RebootTracker::RebootTracker(RebootTracker::SchedulerPointer scheduler)
|
|
: _scheduler(scheduler) {
|
|
// All the mocked application servers in the catch tests that use the
|
|
// ClusterFeature, which at some point instantiates this, do not start the
|
|
// SchedulerFeature. Thus this dies. However, we will be able to fix that at
|
|
// a central place later, as there is some refactoring going on there. Then
|
|
// this #ifdef can be removed.
|
|
#ifndef ARANGODB_USE_GOOGLE_TESTS
|
|
TRI_ASSERT(_scheduler != nullptr);
|
|
#endif
|
|
}
|
|
|
|
void RebootTracker::updateServerState(std::unordered_map<ServerID, RebootId> const& state) {
|
|
MUTEX_LOCKER(guard, _mutex);
|
|
|
|
// Call cb for each iterator.
|
|
auto for_each_iter = [](auto begin, auto end, auto cb) {
|
|
auto it = begin;
|
|
decltype(it) next;
|
|
while (it != end) {
|
|
// save next iterator now, in case cb invalidates it.
|
|
next = std::next(it);
|
|
cb(it);
|
|
it = next;
|
|
}
|
|
};
|
|
|
|
// For all known servers, look whether they are changed or were removed
|
|
for_each_iter(_rebootIds.begin(), _rebootIds.end(), [&](auto const curIt) {
|
|
auto const& serverId = curIt->first;
|
|
auto& oldRebootId = curIt->second;
|
|
auto const& newIt = state.find(serverId);
|
|
|
|
if (newIt == state.end()) {
|
|
// Try to schedule all callbacks for serverId.
|
|
// If that didn't throw, erase the entry.
|
|
scheduleAllCallbacksFor(serverId);
|
|
auto it = _callbacks.find(serverId);
|
|
if (it != _callbacks.end()) {
|
|
TRI_ASSERT(it->second.empty());
|
|
_callbacks.erase(it);
|
|
}
|
|
_rebootIds.erase(curIt);
|
|
} else {
|
|
TRI_ASSERT(serverId == newIt->first);
|
|
auto const& newRebootId = newIt->second;
|
|
TRI_ASSERT(oldRebootId <= newRebootId);
|
|
if (oldRebootId < newRebootId) {
|
|
LOG_TOPIC("88857", INFO, Logger::CLUSTER)
|
|
<< "Server " << serverId << " rebooted, aborting its old jobs now.";
|
|
// Try to schedule all callbacks for serverId older than newRebootId.
|
|
// If that didn't throw, erase the entry.
|
|
scheduleCallbacksFor(serverId, newRebootId);
|
|
oldRebootId = newRebootId;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Look whether there are servers that are still unknown
|
|
// (note: we could shortcut this and return if the sizes are equal, as at
|
|
// this point, all entries in _rebootIds are also in state)
|
|
for (auto const& newIt : state) {
|
|
auto const& serverId = newIt.first;
|
|
auto const& rebootId = newIt.second;
|
|
auto rv = _rebootIds.emplace(serverId, rebootId);
|
|
auto const inserted = rv.second;
|
|
// If we inserted a new server, we may NOT already have any callbacks for
|
|
// it!
|
|
TRI_ASSERT(!inserted || _callbacks.find(serverId) == _callbacks.end());
|
|
}
|
|
}
|
|
|
|
CallbackGuard RebootTracker::callMeOnChange(RebootTracker::PeerState const& peerState,
|
|
RebootTracker::Callback callback,
|
|
std::string callbackDescription) {
|
|
MUTEX_LOCKER(guard, _mutex);
|
|
|
|
auto const rebootIdIt = _rebootIds.find(peerState.serverId());
|
|
|
|
// We MUST NOT insert something in _callbacks[serverId] unless _rebootIds[serverId] exists!
|
|
if (rebootIdIt == _rebootIds.end()) {
|
|
std::string const error = [&]() {
|
|
std::stringstream strstream;
|
|
strstream << "When trying to register callback '" << callbackDescription << "': "
|
|
<< "The server " << peerState.serverId() << " is not known. "
|
|
<< "If this server joined the cluster in the last seconds, "
|
|
"this can happen.";
|
|
return strstream.str();
|
|
}();
|
|
LOG_TOPIC("76abc", INFO, Logger::CLUSTER) << error;
|
|
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_SERVER_UNKNOWN, error);
|
|
}
|
|
|
|
auto const currentRebootId = rebootIdIt->second;
|
|
|
|
if (peerState.rebootId() < currentRebootId) {
|
|
// If this ID is already older, schedule the callback immediately.
|
|
queueCallback(DescriptedCallback{std::move(callback), std::move(callbackDescription)});
|
|
return CallbackGuard{nullptr};
|
|
}
|
|
|
|
// For the given server, get the existing rebootId => [callbacks] map,
|
|
// or create a new one
|
|
auto& rebootIdMap = _callbacks[peerState.serverId()];
|
|
// For the given rebootId, get the existing callbacks map,
|
|
// or create a new one
|
|
auto& callbackMapPtr = rebootIdMap[peerState.rebootId()];
|
|
|
|
if (callbackMapPtr == nullptr) {
|
|
// We must never leave a nullptr in here!
|
|
// Try to create a new map, or remove the entry.
|
|
try {
|
|
callbackMapPtr =
|
|
std::make_shared<std::remove_reference<decltype(callbackMapPtr)>::type::element_type>();
|
|
} catch (...) {
|
|
rebootIdMap.erase(peerState.rebootId());
|
|
throw;
|
|
}
|
|
}
|
|
|
|
TRI_ASSERT(callbackMapPtr != nullptr);
|
|
|
|
auto& callbackMap = *callbackMapPtr;
|
|
|
|
auto const callbackId = getNextCallbackId();
|
|
|
|
// The guard constructor might, theoretically, throw, and so can constructing
|
|
// the std::function. So we need to construct it before emplacing the callback.
|
|
auto callbackGuard = CallbackGuard([this, peerState, callbackId]() {
|
|
unregisterCallback(peerState, callbackId);
|
|
});
|
|
|
|
auto emplaceRv =
|
|
callbackMap.emplace(callbackId, DescriptedCallback{std::move(callback),
|
|
std::move(callbackDescription)});
|
|
auto const iterator = emplaceRv.first;
|
|
bool const inserted = emplaceRv.second;
|
|
TRI_ASSERT(inserted);
|
|
TRI_ASSERT(callbackId == iterator->first);
|
|
|
|
return callbackGuard;
|
|
}
|
|
|
|
void RebootTracker::scheduleAllCallbacksFor(ServerID const& serverId) {
|
|
scheduleCallbacksFor(serverId, RebootId::max());
|
|
// Now the rebootId map of this server, if it exists, must be empty.
|
|
TRI_ASSERT(_callbacks.find(serverId) == _callbacks.end() ||
|
|
_callbacks.find(serverId)->second.empty());
|
|
}
|
|
|
|
// This function may throw.
|
|
// If (and only if) it returns, it has scheduled all affected callbacks, and
|
|
// removed them from the registry.
|
|
// Otherwise the state is unchanged.
|
|
void RebootTracker::scheduleCallbacksFor(ServerID const& serverId, RebootId rebootId) {
|
|
_mutex.assertLockedByCurrentThread();
|
|
|
|
auto serverIt = _callbacks.find(serverId);
|
|
if (serverIt != _callbacks.end()) {
|
|
auto& rebootMap = serverIt->second;
|
|
auto const begin = rebootMap.begin();
|
|
// lower_bounds returns the first iterator that is *not less than* rebootId
|
|
auto const end = rebootMap.lower_bound(rebootId);
|
|
|
|
std::vector<decltype(begin->second)> callbackSets;
|
|
callbackSets.reserve(std::distance(begin, end));
|
|
|
|
std::for_each(begin, end, [&callbackSets](auto it) {
|
|
callbackSets.emplace_back(it.second);
|
|
});
|
|
|
|
// could throw
|
|
queueCallbacks(std::move(callbackSets));
|
|
|
|
// If and only if we successfully scheduled all callbacks, we erase them
|
|
// from the registry.
|
|
rebootMap.erase(begin, end);
|
|
}
|
|
}
|
|
|
|
RebootTracker::Callback RebootTracker::createSchedulerCallback(
|
|
std::vector<std::shared_ptr<std::unordered_map<CallbackId, DescriptedCallback>>> callbacks) {
|
|
TRI_ASSERT(!callbacks.empty());
|
|
TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(),
|
|
[](auto it) { return it == nullptr; }));
|
|
TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(),
|
|
[](auto it) { return it->empty(); }));
|
|
|
|
return [callbacks = std::move(callbacks)]() {
|
|
LOG_TOPIC("80dfe", DEBUG, Logger::CLUSTER)
|
|
<< "Executing scheduled reboot callbacks";
|
|
TRI_ASSERT(!callbacks.empty());
|
|
for (auto const& callbacksPtr : callbacks) {
|
|
TRI_ASSERT(callbacksPtr != nullptr);
|
|
TRI_ASSERT(!callbacksPtr->empty());
|
|
for (auto const& it : *callbacksPtr) {
|
|
auto const& cb = it.second.callback;
|
|
auto const& descr = it.second.description;
|
|
LOG_TOPIC("afdfd", DEBUG, Logger::CLUSTER)
|
|
<< "Executing callback " << it.second.description;
|
|
try {
|
|
cb();
|
|
} catch (arangodb::basics::Exception const& ex) {
|
|
LOG_TOPIC("88a63", INFO, Logger::CLUSTER)
|
|
<< "Failed to execute reboot callback: " << descr << ": "
|
|
<< "[" << ex.code() << "] " << ex.what();
|
|
} catch (std::exception const& ex) {
|
|
LOG_TOPIC("3d935", INFO, Logger::CLUSTER)
|
|
<< "Failed to execute reboot callback: " << descr << ": " << ex.what();
|
|
} catch (...) {
|
|
LOG_TOPIC("f7427", INFO, Logger::CLUSTER)
|
|
<< "Failed to execute reboot callback: " << descr << ": "
|
|
<< "Unknown error.";
|
|
}
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
void RebootTracker::queueCallbacks(
|
|
std::vector<std::shared_ptr<std::unordered_map<CallbackId, DescriptedCallback>>> callbacks) {
|
|
if (callbacks.empty()) {
|
|
return;
|
|
}
|
|
|
|
TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(),
|
|
[](auto it) { return it == nullptr; }));
|
|
TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(),
|
|
[](auto it) { return it->empty(); }));
|
|
|
|
auto cb = createSchedulerCallback(std::move(callbacks));
|
|
|
|
if (!_scheduler->queue(RequestLane::CLUSTER_INTERNAL, std::move(cb))) {
|
|
THROW_ARANGO_EXCEPTION_MESSAGE(
|
|
TRI_ERROR_QUEUE_FULL,
|
|
"No available threads when trying to queue cleanup "
|
|
"callbacks due to a server reboot");
|
|
}
|
|
}
|
|
|
|
void RebootTracker::unregisterCallback(PeerState const& peerState,
|
|
RebootTracker::CallbackId callbackId) {
|
|
MUTEX_LOCKER(guard, _mutex);
|
|
|
|
auto const cbIt = _callbacks.find(peerState.serverId());
|
|
if (cbIt != _callbacks.end()) {
|
|
auto& rebootMap = cbIt->second;
|
|
auto const rbIt = rebootMap.find(peerState.rebootId());
|
|
if (rbIt != rebootMap.end()) {
|
|
auto& callbackSetPtr = rbIt->second;
|
|
TRI_ASSERT(callbackSetPtr != nullptr);
|
|
callbackSetPtr->erase(callbackId);
|
|
if (callbackSetPtr->empty()) {
|
|
rebootMap.erase(rbIt);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
RebootTracker::CallbackId RebootTracker::getNextCallbackId() noexcept {
|
|
_mutex.assertLockedByCurrentThread();
|
|
|
|
CallbackId nextId = _nextCallbackId;
|
|
++_nextCallbackId;
|
|
return nextId;
|
|
}
|
|
|
|
void RebootTracker::queueCallback(DescriptedCallback callback) {
|
|
queueCallbacks({std::make_shared<std::unordered_map<CallbackId, DescriptedCallback>>(
|
|
std::unordered_map<CallbackId, DescriptedCallback>{
|
|
std::make_pair(getNextCallbackId(), std::move(callback))})});
|
|
}
|
|
|
|
CallbackGuard::CallbackGuard() : _callback(nullptr) {}
|
|
|
|
CallbackGuard::CallbackGuard(std::function<void(void)> callback)
|
|
: _callback(std::move(callback)) {}
|
|
|
|
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
|
|
CallbackGuard::CallbackGuard(CallbackGuard&& other)
|
|
: _callback(std::move(other._callback)) {
|
|
other._callback = nullptr;
|
|
}
|
|
|
|
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
|
|
CallbackGuard& CallbackGuard::operator=(CallbackGuard&& other) {
|
|
call();
|
|
_callback = std::move(other._callback);
|
|
other._callback = nullptr;
|
|
return *this;
|
|
}
|
|
|
|
CallbackGuard::~CallbackGuard() { call(); }
|
|
|
|
void CallbackGuard::callAndClear() {
|
|
call();
|
|
_callback = nullptr;
|
|
}
|
|
|
|
void CallbackGuard::call() {
|
|
if (_callback) {
|
|
_callback();
|
|
}
|
|
}
|