1
0
Fork 0

[3.4] Feature/rebootid notice changes, backport of #9523 (#9685)

* Feature/rebootid notice changes, backport of #9523

* Move to 3.4 and C++11-compatible code (except for test code)

* Backported tests/Mocks/Servers.{h,cpp}

* Rebuilt errorfiles

* Ported CallbackGuardTest from gtest to catch

* Ported RebootTrackerTest from gtest to catch

* Make sure the state method is called, so the overridden method is used during tests

* Fixed test to work with the old scheduler

* release version 3.4.8

* [3.4] fix agency lockup when removing 404-ed callbacks (#9839)

* Update arangod/Cluster/ServerState.cpp

Co-Authored-By: Markus Pfeiffer <markuspf@users.noreply.github.com>

* Instantiate the scheduler during ::prepare()

* Fix test crash introduced during backport

* Fix a compile error on Windows (thanks, V8)
This commit is contained in:
Tobias Gödderz 2019-09-19 14:03:39 +02:00 committed by KVS85
parent 31e4f9d1dc
commit dc2e27db6c
32 changed files with 2084 additions and 61 deletions

View File

@ -762,6 +762,7 @@ endif ()
option(USE_CATCH_TESTS "Compile catch C++ tests" ON)
if (USE_CATCH_TESTS)
add_definitions("-DTEST_VIRTUAL=virtual")
add_definitions("-DARANGODB_USE_CATCH_TESTS=1")
else()
add_definitions("-DTEST_VIRTUAL=")
endif()

View File

@ -22,7 +22,7 @@
////////////////////////////////////////////////////////////////////////////////
#include "AqlTransaction.h"
#include "Logger/Logger.h"
#include "StorageEngine/TransactionCollection.h"
#include "StorageEngine/TransactionState.h"
#include "Utils/CollectionNameResolver.h"
@ -32,29 +32,32 @@
#include "Enterprise/Transaction/IgnoreNoAccessAqlTransaction.h"
#endif
#include <memory>
using namespace arangodb;
using namespace arangodb::aql;
AqlTransaction* AqlTransaction::create(
std::shared_ptr<AqlTransaction> AqlTransaction::create(
std::shared_ptr<transaction::Context> const& transactionContext,
std::map<std::string, aql::Collection*> const* collections,
transaction::Options const& options, bool isMainTransaction,
std::unordered_set<std::string> inaccessibleCollections) {
#ifdef USE_ENTERPRISE
if (options.skipInaccessibleCollections) {
return new transaction::IgnoreNoAccessAqlTransaction(transactionContext, collections,
options, isMainTransaction,
inaccessibleCollections);
return std::make_shared<transaction::IgnoreNoAccessAqlTransaction>(
transactionContext, collections, options, isMainTransaction, std::move(inaccessibleCollections));
}
#endif
return new AqlTransaction(transactionContext, collections, options, isMainTransaction);
return std::shared_ptr<AqlTransaction>(
new AqlTransaction(transactionContext, collections, options, isMainTransaction));
}
/// @brief clone, used to make daughter transactions for parts of a
/// distributed AQL query running on the coordinator
transaction::Methods* AqlTransaction::clone(transaction::Options const& options) const {
std::shared_ptr<transaction::Methods> AqlTransaction::clone(transaction::Options const& options) const {
auto ctx = transaction::StandaloneContext::Create(vocbase());
return new AqlTransaction(ctx, &_collections, options, false);
return std::shared_ptr<AqlTransaction>(new AqlTransaction(ctx, &_collections, options, false));
}
/// @brief add a collection to the transaction

View File

@ -39,7 +39,7 @@ class AqlTransaction : public transaction::Methods {
public:
/// @brief create the transaction and add all collections
/// from the query context
static AqlTransaction* create(std::shared_ptr<transaction::Context> const& transactionContext,
static std::shared_ptr<AqlTransaction> create(std::shared_ptr<transaction::Context> const& transactionContext,
std::map<std::string, aql::Collection*> const* collections,
transaction::Options const& options, bool isMainTransaction,
std::unordered_set<std::string> inaccessibleCollections =
@ -69,7 +69,7 @@ class AqlTransaction : public transaction::Methods {
/// @brief clone, used to make daughter transactions for parts of a
/// distributed AQL query running on the coordinator
transaction::Methods* clone(transaction::Options const&) const override;
std::shared_ptr<transaction::Methods> clone(transaction::Options const&) const override;
/// @brief lockCollections, this is needed in a corner case in AQL: we need
/// to lock all shards in a controlled way when we set up a distributed

View File

@ -455,13 +455,11 @@ ExecutionPlan* Query::preparePlan() {
}
#endif
std::unique_ptr<AqlTransaction> trx(
AqlTransaction::create(std::move(ctx), _collections.collections(),
_queryOptions.transactionOptions,
_part == PART_MAIN, inaccessibleCollections));
TRI_DEFER(trx.release());
auto trx = AqlTransaction::create(ctx, _collections.collections(),
_queryOptions.transactionOptions,
_part == PART_MAIN, inaccessibleCollections);
// create the transaction object, but do not start it yet
_trx = trx.get();
_trx = trx;
if (!trx->transactionContextPtr()->getParentTransaction()) {
trx->addHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL);
@ -651,7 +649,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
AqlValue const& val = res.second->getValueReference(i, resultRegister);
if (!val.isEmpty()) {
val.toVelocyPack(_trx, resultBuilder, useQueryCache);
val.toVelocyPack(_trx.get(), resultBuilder, useQueryCache);
}
}
@ -848,10 +846,10 @@ ExecutionState Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry,
AqlValue const& val = value->getValueReference(i, resultRegister);
if (!val.isEmpty()) {
resArray->Set(j++, val.toV8(isolate, _trx));
resArray->Set(j++, val.toV8(isolate, _trx.get()));
if (useQueryCache) {
val.toVelocyPack(_trx, *builder, true);
val.toVelocyPack(_trx.get(), *builder, true);
}
}
@ -1383,7 +1381,6 @@ ExecutionState Query::cleanupPlanAndEngine(int errorCode, VPackBuilder* statsBui
}
// If the transaction was not committed, it is automatically aborted
delete _trx;
_trx = nullptr;
_plan.reset();

View File

@ -118,8 +118,8 @@ class Query {
QueryString const& queryString() const { return _queryString; }
/// @brief Inject a transaction from outside. Use with care!
void injectTransaction(transaction::Methods* trx) {
_trx = trx;
void injectTransaction(std::shared_ptr<transaction::Methods> trx) {
_trx = std::move(trx);
init();
}
@ -243,7 +243,7 @@ class Query {
TEST_VIRTUAL void setEngine(ExecutionEngine* engine);
/// @brief return the transaction, if prepared
TEST_VIRTUAL inline transaction::Methods* trx() { return _trx; }
TEST_VIRTUAL inline transaction::Methods* trx() { return _trx.get(); }
/// @brief get the plan for the query
ExecutionPlan* plan() const { return _plan.get(); }
@ -404,7 +404,7 @@ class Query {
/// @brief the transaction object, in a distributed query every part of
/// the query has its own transaction object. The transaction object is
/// created in the prepare method.
transaction::Methods* _trx;
std::shared_ptr<transaction::Methods> _trx;
/// @brief the ExecutionEngine object, if the query is prepared
std::unique_ptr<ExecutionEngine> _engine;

View File

@ -297,6 +297,7 @@ SET(ARANGOD_SOURCES
Cluster/ClusterRepairDistributeShardsLike.cpp
Cluster/ClusterRepairOperations.cpp
Cluster/ClusterTraverser.cpp
Cluster/ClusterTypes.cpp
Cluster/CreateCollection.cpp
Cluster/CreateDatabase.cpp
Cluster/CriticalThread.cpp
@ -313,6 +314,7 @@ SET(ARANGOD_SOURCES
Cluster/MaintenanceRestHandler.cpp
Cluster/MaintenanceWorker.cpp
Cluster/NonAction.cpp
Cluster/RebootTracker.cpp
Cluster/ReplicationTimeoutFeature.cpp
Cluster/ResignShardLeadership.cpp
Cluster/RestAgencyCallbacksHandler.cpp

View File

@ -33,6 +33,7 @@
#include "Basics/hashes.h"
#include "Cluster/ClusterCollectionCreationInfo.h"
#include "Cluster/ClusterHelpers.h"
#include "Cluster/RebootTracker.h"
#include "Cluster/ServerState.h"
#include "Logger/Logger.h"
#include "Random/RandomGenerator.h"
@ -162,6 +163,7 @@ static inline arangodb::AgencyOperation CreateCollectionSuccess(
#endif
using namespace arangodb;
using namespace arangodb::cluster;
static std::unique_ptr<ClusterInfo> _instance;
@ -240,6 +242,7 @@ ClusterInfo* ClusterInfo::instance() { return _instance.get(); }
ClusterInfo::ClusterInfo(AgencyCallbackRegistry* agencyCallbackRegistry)
: _agency(),
_agencyCallbackRegistry(agencyCallbackRegistry),
_rebootTracker(SchedulerFeature::SCHEDULER),
_planVersion(0),
_currentVersion(0),
_planLoader(std::thread::id()),
@ -965,6 +968,13 @@ void ClusterInfo::loadPlan() {
static std::string const prefixCurrent = "Current";
void ClusterInfo::loadCurrent() {
// We need to update ServersKnown to notice rebootId changes for all servers.
// To keep things simple and separate, we call loadServers here instead of
// trying to integrate the servers upgrade code into loadCurrent, even if that
// means small bits of the plan are read twice.
loadServers();
++_currentProt.wantedVersion; // Indicate that after *NOW* somebody has to
// reread from the agency!
MUTEX_LOCKER(mutexLocker, _currentProt.mutex); // only one may work at a time
@ -1237,6 +1247,15 @@ std::shared_ptr<CollectionInfoCurrent> ClusterInfo::getCollectionCurrent(
return std::make_shared<CollectionInfoCurrent>(0);
}
RebootTracker& ClusterInfo::rebootTracker() noexcept {
return _rebootTracker;
}
RebootTracker const& ClusterInfo::rebootTracker() const noexcept {
return _rebootTracker;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief ask about a view
/// If it is not found in the cache, the cache is reloaded once. The second
@ -3111,7 +3130,8 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
static std::string const prefixServers = "Current/ServersRegistered";
static std::string const prefixServersRegistered = "Current/ServersRegistered";
static std::string const prefixServersKnown = "Current/ServersKnown";
static std::string const mapUniqueToShortId = "Target/MapUniqueToShortID";
void ClusterInfo::loadServers() {
@ -3126,8 +3146,9 @@ void ClusterInfo::loadServers() {
}
AgencyCommResult result = _agency.sendTransactionWithFailover(AgencyReadTransaction(
std::vector<std::string>({AgencyCommManager::path(prefixServers),
AgencyCommManager::path(mapUniqueToShortId)})));
std::vector<std::string>({AgencyCommManager::path(prefixServersRegistered),
AgencyCommManager::path(mapUniqueToShortId),
AgencyCommManager::path(prefixServersKnown)})));
if (result.successful()) {
velocypack::Slice serversRegistered = result.slice()[0].get(std::vector<std::string>(
@ -3136,11 +3157,16 @@ void ClusterInfo::loadServers() {
velocypack::Slice serversAliases = result.slice()[0].get(std::vector<std::string>(
{AgencyCommManager::path(), "Target", "MapUniqueToShortID"}));
velocypack::Slice serversKnownSlice = result.slice()[0].get(std::vector<std::string>(
{AgencyCommManager::path(), "Current", "ServersKnown"}));
if (serversRegistered.isObject()) {
decltype(_servers) newServers;
decltype(_serverAliases) newAliases;
decltype(_serverAdvertisedEndpoints) newAdvertisedEndpoints;
std::unordered_set<ServerID> serverIds;
for (auto const& res : VPackObjectIterator(serversRegistered)) {
velocypack::Slice slice = res.value;
@ -3164,24 +3190,31 @@ void ClusterInfo::loadServers() {
}
newServers.emplace(std::make_pair(serverId, server));
newAdvertisedEndpoints.emplace(std::make_pair(serverId, advertised));
serverIds.emplace(serverId);
}
}
decltype(_serversKnown) newServersKnown(serversKnownSlice, serverIds);
// Now set the new value:
{
WRITE_LOCKER(writeLocker, _serversProt.lock);
_servers.swap(newServers);
_serverAliases.swap(newAliases);
_serverAdvertisedEndpoints.swap(newAdvertisedEndpoints);
_serversKnown = std::move(newServersKnown);
_serversProt.doneVersion = storedVersion;
_serversProt.isValid = true;
}
// RebootTracker has its own mutex, and doesn't strictly need to be in sync
// with the other members.
rebootTracker().updateServerState(_serversKnown.rebootIds());
return;
}
}
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Error while loading " << prefixServers
<< "Error while loading " << prefixServersRegistered
<< " httpCode: " << result.httpCode() << " errorCode: " << result.errorCode()
<< " errorMessage: " << result.errorMessage() << " body: " << result.body();
}
@ -3808,6 +3841,53 @@ arangodb::Result ClusterInfo::agencyDump(std::shared_ptr<VPackBuilder> body) {
return Result();
}
ClusterInfo::ServersKnown::ServersKnown(VPackSlice const serversKnownSlice,
std::unordered_set<ServerID> const& serverIds)
: _serversKnown() {
TRI_ASSERT(serversKnownSlice.isNone() || serversKnownSlice.isObject());
if (serversKnownSlice.isObject()) {
for (auto const it : VPackObjectIterator(serversKnownSlice)) {
VPackSlice const knownServerSlice = it.value;
TRI_ASSERT(knownServerSlice.isObject());
if (knownServerSlice.isObject()) {
VPackSlice const rebootIdSlice = knownServerSlice.get("rebootId");
TRI_ASSERT(rebootIdSlice.isInteger());
if (rebootIdSlice.isInteger()) {
std::string serverId = it.key.copyString();
auto const rebootId = RebootId{rebootIdSlice.getNumericValue<uint64_t>()};
_serversKnown.emplace(std::move(serverId), rebootId);
}
}
}
}
// For backwards compatibility / rolling upgrades, add servers that aren't in
// ServersKnown but in ServersRegistered with a reboot ID of 0 as a fallback.
// We should be able to remove this in 3.6.
for (auto const& serverId : serverIds) {
auto const rv = _serversKnown.emplace(serverId, RebootId{0});
LOG_TOPIC_IF(INFO, Logger::CLUSTER, rv.second)
<< "Server "
<< serverId << " is in Current/ServersRegistered, but not in "
"Current/ServersKnown. This is expected to happen "
"during a rolling upgrade.";
}
}
std::unordered_map<ServerID, ClusterInfo::ServersKnown::KnownServer> const&
ClusterInfo::ServersKnown::serversKnown() const noexcept {
return _serversKnown;
}
std::unordered_map<ServerID, RebootId> ClusterInfo::ServersKnown::rebootIds() const noexcept {
std::unordered_map<ServerID, RebootId> rebootIds;
for (auto const& it : _serversKnown) {
rebootIds.emplace(it.first, it.second.rebootId());
}
return rebootIds;
}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------

View File

@ -39,6 +39,8 @@
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "Cluster/AgencyCallbackRegistry.h"
#include "Cluster/ClusterTypes.h"
#include "Cluster/RebootTracker.h"
#include "VocBase/voc-types.h"
#include "VocBase/vocbase.h"
@ -48,15 +50,6 @@ class Slice;
}
class ClusterInfo;
class LogicalCollection;
typedef std::string ServerID; // ID of a server
typedef std::string DatabaseID; // ID/name of a database
typedef std::string CollectionID; // ID of a collection
typedef std::string ViewID; // ID of a view
typedef std::string ShardID; // ID of a shard
typedef uint32_t ServerShortID; // Short ID of a server
typedef std::string ServerShortName; // Short name of a server
struct ClusterCollectionCreationInfo;
class CollectionInfoCurrent {
@ -242,6 +235,30 @@ class ClusterInfo {
ClusterInfo(ClusterInfo const&) = delete; // not implemented
ClusterInfo& operator=(ClusterInfo const&) = delete; // not implemented
public:
class ServersKnown {
public:
ServersKnown() = default;
ServersKnown(VPackSlice serversKnownSlice, std::unordered_set<ServerID> const& servers);
class KnownServer {
public:
explicit constexpr KnownServer(RebootId rebootId) : _rebootId(rebootId) {}
RebootId rebootId() const noexcept { return _rebootId; }
private:
RebootId _rebootId;
};
std::unordered_map<ServerID, KnownServer> const& serversKnown() const noexcept;
std::unordered_map<ServerID, RebootId> rebootIds() const noexcept;
private:
std::unordered_map<ServerID, KnownServer> _serversKnown;
};
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief creates library
@ -368,6 +385,13 @@ class ClusterInfo {
virtual std::shared_ptr<CollectionInfoCurrent> getCollectionCurrent(DatabaseID const&,
CollectionID const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief Get the RebootTracker
//////////////////////////////////////////////////////////////////////////////
cluster::RebootTracker& rebootTracker() noexcept;
cluster::RebootTracker const& rebootTracker() const noexcept;
//////////////////////////////////////////////////////////////////////////////
/// @brief create database in coordinator
//////////////////////////////////////////////////////////////////////////////
@ -689,12 +713,17 @@ class ClusterInfo {
ProtectionData() : isValid(false), wantedVersion(0), doneVersion(0) {}
};
cluster::RebootTracker _rebootTracker;
// The servers, first all, we only need Current here:
std::unordered_map<ServerID, std::string> _servers; // from Current/ServersRegistered
std::unordered_map<ServerID, std::string> _serverAliases; // from Current/ServersRegistered
std::unordered_map<ServerID, std::string> _serverAdvertisedEndpoints; // from Current/ServersRegistered
ProtectionData _serversProt;
// Current/ServersKnown:
ServersKnown _serversKnown;
// The DBServers, also from Current:
std::unordered_map<ServerID, ServerID> _DBServers; // from Current/DBServers
ProtectionData _DBServersProt;

View File

@ -0,0 +1,23 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#include "ClusterTypes.h"

View File

@ -0,0 +1,75 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CLUSTER_CLUSTERTYPES_H
#define ARANGOD_CLUSTER_CLUSTERTYPES_H
#include <limits>
#include <string>
namespace arangodb {
typedef std::string ServerID; // ID of a server
typedef std::string DatabaseID; // ID/name of a database
typedef std::string CollectionID; // ID of a collection
typedef std::string ViewID; // ID of a view
typedef std::string ShardID; // ID of a shard
typedef uint32_t ServerShortID; // Short ID of a server
typedef std::string ServerShortName; // Short name of a server
class RebootId {
public:
explicit constexpr RebootId(uint64_t rebootId) noexcept : _value(rebootId) {}
uint64_t value() const noexcept { return _value; }
bool initialized() const noexcept { return value() != 0; }
bool operator==(RebootId other) const noexcept {
return value() == other.value();
}
bool operator!=(RebootId other) const noexcept {
return value() != other.value();
}
bool operator<(RebootId other) const noexcept {
return value() < other.value();
}
bool operator>(RebootId other) const noexcept {
return value() > other.value();
}
bool operator<=(RebootId other) const noexcept {
return value() <= other.value();
}
bool operator>=(RebootId other) const noexcept {
return value() >= other.value();
}
static constexpr RebootId max() noexcept {
return RebootId{std::numeric_limits<decltype(_value)>::max()};
}
private:
uint64_t _value;
};
} // namespace arangodb
#endif // ARANGOD_CLUSTER_CLUSTERTYPES_H

View File

@ -0,0 +1,340 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#include "RebootTracker.h"
#include "Scheduler/SchedulerFeature.h"
#include "lib/Basics/Exceptions.h"
#include "lib/Basics/MutexLocker.h"
#include "lib/Basics/ScopeGuard.h"
#include "lib/Logger/LogMacros.h"
#include "lib/Logger/Logger.h"
#include <algorithm>
using namespace arangodb;
using namespace arangodb::cluster;
RebootTracker::RebootTracker(RebootTracker::SchedulerPointer scheduler)
: _scheduler(scheduler) {
// All the mocked application servers in the catch tests that use the
// ClusterFeature, which at some point instantiates this, do not start the
// SchedulerFeature. Thus this dies. However, we will be able to fix that at
// a central place later, as there is some refactoring going on there. Then
// this #ifdef can be removed.
#ifndef ARANGODB_USE_CATCH_TESTS
TRI_ASSERT(_scheduler != nullptr);
#endif
}
void RebootTracker::updateServerState(std::unordered_map<ServerID, RebootId> const& state) {
MUTEX_LOCKER(guard, _mutex);
for (auto nextIt = _rebootIds.begin(); nextIt != _rebootIds.end(); ) {
// Save the next iterator now, because curIt may get invalidated due to an erase.
auto const curIt = nextIt;
nextIt = std::next(nextIt);
{
auto const& serverId = curIt->first;
auto& oldRebootId = curIt->second;
auto const& newIt = state.find(serverId);
if (newIt == state.end()) {
// Try to schedule all callbacks for serverId.
// If that didn't throw, erase the entry.
scheduleAllCallbacksFor(serverId);
auto it = _callbacks.find(serverId);
if (it != _callbacks.end()) {
TRI_ASSERT(it->second.empty());
_callbacks.erase(it);
}
_rebootIds.erase(curIt);
} else {
TRI_ASSERT(serverId == newIt->first);
auto const& newRebootId = newIt->second;
TRI_ASSERT(oldRebootId <= newRebootId);
if (oldRebootId < newRebootId) {
LOG_TOPIC(INFO, Logger::CLUSTER)
<< "Server " << serverId << " rebooted, aborting its old jobs now.";
// Try to schedule all callbacks for serverId older than newRebootId.
// If that didn't throw, erase the entry.
scheduleCallbacksFor(serverId, newRebootId);
oldRebootId = newRebootId;
}
}
}
};
// Look whether there are servers that are still unknown
// (note: we could shortcut this and return if the sizes are equal, as at
// this point, all entries in _rebootIds are also in state)
for (auto const& newIt : state) {
auto const& serverId = newIt.first;
auto const& rebootId = newIt.second;
auto rv = _rebootIds.emplace(serverId, rebootId);
auto const inserted = rv.second;
// If we inserted a new server, we may NOT already have any callbacks for
// it!
TRI_ASSERT(!inserted || _callbacks.find(serverId) == _callbacks.end());
}
}
CallbackGuard RebootTracker::callMeOnChange(RebootTracker::PeerState const& peerState,
RebootTracker::Callback callback,
std::string callbackDescription) {
MUTEX_LOCKER(guard, _mutex);
auto const rebootIdIt = _rebootIds.find(peerState.serverId());
// We MUST NOT insert something in _callbacks[serverId] unless _rebootIds[serverId] exists!
if (rebootIdIt == _rebootIds.end()) {
std::string const error = [&]() {
std::stringstream strstream;
strstream << "When trying to register callback '" << callbackDescription << "': "
<< "The server " << peerState.serverId() << " is not known. "
<< "If this server joined the cluster in the last seconds, "
"this can happen.";
return strstream.str();
}();
LOG_TOPIC(INFO, Logger::CLUSTER) << error;
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_SERVER_UNKNOWN, error);
}
auto const currentRebootId = rebootIdIt->second;
if (peerState.rebootId() < currentRebootId) {
// If this ID is already older, schedule the callback immediately.
queueCallback(DescriptedCallback{std::move(callback), std::move(callbackDescription)});
return CallbackGuard{nullptr};
}
// For the given server, get the existing rebootId => [callbacks] map,
// or create a new one
auto& rebootIdMap = _callbacks[peerState.serverId()];
// For the given rebootId, get the existing callbacks map,
// or create a new one
auto& callbackMapPtr = rebootIdMap[peerState.rebootId()];
if (callbackMapPtr == nullptr) {
// We must never leave a nullptr in here!
// Try to create a new map, or remove the entry.
try {
callbackMapPtr =
std::make_shared<std::remove_reference<decltype(callbackMapPtr)>::type::element_type>();
} catch (...) {
rebootIdMap.erase(peerState.rebootId());
throw;
}
}
TRI_ASSERT(callbackMapPtr != nullptr);
auto& callbackMap = *callbackMapPtr;
auto const callbackId = getNextCallbackId();
// The guard constructor might, theoretically, throw, and so can constructing
// the std::function. So we need to construct it before emplacing the callback.
auto callbackGuard = CallbackGuard([this, peerState, callbackId]() {
unregisterCallback(peerState, callbackId);
});
auto emplaceRv =
callbackMap.emplace(callbackId, DescriptedCallback{std::move(callback),
std::move(callbackDescription)});
auto const iterator = emplaceRv.first;
bool const inserted = emplaceRv.second;
TRI_ASSERT(inserted);
TRI_ASSERT(callbackId == iterator->first);
return callbackGuard;
}
void RebootTracker::scheduleAllCallbacksFor(ServerID const& serverId) {
scheduleCallbacksFor(serverId, RebootId::max());
// Now the rebootId map of this server, if it exists, must be empty.
TRI_ASSERT(_callbacks.find(serverId) == _callbacks.end() ||
_callbacks.find(serverId)->second.empty());
}
// This function may throw.
// If (and only if) it returns, it has scheduled all affected callbacks, and
// removed them from the registry.
// Otherwise the state is unchanged.
void RebootTracker::scheduleCallbacksFor(ServerID const& serverId, RebootId rebootId) {
_mutex.assertLockedByCurrentThread();
auto serverIt = _callbacks.find(serverId);
if (serverIt != _callbacks.end()) {
auto& rebootMap = serverIt->second;
auto const begin = rebootMap.begin();
// lower_bounds returns the first iterator that is *not less than* rebootId
auto const end = rebootMap.lower_bound(rebootId);
std::vector<decltype(begin->second)> callbackSets;
callbackSets.reserve(std::distance(begin, end));
using RebootMapElt = std::remove_reference<decltype(rebootMap)>::type::value_type;
std::for_each(begin, end, [&callbackSets](RebootMapElt const& it) {
callbackSets.emplace_back(it.second);
});
// could throw
queueCallbacks(std::move(callbackSets));
// If and only if we successfully scheduled all callbacks, we erase them
// from the registry.
rebootMap.erase(begin, end);
}
}
std::function<void(bool)> RebootTracker::createSchedulerCallback(
std::vector<std::shared_ptr<std::unordered_map<CallbackId, DescriptedCallback>>> callbacks) {
TRI_ASSERT(!callbacks.empty());
using CallbackMapPtr = decltype(callbacks)::value_type;
TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(),
[](CallbackMapPtr const& it) { return it == nullptr; }));
TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(),
[](CallbackMapPtr const& it) { return it->empty(); }));
// C++11 doesn't yet allow generalized captures (e.g. [callbacks = std::move(callbacks)]),
// so we use a shared ptr to avoid copying the vector.
std::shared_ptr<decltype(callbacks)> allCallbacksPtr =
std::make_shared<decltype(callbacks)>(std::move(callbacks));
return [allCallbacksPtr](bool) {
auto const& callbacks = *allCallbacksPtr;
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Executing scheduled reboot callbacks";
TRI_ASSERT(!callbacks.empty());
for (auto const& callbacksPtr : callbacks) {
TRI_ASSERT(callbacksPtr != nullptr);
TRI_ASSERT(!callbacksPtr->empty());
for (auto const& it : *callbacksPtr) {
auto const& cb = it.second.callback;
auto const& descr = it.second.description;
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Executing callback " << it.second.description;
try {
cb();
} catch (arangodb::basics::Exception const& ex) {
LOG_TOPIC(INFO, Logger::CLUSTER)
<< "Failed to execute reboot callback: " << descr << ": "
<< "[" << ex.code() << "] " << ex.what();
} catch (std::exception const& ex) {
LOG_TOPIC(INFO, Logger::CLUSTER)
<< "Failed to execute reboot callback: " << descr << ": " << ex.what();
} catch (...) {
LOG_TOPIC(INFO, Logger::CLUSTER)
<< "Failed to execute reboot callback: " << descr << ": "
<< "Unknown error.";
}
}
}
};
}
void RebootTracker::queueCallbacks(
std::vector<std::shared_ptr<std::unordered_map<CallbackId, DescriptedCallback>>> callbacks) {
if (callbacks.empty()) {
return;
}
using CallbackMapPtr = decltype(callbacks)::value_type;
TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(),
[](CallbackMapPtr const& it) { return it == nullptr; }));
TRI_ASSERT(std::none_of(callbacks.cbegin(), callbacks.cend(),
[](CallbackMapPtr const& it) { return it->empty(); }));
auto cb = createSchedulerCallback(std::move(callbacks));
if (!_scheduler->queue(RequestPriority::HIGH, std::move(cb))) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUEUE_FULL,
"No available threads when trying to queue cleanup "
"callbacks due to a server reboot");
}
}
void RebootTracker::unregisterCallback(PeerState const& peerState,
RebootTracker::CallbackId callbackId) {
MUTEX_LOCKER(guard, _mutex);
auto const cbIt = _callbacks.find(peerState.serverId());
if (cbIt != _callbacks.end()) {
auto& rebootMap = cbIt->second;
auto const rbIt = rebootMap.find(peerState.rebootId());
if (rbIt != rebootMap.end()) {
auto& callbackSetPtr = rbIt->second;
TRI_ASSERT(callbackSetPtr != nullptr);
callbackSetPtr->erase(callbackId);
if (callbackSetPtr->empty()) {
rebootMap.erase(rbIt);
}
}
}
}
RebootTracker::CallbackId RebootTracker::getNextCallbackId() noexcept {
_mutex.assertLockedByCurrentThread();
CallbackId nextId = _nextCallbackId;
++_nextCallbackId;
return nextId;
}
void RebootTracker::queueCallback(DescriptedCallback callback) {
queueCallbacks({std::make_shared<std::unordered_map<CallbackId, DescriptedCallback>>(
std::unordered_map<CallbackId, DescriptedCallback>{
std::make_pair(getNextCallbackId(), std::move(callback))})});
}
CallbackGuard::CallbackGuard() : _callback(nullptr) {}
CallbackGuard::CallbackGuard(std::function<void(void)> callback)
: _callback(std::move(callback)) {}
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard::CallbackGuard(CallbackGuard&& other)
: _callback(std::move(other._callback)) {
other._callback = nullptr;
}
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard& CallbackGuard::operator=(CallbackGuard&& other) {
call();
_callback = std::move(other._callback);
other._callback = nullptr;
return *this;
}
CallbackGuard::~CallbackGuard() { call(); }
void CallbackGuard::callAndClear() {
call();
_callback = nullptr;
}
void CallbackGuard::call() {
if (_callback) {
_callback();
}
}

View File

@ -0,0 +1,163 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CLUSTER_REBOOTTRACKER_H
#define ARANGOD_CLUSTER_REBOOTTRACKER_H
#include "Cluster/ClusterTypes.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "lib/Basics/Mutex.h"
#include <map>
#include <type_traits>
#include <unordered_map>
#include <vector>
namespace arangodb {
namespace cluster {
/// @brief If constructed with a callback, the given callback will be called
/// exactly once: Either during destruction, or when the object is overwritten
/// (via operator=()), or when it's explicitly cleared. It's not copyable,
/// but movable.
class CallbackGuard {
public:
// Calls the callback given callback upon destruction.
// Allows only move semantics and no copy semantics.
CallbackGuard();
// IMPORTANT NOTE:
// The passed callback should not throw exceptions, they will not be caught
// here, but thrown by the destructor!
explicit CallbackGuard(std::function<void(void)> callback);
~CallbackGuard();
// Note that the move constructor of std::function is not noexcept until
// C++20. Thus we cannot mark the constructors here noexcept.
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard(CallbackGuard&& other);
// operator= additionally calls the _callback, and this can also throw.
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard& operator=(CallbackGuard&&);
CallbackGuard(CallbackGuard const&) = delete;
CallbackGuard& operator=(CallbackGuard const&) = delete;
/// @brief Call the contained callback, then delete it.
void callAndClear();
private:
void call();
std::function<void(void)> _callback;
};
// Note:
// Instances of this class must be destructed during shutdown before the
// scheduler is destroyed.
class RebootTracker {
public:
using Callback = std::function<void()>;
using SchedulerPointer = decltype(SchedulerFeature::SCHEDULER);
static_assert(std::is_pointer<SchedulerPointer>::value,
"If SCHEDULER is changed to a non-pointer type, this class "
"might have to be adapted");
static_assert(
std::is_base_of<rest::Scheduler, std::remove_pointer<SchedulerPointer>::type>::value,
"SchedulerPointer is expected to point to an instance of Scheduler");
class PeerState {
public:
PeerState(ServerID serverId, RebootId rebootId)
: _serverId{std::move(serverId)}, _rebootId{rebootId} {}
ServerID const& serverId() const noexcept { return _serverId; }
RebootId rebootId() const noexcept { return _rebootId; }
private:
ServerID _serverId;
RebootId _rebootId;
};
explicit RebootTracker(SchedulerPointer scheduler);
CallbackGuard callMeOnChange(PeerState const& peerState, Callback callback,
std::string callbackDescription);
void updateServerState(std::unordered_map<ServerID, RebootId> const& state);
private:
using CallbackId = uint64_t;
struct DescriptedCallback {
Callback callback;
std::string description;
};
CallbackId getNextCallbackId() noexcept;
void unregisterCallback(PeerState const&, CallbackId);
// bool notifyChange(PeerState const& peerState);
void scheduleAllCallbacksFor(ServerID const& serverId);
void scheduleCallbacksFor(ServerID const& serverId, RebootId rebootId);
static std::function<void(bool)> createSchedulerCallback(
std::vector<std::shared_ptr<std::unordered_map<CallbackId, DescriptedCallback>>> callbacks);
void queueCallbacks(std::vector<std::shared_ptr<std::unordered_map<CallbackId, DescriptedCallback>>> callbacks);
void queueCallback(DescriptedCallback callback);
private:
Mutex _mutex;
CallbackId _nextCallbackId{1};
/// @brief Last known rebootId of every server.
/// Will regularly get updates from the agency.
/// Updates may not be applied if scheduling the affected callbacks fails, so
/// the scheduling will be tried again on the next update.
std::unordered_map<ServerID, RebootId> _rebootIds;
/// @brief List of registered callbacks per server.
/// Maps (serverId, rebootId) to a set of callbacks (indexed by a CallbackId).
/// Needs to fulfill the following:
/// - A callback with a given ID may never be moved to another (serverId, rebootId) entry,
/// to allow CallbackGuard to find a callback.
/// - All ServerIDs in this map must always exist in _rebootIds (though not
/// necessarily the other way round).
/// - The shared_ptr in the RebootId-indexed map must never be nullptr
/// - The unordered_map pointed to by the aforementioned shared_ptr must never be empty
/// - The RebootIds used as index in the inner map are expected to not be smaller than the corresponding ones in _rebootIds
std::unordered_map<ServerID, std::map<RebootId, std::shared_ptr<std::unordered_map<CallbackId, DescriptedCallback>>>> _callbacks;
/// @brief Save a pointer to the scheduler for easier testing
SchedulerPointer _scheduler;
};
} // namespace cluster
} // namespace arangodb
#endif // ARANGOD_CLUSTER_REBOOTTRACKER_H

View File

@ -36,6 +36,7 @@
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ResultT.h"
#include "Logger/Logger.h"
#include "Rest/Version.h"
#include "RestServer/DatabaseFeature.h"
@ -58,6 +59,7 @@ ServerState::ServerState()
_lock(),
_id(),
_shortId(0),
_rebootId(0),
_javaScriptStartupPath(),
_myEndpoint(),
_advertisedEndpoint(),
@ -319,6 +321,8 @@ bool ServerState::unregister() {
AgencySimpleOperationType::DELETE_OP));
operations.push_back(AgencyOperation("Current/" + agencyListKey + "/" + id,
AgencySimpleOperationType::DELETE_OP));
operations.push_back(AgencyOperation("Current/ServersKnown/" + id,
AgencySimpleOperationType::DELETE_OP));
operations.push_back(AgencyOperation("Plan/Version", AgencySimpleOperationType::INCREMENT_OP));
operations.push_back(AgencyOperation("Current/Version",
AgencySimpleOperationType::INCREMENT_OP));
@ -329,6 +333,32 @@ bool ServerState::unregister() {
return r.successful();
}
ResultT<uint64_t> ServerState::readRebootIdFromAgency(AgencyComm& comm)
{
TRI_ASSERT(!_id.empty());
std::string rebootIdPath = "Current/ServersKnown/" + _id + "/rebootId";
AgencyCommResult result = comm.getValues(rebootIdPath);
if (!result.successful()) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "Could not read back " << rebootIdPath;
return ResultT<uint64_t>::error(TRI_ERROR_INTERNAL, "could not read rebootId from agency");
}
auto slicePath = AgencyCommManager::slicePath(rebootIdPath);
auto valueSlice = result.slice()[0].get(slicePath);
if (!valueSlice.isInteger()) {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "rebootId is not an integer";
return ResultT<uint64_t>::error(TRI_ERROR_INTERNAL, "rebootId is not an integer");
}
return ResultT<uint64_t>::success(valueSlice.getNumericValue<uint64_t>());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief try to integrate into a cluster
////////////////////////////////////////////////////////////////////////////////
@ -359,7 +389,8 @@ bool ServerState::integrateIntoCluster(ServerState::RoleEnum role,
// generate and persist new id
// }
std::string id;
if (!hasPersistedId()) {
bool hadPersistedId = hasPersistedId();
if (!hadPersistedId) {
id = generatePersistedId(role);
LOG_TOPIC(INFO, Logger::CLUSTER) << "Fresh start. Persisting new UUID " << id;
@ -383,7 +414,7 @@ bool ServerState::integrateIntoCluster(ServerState::RoleEnum role,
<< roleToString(role) << " and our id is " << id;
// now overwrite the entry in /Current/ServersRegistered/<myId>
return registerAtAgencyPhase2(comm);
return registerAtAgencyPhase2(comm, hadPersistedId);
}
//////////////////////////////////////////////////////////////////////////////
@ -635,10 +666,13 @@ bool ServerState::registerAtAgencyPhase1(AgencyComm& comm, const ServerState::Ro
return false;
}
bool ServerState::registerAtAgencyPhase2(AgencyComm& comm) {
bool ServerState::registerAtAgencyPhase2(AgencyComm& comm, bool const hadPersistedId) {
TRI_ASSERT(!_id.empty() && !_myEndpoint.empty());
while (!application_features::ApplicationServer::isStopping()) {
std::string serverRegistrationPath = currentServersRegisteredPref + _id;
std::string rebootIdPath = "/Current/ServersKnown/" + _id + "/rebootId";
VPackBuilder builder;
try {
VPackObjectBuilder b(&builder);
@ -653,10 +687,24 @@ bool ServerState::registerAtAgencyPhase2(AgencyComm& comm) {
FATAL_ERROR_EXIT();
}
auto result = comm.setValue(currentServersRegisteredPref + _id, builder.slice(), 0.0);
// If we generated a new UUID, this *must not* exist in the Agency, so we
// should fail to register.
std::vector<AgencyPrecondition> pre;
if (!hadPersistedId) {
pre.emplace_back(AgencyPrecondition(rebootIdPath, AgencyPrecondition::Type::EMPTY, true));
}
AgencyWriteTransaction trx(
{AgencyOperation(serverRegistrationPath, AgencyValueOperationType::SET,
builder.slice()),
AgencyOperation(rebootIdPath, AgencySimpleOperationType::INCREMENT_OP),
AgencyOperation("Current/Version", AgencySimpleOperationType::INCREMENT_OP)},
pre);
auto result = comm.sendTransactionWithFailover(trx, 0.0);
if (result.successful()) {
return true;
break; // Continue below to read back the rebootId
} else {
LOG_TOPIC(WARN, arangodb::Logger::CLUSTER)
<< "failed to register server in agency: http code: " << result.httpCode()
@ -666,6 +714,18 @@ bool ServerState::registerAtAgencyPhase2(AgencyComm& comm) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// if we left the above retry loop because the server is stopping
// we'll skip this and return false right away.
while (!application_features::ApplicationServer::isStopping()) {
auto result = readRebootIdFromAgency(comm);
if (result) {
setRebootId(result.get());
return true;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return false;
}
@ -720,6 +780,16 @@ void ServerState::setShortId(uint32_t id) {
_shortId.store(id, std::memory_order_relaxed);
}
uint64_t ServerState::getRebootId() const {
TRI_ASSERT(_rebootId > 0);
return _rebootId;
}
void ServerState::setRebootId(uint64_t const rebootId) {
TRI_ASSERT(rebootId > 0);
_rebootId = rebootId;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the server address
////////////////////////////////////////////////////////////////////////////////

View File

@ -28,6 +28,7 @@
#include "Basics/Common.h"
#include "Basics/ReadWriteSpinLock.h"
#include "Cluster/ResultT.h"
#include "VocBase/voc-types.h"
namespace arangodb {
@ -208,6 +209,10 @@ class ServerState {
/// @brief set the server short id
void setShortId(uint32_t);
uint64_t getRebootId() const;
void setRebootId(uint64_t const rebootId);
/// @brief get the server endpoint
std::string getEndpoint();
@ -270,11 +275,14 @@ class ServerState {
/// @brief check equality of engines with other registered servers
bool checkEngineEquality(AgencyComm&);
/// @brief try to read the rebootID from the Agency
ResultT<uint64_t> readRebootIdFromAgency(AgencyComm& comm);
/// @brief register at agency, might already be done
bool registerAtAgencyPhase1(AgencyComm&, const RoleEnum&);
/// @brief write the Current/ServersRegistered entry
bool registerAtAgencyPhase2(AgencyComm&);
bool registerAtAgencyPhase2(AgencyComm&, bool const hadPersistedId);
void setFoxxmasterSinceNow();
@ -293,6 +301,21 @@ class ServerState {
/// @brief the server's short id, can be set just once
std::atomic<uint32_t> _shortId;
/// @brief the server's rebootId.
///
/// A server
/// * ~boots~ if it is started on a new database directory without a UUID persisted
/// * ~reboots~ if it is started on a pre-existing database directory with a UUID present
///
/// when integrating into a cluster (via integrateIntoCluster), the server tries to increment
/// the agency key Current/KnownServers/_id/rebootId; if this key did not exist it is
/// created with the value 1, so a valid rebootId is always >= 1, and if the server booted
/// must be 1.
///
/// Changes of rebootIds (i.e. server reboots) are noticed in ClusterInfo and
/// can be used through a notification architecture from there
uint64_t _rebootId;
/// @brief the JavaScript startup path, can be set just once
std::string _javaScriptStartupPath;

View File

@ -94,7 +94,7 @@ class BaseEngine {
protected:
arangodb::aql::Query* _query;
transaction::Methods* _trx;
std::shared_ptr<transaction::Methods> _trx;
arangodb::aql::Collections _collections;
std::unordered_map<std::string, std::vector<std::string>> _vertexShards;
};

View File

@ -2884,10 +2884,10 @@ Result RestReplicationHandler::createBlockingTransaction(aql::QueryId id,
{
auto ctx = transaction::StandaloneContext::Create(_vocbase);
auto trx = std::make_unique<SingleCollectionTransaction>(ctx, col, access);
auto trx = std::make_shared<SingleCollectionTransaction>(ctx, col, access);
query->setTransactionContext(ctx);
// Inject will take over responsiblilty of transaction, even on error case.
query->injectTransaction(trx.release());
query->injectTransaction(std::move(trx));
}
auto trx = query->trx();
TRI_ASSERT(trx != nullptr);

View File

@ -119,29 +119,29 @@ void SchedulerFeature::validateOptions(std::shared_ptr<options::ProgramOptions>)
}
}
void SchedulerFeature::start() {
void SchedulerFeature::prepare() {
auto const N = TRI_numberProcessors();
LOG_TOPIC(DEBUG, arangodb::Logger::THREADS)
<< "Detected number of processors: " << N;
<< "Detected number of processors: " << N;
if (_nrMaximalThreads > 8 * N) {
LOG_TOPIC(WARN, arangodb::Logger::THREADS)
<< "--server.threads (" << _nrMaximalThreads
<< ") is more than eight times the number of cores (" << N
<< "), this might overload the server";
<< "--server.threads (" << _nrMaximalThreads
<< ") is more than eight times the number of cores (" << N
<< "), this might overload the server";
}
if (_nrMinimalThreads < 2) {
LOG_TOPIC(WARN, arangodb::Logger::THREADS)
<< "--server.minimal-threads (" << _nrMinimalThreads << ") should be at least 2";
<< "--server.minimal-threads (" << _nrMinimalThreads << ") should be at least 2";
_nrMinimalThreads = 2;
}
if (_nrMinimalThreads >= _nrMaximalThreads) {
LOG_TOPIC(WARN, arangodb::Logger::THREADS)
<< "--server.threads (" << _nrMaximalThreads << ") should be at least "
<< (_nrMinimalThreads + 1) << ", raising it";
<< "--server.threads (" << _nrMaximalThreads << ") should be at least "
<< (_nrMinimalThreads + 1) << ", raising it";
_nrMaximalThreads = _nrMinimalThreads;
}
@ -150,6 +150,11 @@ void SchedulerFeature::start() {
ArangoGlobalContext::CONTEXT->maskAllSignals();
buildScheduler();
TRI_ASSERT(_scheduler != nullptr);
}
void SchedulerFeature::start() {
TRI_ASSERT(_scheduler != nullptr);
bool ok = _scheduler->start();

View File

@ -44,6 +44,7 @@ class SchedulerFeature final : public application_features::ApplicationFeature {
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void beginShutdown() override final;
void stop() override final;

View File

@ -3204,7 +3204,7 @@ int transaction::Methods::lockCollections() {
}
/// @brief Clone this transaction. Only works for selected sub-classes
transaction::Methods* transaction::Methods::clone(transaction::Options const&) const {
std::shared_ptr<transaction::Methods> transaction::Methods::clone(transaction::Options const&) const {
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}

View File

@ -420,7 +420,7 @@ class Methods {
virtual int lockCollections();
/// @brief Clone this transaction. Only works for selected sub-classes
virtual transaction::Methods* clone(transaction::Options const&) const;
virtual std::shared_ptr<transaction::Methods> clone(transaction::Options const&) const;
/// @brief return the collection name resolver
CollectionNameResolver const* resolver() const;

View File

@ -109,6 +109,7 @@ actions.defineHttp({
operations['/arango/Current/DBServers/' + serverId] = {'op': 'delete'};
operations['/arango/Supervision/Health/' + serverId] = {'op': 'delete'};
operations['/arango/Target/MapUniqueToShortID/' + serverId] = {'op': 'delete'};
operations['/arango/Current/ServersKnown/' + serverId] = {'op': 'delete'};
try {
global.ArangoAgency.write([[operations, preconditions]]);

View File

@ -134,6 +134,7 @@
"ERROR_REPLICATION_START_TICK_NOT_PRESENT" : { "code" : 1414, "message" : "start tick not present" },
"ERROR_REPLICATION_WRONG_CHECKSUM" : { "code" : 1416, "message" : "wrong checksum" },
"ERROR_REPLICATION_SHARD_NONEMPTY" : { "code" : 1417, "message" : "shard not empty" },
"ERROR_CLUSTER_SERVER_UNKNOWN" : { "code" : 1449, "message" : "got a request from an unkown server" },
"ERROR_CLUSTER_NO_AGENCY" : { "code" : 1450, "message" : "could not connect to agency" },
"ERROR_CLUSTER_NO_COORDINATOR_HEADER" : { "code" : 1451, "message" : "missing coordinator header" },
"ERROR_CLUSTER_COULD_NOT_LOCK_PLAN" : { "code" : 1452, "message" : "could not lock plan in agency" },

View File

@ -145,7 +145,7 @@ class ApplicationServer {
static bool isPrepared() {
if (server != nullptr) {
ServerState tmp = server->_state.load(std::memory_order_relaxed);
ServerState tmp = server->state();
return tmp == ServerState::IN_START || tmp == ServerState::IN_WAIT ||
tmp == ServerState::IN_STOP;
}
@ -186,7 +186,7 @@ class ApplicationServer {
public:
ApplicationServer(std::shared_ptr<options::ProgramOptions>, char const* binaryPath);
~ApplicationServer();
TEST_VIRTUAL ~ApplicationServer();
std::string helpSection() const { return _helpSection; }
bool helpShown() const { return !_helpSection.empty(); }
@ -234,7 +234,7 @@ class ApplicationServer {
std::shared_ptr<options::ProgramOptions> options() const { return _options; }
// return the server state
ServerState state() const { return _state; }
TEST_VIRTUAL ServerState state() const { return _state; }
void addReporter(ProgressHandler reporter) {
_progressReports.emplace_back(reporter);

View File

@ -172,6 +172,7 @@ ERROR_REPLICATION_SHARD_NONEMPTY,1417,"shard not empty","Will be raised when a s
## ArangoDB cluster errors
################################################################################
ERROR_CLUSTER_SERVER_UNKNOWN,1449,"got a request from an unkown server","Will be raised on some occasions when one server gets a request from another, which has not (yet?) been made known via the agency."
ERROR_CLUSTER_NO_AGENCY,1450,"could not connect to agency","Will be raised when none of the agency servers can be connected to."
ERROR_CLUSTER_NO_COORDINATOR_HEADER,1451,"missing coordinator header","Will be raised when a DB server in a cluster receives a HTTP request without a coordinator header."
ERROR_CLUSTER_COULD_NOT_LOCK_PLAN,1452,"could not lock plan in agency","Will be raised when a coordinator in a cluster cannot lock the Plan hierarchy in the agency."

View File

@ -133,6 +133,7 @@ void TRI_InitializeErrorMessages() {
REG_ERROR(ERROR_REPLICATION_START_TICK_NOT_PRESENT, "start tick not present");
REG_ERROR(ERROR_REPLICATION_WRONG_CHECKSUM, "wrong checksum");
REG_ERROR(ERROR_REPLICATION_SHARD_NONEMPTY, "shard not empty");
REG_ERROR(ERROR_CLUSTER_SERVER_UNKNOWN, "got a request from an unkown server");
REG_ERROR(ERROR_CLUSTER_NO_AGENCY, "could not connect to agency");
REG_ERROR(ERROR_CLUSTER_NO_COORDINATOR_HEADER, "missing coordinator header");
REG_ERROR(ERROR_CLUSTER_COULD_NOT_LOCK_PLAN, "could not lock plan in agency");

View File

@ -675,6 +675,12 @@ constexpr int TRI_ERROR_REPLICATION_WRONG_CHECKSUM
/// Will be raised when a shard is not empty and the follower tries a shortcut
constexpr int TRI_ERROR_REPLICATION_SHARD_NONEMPTY = 1417;
/// 1449: ERROR_CLUSTER_SERVER_UNKNOWN
/// "got a request from an unkown server"
/// Will be raised on some occasions when one server gets a request from
/// another, which has not (yet?) been made known via the agency.
constexpr int TRI_ERROR_CLUSTER_SERVER_UNKNOWN = 1449;
/// 1450: ERROR_CLUSTER_NO_AGENCY
/// "could not connect to agency"
/// Will be raised when none of the agency servers can be connected to.

View File

@ -1,3 +1,5 @@
include_directories(.)
foreach (LINK_DIR ${V8_LINK_DIRECTORIES})
link_directories("${LINK_DIR}")
endforeach()
@ -54,7 +56,6 @@ set(IRESEARCH_TESTS_SOURCES
IResearch/IResearchQueryAggregate-test.cpp
IResearch/IResearchQueryTraversal-test.cpp
IResearch/RestHandlerMock.cpp
IResearch/StorageEngineMock.cpp
IResearch/IResearchViewNode-test.cpp
IResearch/VelocyPackHelper-test.cpp
RestHandler/RestUsersHandler-test.cpp
@ -127,6 +128,7 @@ set(ARANGODB_TESTS_SOURCES
Cluster/ClusterCommTest.cpp
Cluster/ClusterHelpersTest.cpp
Cluster/ClusterRepairsTest.cpp
Cluster/RebootTrackerTest.cpp
Geo/GeoConstructorTest.cpp
Geo/GeoJsonTest.cpp
Geo/GeoFunctionsTest.cpp
@ -136,6 +138,8 @@ set(ARANGODB_TESTS_SOURCES
Maintenance/MaintenanceFeatureTest.cpp
Maintenance/MaintenanceRestHandlerTest.cpp
Maintenance/MaintenanceTest.cpp
Mocks/Servers.cpp
IResearch/StorageEngineMock.cpp
Pregel/typedbuffer.cpp
RocksDBEngine/Endian.cpp
RocksDBEngine/KeyTest.cpp

View File

@ -0,0 +1,862 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#include "catch.hpp"
#include "Cluster/RebootTracker.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Mocks/Servers.h"
#include "lib/Logger/Logger.h"
#include <memory>
#include <type_traits>
using namespace arangodb;
using namespace arangodb::cluster;
using namespace arangodb::tests;
using namespace arangodb::tests::mocks;
using rest::Scheduler;
TEST_CASE("CallbackGuardTest") {
auto counterA = uint64_t{0};
auto counterB = uint64_t{0};
auto incrCounterA = [&counterA]() { ++counterA; };
auto incrCounterB = [&counterB]() { ++counterB; };
SECTION("test_default_constructor") {
// should do nothing, especially not cause an error during destruction
CallbackGuard guard{};
}
SECTION("test_deleted_copy_semantics") {
{
INFO("CallbackGuard should not be copy constructible");
CHECK_FALSE(std::is_copy_constructible<CallbackGuard>::value);
}
{
INFO("CallbackGuard should not be copy assignable");
CHECK_FALSE(std::is_copy_assignable<CallbackGuard>::value);
}
}
SECTION("test_constructor") {
{
CallbackGuard guard{incrCounterA};
{
INFO("construction should not invoke the callback");
CHECK(0 == counterA);
}
}
{
INFO("destruction should invoke the callback");
CHECK(1 == counterA);
}
}
SECTION("test_move_constructor_inline") {
{
CallbackGuard guard{CallbackGuard(incrCounterA)};
{
INFO("move construction should not invoke the callback");
CHECK(0 == counterA);
}
}
{
INFO("destruction should invoke the callback");
CHECK(1 == counterA);
}
}
SECTION("test_move_constructor_explicit") {
{
CallbackGuard guardA1{incrCounterA};
{
INFO("construction should not invoke the callback");
CHECK(0 == counterA);
}
{
CallbackGuard guardA2{std::move(guardA1)};
{
INFO("move construction should not invoke the callback");
CHECK(0 == counterA);
}
}
{
INFO("destroying a move constructed guard should invoke the callback");
CHECK(1 == counterA);
}
}
{
INFO("destroying a moved guard should not invoke the callback");
CHECK(1 == counterA);
}
}
SECTION("test_move_operator_eq_construction") {
{
auto guard = CallbackGuard{incrCounterA};
{
INFO("initialization with operator= should not invoke the callback");
CHECK(0 == counterA);
}
}
{
INFO("destruction should invoke the callback");
CHECK(1 == counterA);
}
}
SECTION("test_move_operator_eq_explicit") {
{
CallbackGuard guardA{incrCounterA};
{
INFO("construction should not invoke the callback");
CHECK(0 == counterA);
}
{
CallbackGuard guardB{incrCounterB};
{
INFO("construction should not invoke the callback");
CHECK(0 == counterB);
}
guardA = std::move(guardB);
{
INFO("being moved should not invoke the callback");
CHECK(0 == counterB);
}
{
INFO("being overwritten should invoke the callback");
CHECK(1 == counterA);
}
}
{
INFO("destroying a moved guard should not invoke the callback");
CHECK(0 == counterB);
}
{
INFO(
"destroying a moved guard should not invoke the "
"overwritten callback again");
CHECK(1 == counterA);
}
}
{
INFO("destroying an overwritten guard should invoke its new callback");
CHECK(1 == counterB);
}
{
INFO(
"destroying an overwritten guard should not invoke "
"its old callback again");
CHECK(1 == counterA);
}
}
}
TEST_CASE("RebootTrackerTest") {
ServerID const serverA = "PRMR-srv-A";
ServerID const serverB = "PRMR-srv-B";
ServerID const serverC = "PRMR-srv-C";
using PeerState = RebootTracker::PeerState;
auto scheduler = std::make_unique<Scheduler>(2, 64, 1024 * 1024, 4096);
static_assert(std::is_same<decltype(*SchedulerFeature::SCHEDULER), decltype(*scheduler)>::value,
"Use the correct scheduler in the tests");
// ApplicationServer needs to be prepared in order for the scheduler to start
// threads.
MockEmptyServer mockApplicationServer;
// Suppress this INFO message:
// When trying to register callback '': The server PRMR-srv-A is not known. If this server joined the cluster in the last seconds, this can happen.
arangodb::LogTopic::setLogLevel(arangodb::Logger::CLUSTER.name(), arangodb::LogLevel::WARN);
TRI_DEFER(scheduler->beginShutdown(); scheduler->shutdown());
scheduler->start();
auto schedulerEmpty = [&]() -> bool {
auto stats = scheduler->queueStatistics();
return stats._queued == 0 && stats._working == 0 && stats._fifo1 == 0 &&
stats._fifo2 == 0 && stats._fifo3 == 0;
};
auto waitForSchedulerEmpty = [&]() {
while (!schedulerEmpty()) {
std::this_thread::yield();
}
};
// Test that a registered callback is called once on the next change, but not
// after that
SECTION("one_server_call_once_after_change") {
auto state = std::unordered_map<ServerID, RebootId>{{serverA, RebootId{1}}};
uint64_t numCalled = 0;
auto callback = [&numCalled]() { ++numCalled; };
{
RebootTracker rebootTracker{scheduler.get()};
std::vector<CallbackGuard> guards{};
CallbackGuard guard;
// Set state to { serverA => 1 }
rebootTracker.updateServerState(state);
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
callback, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == numCalled);
}
// Set state to { serverA => 2 }
state.at(serverA) = RebootId{2};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must be called after a change");
CHECK(1 == numCalled);
}
// Set state to { serverA => 3 }
state.at(serverA) = RebootId{3};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must not be called twice");
CHECK(1 == numCalled);
}
guards.clear();
{
INFO("Callback must not be called when guards are destroyed");
CHECK(1 == numCalled);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(1 == numCalled);
}
}
// Test that a registered callback is called immediately when its reboot id
// is lower than the last known one, but not after that
SECTION("one_server_call_once_with_old_rebootid") {
auto state = std::unordered_map<ServerID, RebootId>{{serverA, RebootId{2}}};
uint64_t numCalled = 0;
auto callback = [&numCalled]() { ++numCalled; };
{
RebootTracker rebootTracker{scheduler.get()};
std::vector<CallbackGuard> guards{};
CallbackGuard guard;
// Set state to { serverA => 2 }
rebootTracker.updateServerState(state);
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
callback, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback with lower value must be called immediately");
CHECK(1 == numCalled);
}
// Set state to { serverA => 3 }
state.at(serverA) = RebootId{3};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must not be called again");
CHECK(1 == numCalled);
}
guards.clear();
{
INFO("Callback must not be called when guards are destroyed");
CHECK(1 == numCalled);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(1 == numCalled);
}
}
// Tests that callbacks and interleaved updates don't interfere
SECTION("one_server_call_interleaved") {
auto state = std::unordered_map<ServerID, RebootId>{{serverA, RebootId{1}}};
uint64_t numCalled = 0;
auto callback = [&numCalled]() { ++numCalled; };
{
RebootTracker rebootTracker{scheduler.get()};
std::vector<CallbackGuard> guards{};
CallbackGuard guard;
// Set state to { serverA => 1 }
rebootTracker.updateServerState(state);
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
callback, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == numCalled);
}
// Set state to { serverA => 2 }
state.at(serverA) = RebootId{2};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must be called after a change");
CHECK(1 == numCalled);
}
// Set state to { serverA => 3 }
state.at(serverA) = RebootId{3};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must not be called twice");
CHECK(1 == numCalled);
}
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{3}},
callback, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(1 == numCalled);
}
// Set state to { serverA => 4 }
state.at(serverA) = RebootId{4};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must be called after a change");
CHECK(2 == numCalled);
}
// Set state to { serverA => 5 }
state.at(serverA) = RebootId{5};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must not be called twice");
CHECK(2 == numCalled);
}
guards.clear();
{
INFO("Callback must not be called when guards are destroyed");
CHECK(2 == numCalled);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(2 == numCalled);
}
}
// Tests that multiple callbacks and updates don't interfere
SECTION("one_server_call_sequential") {
auto state = std::unordered_map<ServerID, RebootId>{{serverA, RebootId{1}}};
uint64_t numCalled = 0;
auto callback = [&numCalled]() { ++numCalled; };
{
RebootTracker rebootTracker{scheduler.get()};
std::vector<CallbackGuard> guards{};
CallbackGuard guard;
// Set state to { serverA => 1 }
rebootTracker.updateServerState(state);
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
callback, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == numCalled);
}
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
callback, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == numCalled);
}
// Set state to { serverA => 2 }
state.at(serverA) = RebootId{2};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Both callbacks must be called after a change");
CHECK(2 == numCalled);
}
// Set state to { serverA => 3 }
state.at(serverA) = RebootId{3};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("No callback must be called twice");
CHECK(2 == numCalled);
}
guards.clear();
{
INFO("Callback must not be called when guards are destroyed");
CHECK(2 == numCalled);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(2 == numCalled);
}
}
// Test that a registered callback is removed when its guard is destroyed
SECTION("one_server_guard_removes_callback") {
auto state = std::unordered_map<ServerID, RebootId>{{serverA, RebootId{1}}};
uint64_t numCalled = 0;
auto callback = [&numCalled]() { ++numCalled; };
{
RebootTracker rebootTracker{scheduler.get()};
// Set state to { serverA => 1 }
rebootTracker.updateServerState(state);
{
// Register callback
auto guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
callback, "");
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == numCalled);
}
}
waitForSchedulerEmpty();
{
INFO("Callback must not be called when the guard is destroyed");
CHECK(0 == numCalled);
}
// Set state to { serverA => 2 }
state.at(serverA) = RebootId{2};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO(
"Callback must not be called after a change "
"when the guard was destroyed before");
CHECK(0 == numCalled);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(0 == numCalled);
}
}
// Test that callback removed by a guard doesn't interfere with other
// registered callbacks for the same server and reboot id
SECTION("one_server_guard_doesnt_interfere") {
auto state = std::unordered_map<ServerID, RebootId>{{serverA, RebootId{1}}};
uint64_t counterA = 0;
uint64_t counterB = 0;
uint64_t counterC = 0;
auto incrCounterA = [&counterA]() { ++counterA; };
auto incrCounterB = [&counterB]() { ++counterB; };
auto incrCounterC = [&counterC]() { ++counterC; };
{
RebootTracker rebootTracker{scheduler.get()};
std::vector<CallbackGuard> guards{};
CallbackGuard guard;
// Set state to { serverA => 1 }
rebootTracker.updateServerState(state);
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
incrCounterA, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == counterA);
}
{
// Register callback with a local guard
auto localGuard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
incrCounterB, "");
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == counterA);
}
{
INFO("Callback must not be called before a change");
CHECK(0 == counterB);
}
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
incrCounterC, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == counterA);
}
{
INFO("Callback must not be called before a change");
CHECK(0 == counterB);
}
{
INFO("Callback must not be called before a change");
CHECK(0 == counterC);
}
}
waitForSchedulerEmpty();
{
INFO("Callback must not be called when the guard is destroyed");
CHECK(0 == counterA);
}
{
INFO("Callback must not be called when the guard is destroyed");
CHECK(0 == counterB);
}
{
INFO("Callback must not be called when the guard is destroyed");
CHECK(0 == counterC);
}
// Set state to { serverA => 2 }
state.at(serverA) = RebootId{2};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must be called after a change");
CHECK(1 == counterA);
}
{
INFO("Removed callback must not be called after a change");
CHECK(0 == counterB);
}
{
INFO("Callback must be called after a change");
CHECK(1 == counterC);
}
// Set state to { serverA => 3 }
state.at(serverA) = RebootId{3};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("No callback must be called twice");
CHECK(1 == counterA);
}
{
INFO("Removed callback must not be called after a change");
CHECK(0 == counterB);
}
{
INFO("No callback must be called twice");
CHECK(1 == counterC);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(1 == counterA);
}
{
INFO("Callback must not be called during destruction");
CHECK(0 == counterB);
}
{
INFO("Callback must not be called during destruction");
CHECK(1 == counterC);
}
}
SECTION("one_server_add_callback_before_state_with_same_id") {
auto state = std::unordered_map<ServerID, RebootId>{};
uint64_t numCalled = 0;
auto callback = [&numCalled]() { ++numCalled; };
{
RebootTracker rebootTracker{scheduler.get()};
CallbackGuard guard;
// State is empty { }
// Register callback
{
INFO(
"Trying to add a callback for an unknown server should be refused");
CHECK_THROWS_AS(guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
callback, ""),
arangodb::basics::Exception);
}
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == numCalled);
}
// Set state to { serverA => 1 }
state.emplace(serverA, RebootId{1});
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO(
"Callback must not be called when the state is "
"set to the same RebootId, as it shouldn't have been registered");
CHECK(0 == numCalled);
}
// Set state to { serverA => 2 }
state.at(serverA) = RebootId{2};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO(
"Callback must not be called after a change, as "
"it shouldn't have been registered");
CHECK(0 == numCalled);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(0 == numCalled);
}
}
SECTION("one_server_add_callback_before_state_with_older_id") {
auto state = std::unordered_map<ServerID, RebootId>{};
uint64_t numCalled = 0;
auto callback = [&numCalled]() { ++numCalled; };
{
RebootTracker rebootTracker{scheduler.get()};
CallbackGuard guard;
// State is empty { }
// Register callback
{
INFO(
"Trying to add a callback for an unknown server should be refused");
CHECK_THROWS_AS(guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{2}},
callback, ""),
arangodb::basics::Exception);
}
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == numCalled);
}
// Set state to { serverA => 1 }
state.emplace(serverA, RebootId{1});
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO(
"Callback must not be called when the state is "
"set to an older RebootId");
CHECK(0 == numCalled);
}
// Set state to { serverA => 2 }
state.at(serverA) = RebootId{2};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO(
"Callback must not be called when the state is "
"set to the same RebootId");
CHECK(0 == numCalled);
}
// Set state to { serverA => 3 }
state.at(serverA) = RebootId{3};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO(
"Callback must not be called after a change, as "
"it shouldn't have been registered");
CHECK(0 == numCalled);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(0 == numCalled);
}
}
SECTION("two_servers_call_interleaved") {
auto state = std::unordered_map<ServerID, RebootId>{{serverA, RebootId{1}}};
uint64_t numCalled = 0;
auto callback = [&numCalled]() { ++numCalled; };
{
RebootTracker rebootTracker{scheduler.get()};
std::vector<CallbackGuard> guards{};
CallbackGuard guard;
// Set state to { serverA => 1 }
rebootTracker.updateServerState(state);
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{1}},
callback, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(0 == numCalled);
}
// Set state to { serverA => 2 }
state.at(serverA) = RebootId{2};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must be called after a change");
CHECK(1 == numCalled);
}
// Set state to { serverA => 3 }
state.at(serverA) = RebootId{3};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must not be called twice");
CHECK(1 == numCalled);
}
// Register callback
guard = rebootTracker.callMeOnChange(PeerState{serverA, RebootId{3}},
callback, "");
guards.emplace_back(std::move(guard));
waitForSchedulerEmpty();
{
INFO("Callback must not be called before a change");
CHECK(1 == numCalled);
}
// Set state to { serverA => 4 }
state.at(serverA) = RebootId{4};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must be called after a change");
CHECK(2 == numCalled);
}
// Set state to { serverA => 5 }
state.at(serverA) = RebootId{5};
rebootTracker.updateServerState(state);
waitForSchedulerEmpty();
{
INFO("Callback must not be called twice");
CHECK(2 == numCalled);
}
guards.clear();
{
INFO("Callback must not be called when guards are destroyed");
CHECK(2 == numCalled);
}
}
// RebootTracker was destroyed now
waitForSchedulerEmpty();
{
INFO("Callback must not be called during destruction");
CHECK(2 == numCalled);
}
}
}

View File

@ -22,11 +22,12 @@
////////////////////////////////////////////////////////////////////////////////
#include "AgencyMock.h"
#include "Agency/Store.h"
#include "Basics/ConditionLocker.h"
#include "Basics/NumberUtils.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Agency/Store.h"
#include "lib/Rest/HttpResponse.h"
#include <velocypack/velocypack-aliases.h>

217
tests/Mocks/Servers.cpp Normal file
View File

@ -0,0 +1,217 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "Servers.h"
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Aql/AqlFunctionFeature.h"
#include "Aql/OptimizerRulesFeature.h"
#include "Aql/Query.h"
#include "Cluster/ClusterFeature.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "IResearch/IResearchAnalyzerFeature.h"
#include "IResearch/IResearchCommon.h"
#include "IResearch/IResearchFeature.h"
#include "Logger/LogTopic.h"
#include "Logger/Logger.h"
#include "RestServer/AqlFeature.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/SystemDatabaseFeature.h"
#include "RestServer/TraverserEngineRegistryFeature.h"
#include "RestServer/ViewTypesFeature.h"
#include "Sharding/ShardingFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "Transaction/Methods.h"
#include "Transaction/StandaloneContext.h"
#include "VocBase/vocbase.h"
#include "utils/log.hpp"
#if USE_ENTERPRISE
#include "Enterprise/Ldap/LdapFeature.h"
#endif
using namespace arangodb;
using namespace arangodb::tests;
using namespace arangodb::tests::mocks;
MockServer::MockServer() : _server(nullptr, nullptr), _engine(_server) {
arangodb::EngineSelectorFeature::ENGINE = &_engine;
init();
}
MockServer::~MockServer() {
_system.reset(); // destroy before reseting the 'ENGINE'
arangodb::application_features::ApplicationServer::server = nullptr;
arangodb::EngineSelectorFeature::ENGINE = nullptr;
stopFeatures();
}
void MockServer::init() {
arangodb::transaction::Methods::clearDataSourceRegistrationCallbacks();
}
void MockServer::startFeatures() {
for (auto& f : _features) {
arangodb::application_features::ApplicationServer::server->addFeature(f.first);
}
for (auto& f : _features) {
f.first->prepare();
}
for (auto& f : _features) {
if (f.second) {
f.first->start();
}
}
}
void MockServer::stopFeatures() {
// destroy application features
for (auto& f : _features) {
if (f.second) {
f.first->stop();
}
}
for (auto& f : _features) {
f.first->unprepare();
}
}
TRI_vocbase_t& MockServer::getSystemDatabase() const {
TRI_ASSERT(_system != nullptr);
return *(_system.get());
}
MockAqlServer::MockAqlServer() : MockServer() {
// suppress INFO {authentication} Authentication is turned on (system only), authentication for unix sockets is turned on
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(),
arangodb::LogLevel::WARN);
// suppress log messages since tests check error conditions
arangodb::LogTopic::setLogLevel(arangodb::Logger::FIXME.name(), arangodb::LogLevel::ERR); // suppress WARNING DefaultCustomTypeHandler called
arangodb::LogTopic::setLogLevel(arangodb::iresearch::TOPIC.name(),
arangodb::LogLevel::FATAL);
irs::logger::output_le(::iresearch::logger::IRL_FATAL, stderr);
// setup required application features
_features.emplace_back(new arangodb::ViewTypesFeature(_server), true);
_features.emplace_back(new arangodb::AuthenticationFeature(_server), true);
_features.emplace_back(new arangodb::DatabasePathFeature(_server), false);
_features.emplace_back(new arangodb::DatabaseFeature(_server), false);
_features.emplace_back(new arangodb::ShardingFeature(_server), true);
_features.emplace_back(new arangodb::QueryRegistryFeature(_server), false); // must be first
arangodb::application_features::ApplicationServer::server->addFeature(
_features.back().first); // need QueryRegistryFeature feature to be added now in order to create the system database
_system = std::make_unique<TRI_vocbase_t>(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL,
0, TRI_VOC_SYSTEM_DATABASE);
_features.emplace_back(new arangodb::SystemDatabaseFeature(_server, _system.get()),
false); // required for IResearchAnalyzerFeature
_features.emplace_back(new arangodb::TraverserEngineRegistryFeature(_server), false); // must be before AqlFeature
_features.emplace_back(new arangodb::AqlFeature(_server), true);
_features.emplace_back(new arangodb::aql::OptimizerRulesFeature(_server), true);
_features.emplace_back(new arangodb::aql::AqlFunctionFeature(_server), true); // required for IResearchAnalyzerFeature
_features.emplace_back(new arangodb::iresearch::IResearchAnalyzerFeature(_server), true);
_features.emplace_back(new arangodb::iresearch::IResearchFeature(_server), true);
#if USE_ENTERPRISE
_features.emplace_back(new arangodb::LdapFeature(_server),
false); // required for AuthenticationFeature with USE_ENTERPRISE
#endif
startFeatures();
}
MockAqlServer::~MockAqlServer() {
arangodb::AqlFeature(_server).stop(); // unset singleton instance
arangodb::LogTopic::setLogLevel(arangodb::iresearch::TOPIC.name(),
arangodb::LogLevel::DEFAULT);
arangodb::LogTopic::setLogLevel(arangodb::Logger::FIXME.name(), arangodb::LogLevel::DEFAULT);
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(),
arangodb::LogLevel::DEFAULT);
}
std::shared_ptr<arangodb::transaction::Methods> MockAqlServer::createFakeTransaction() const {
std::vector<std::string> noCollections{};
transaction::Options opts;
auto ctx = transaction::StandaloneContext::Create(getSystemDatabase());
return std::make_shared<arangodb::transaction::Methods>(ctx, noCollections, noCollections,
noCollections, opts);
}
std::unique_ptr<arangodb::aql::Query> MockAqlServer::createFakeQuery() const {
auto bindParams = std::make_shared<VPackBuilder>();
bindParams->openObject();
bindParams->close();
auto queryOptions = std::make_shared<VPackBuilder>();
queryOptions->openObject();
queryOptions->close();
aql::QueryString fakeQueryString("");
auto query =
std::make_unique<arangodb::aql::Query>(false, getSystemDatabase(),
fakeQueryString, bindParams, queryOptions,
arangodb::aql::QueryPart::PART_DEPENDENT);
query->injectTransaction(createFakeTransaction());
return query;
}
MockRestServer::MockRestServer() : MockServer() {
// suppress INFO {authentication} Authentication is turned on (system only), authentication for unix sockets is turned on
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(),
arangodb::LogLevel::WARN);
// suppress log messages since tests check error conditions
arangodb::LogTopic::setLogLevel(arangodb::Logger::FIXME.name(), arangodb::LogLevel::ERR); // suppress WARNING DefaultCustomTypeHandler called
arangodb::LogTopic::setLogLevel(arangodb::iresearch::TOPIC.name(),
arangodb::LogLevel::FATAL);
irs::logger::output_le(::iresearch::logger::IRL_FATAL, stderr);
_features.emplace_back(new arangodb::AuthenticationFeature(_server), false);
_features.emplace_back(new arangodb::DatabaseFeature(_server), false);
_features.emplace_back(new arangodb::QueryRegistryFeature(_server), false);
arangodb::application_features::ApplicationServer::server->addFeature(
_features.back().first);
_system = std::make_unique<TRI_vocbase_t>(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL,
0, TRI_VOC_SYSTEM_DATABASE);
_features.emplace_back(new arangodb::SystemDatabaseFeature(_server, _system.get()), false);
#if USE_ENTERPRISE
_features.emplace_back(new arangodb::LdapFeature(_server),
false); // required for AuthenticationFeature with USE_ENTERPRISE
#endif
startFeatures();
}
MockRestServer::~MockRestServer() {
arangodb::LogTopic::setLogLevel(arangodb::iresearch::TOPIC.name(),
arangodb::LogLevel::DEFAULT);
arangodb::LogTopic::setLogLevel(arangodb::Logger::FIXME.name(), arangodb::LogLevel::DEFAULT);
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(),
arangodb::LogLevel::DEFAULT);
}
MockEmptyServer::MockEmptyServer() {
startFeatures();
}
MockEmptyServer::~MockEmptyServer() = default;

107
tests/Mocks/Servers.h Normal file
View File

@ -0,0 +1,107 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_TESTS_MOCKS_SERVERS_H
#define ARANGODB_TESTS_MOCKS_SERVERS_H 1
#include "ApplicationFeatures/ApplicationServer.h"
#include "IResearch/StorageEngineMock.h"
struct TRI_vocbase_t;
namespace arangodb {
namespace transaction {
class Methods;
}
namespace aql {
class Query;
}
namespace application_features {
class ApplicationFeature;
}
namespace tests {
namespace mocks {
class MockApplicationServer : public arangodb::application_features::ApplicationServer {
public:
MockApplicationServer(std::shared_ptr<options::ProgramOptions> options, char const* binaryPath)
: ApplicationServer(std::move(options), binaryPath) {}
~MockApplicationServer() override = default;
// Appear to be started
application_features::ServerState state() const override { return application_features::ServerState::IN_START; }
};
class MockServer {
public:
MockServer();
virtual ~MockServer();
void init();
TRI_vocbase_t& getSystemDatabase() const;
protected:
// Implementation knows the place when all features are included
void startFeatures();
private:
// Will be called by destructor
void stopFeatures();
protected:
MockApplicationServer _server;
StorageEngineMock _engine;
std::unique_ptr<TRI_vocbase_t> _system;
std::vector<std::pair<arangodb::application_features::ApplicationFeature*, bool>> _features;
};
class MockEmptyServer : public MockServer {
public:
MockEmptyServer();
~MockEmptyServer() override;
};
class MockAqlServer : public MockServer {
public:
MockAqlServer();
~MockAqlServer() override;
std::shared_ptr<arangodb::transaction::Methods> createFakeTransaction() const;
std::unique_ptr<arangodb::aql::Query> createFakeQuery() const;
};
class MockRestServer : public MockServer {
public:
MockRestServer();
~MockRestServer() override;
};
} // namespace mocks
} // namespace tests
} // namespace arangodb
#endif

View File

@ -25,6 +25,16 @@
#include "src/objects-inl.h" // (required to avoid compile warnings) must inclide V8 _before_ "catch.cpp' or CATCH() macro will be broken
#include "src/objects/scope-info.h" // must inclide V8 _before_ "catch.cpp' or CATCH() macro will be broken
#ifdef _WIN32
// V8 breaks several constants used by windows headers. This later introduces compile errors in other includes,
// e.g. via
// ShardingFeature.h -> ShardingStrategy.h -> ClusterInfo.h -> RebootTracker.h -> Scheduler.h -> Basics/asio_ns.h -> <asio/error.hpp>
// which, at some point, includes mswsockdef.h and mswsock.h.
// This fixes these errors.
#define CONST const
#define VOID void
#endif
#include "catch.hpp"
#include "../IResearch/common.h"
#include "../IResearch/StorageEngineMock.h"