From 1476ddbd0d49e9796899d140bd3ab406f7053b79 Mon Sep 17 00:00:00 2001 From: Jan Date: Fri, 15 Nov 2019 10:42:04 +0100 Subject: [PATCH] Bug fix/fix cluster upgrade (#10411) --- CMakeLists.txt | 8 +- arangod/CMakeLists.txt | 2 +- arangod/Cluster/ClusterFeature.cpp | 41 ++-- arangod/Cluster/ClusterFeature.h | 4 +- arangod/Cluster/ClusterUpgradeFeature.cpp | 223 ++++++++++++++++++ arangod/Cluster/ClusterUpgradeFeature.h | 55 +++++ .../IResearchAnalyzerCollectionFeature.cpp | 73 ------ .../IResearchAnalyzerCollectionFeature.h | 42 ---- .../IResearch/IResearchAnalyzerFeature.cpp | 1 - arangod/IResearch/IResearchFeature.cpp | 37 --- arangod/RestHandler/RestWalAccessHandler.cpp | 4 +- arangod/RestServer/BootstrapFeature.cpp | 76 +++--- arangod/RestServer/BootstrapFeature.h | 3 + arangod/RestServer/DatabaseFeature.h | 1 + arangod/RestServer/UpgradeFeature.cpp | 60 +++-- arangod/RestServer/UpgradeFeature.h | 18 +- arangod/RestServer/arangod.cpp | 5 +- arangod/RocksDBEngine/RocksDBThrottle.cpp | 12 +- arangod/Statistics/StatisticsFeature.cpp | 7 + .../StorageEngine/EngineSelectorFeature.cpp | 12 +- arangod/VocBase/Methods/Tasks.cpp | 6 + arangod/VocBase/Methods/Tasks.h | 1 + arangod/VocBase/Methods/Upgrade.cpp | 54 +++-- arangod/VocBase/Methods/Upgrade.h | 7 +- arangod/VocBase/Methods/UpgradeTasks.cpp | 22 +- arangod/VocBase/Methods/Version.cpp | 15 +- lib/ApplicationFeatures/ApplicationServer.cpp | 2 +- lib/Basics/StringUtils.cpp | 10 +- lib/Logger/LogThread.cpp | 4 +- lib/Logger/LogThread.h | 2 +- lib/Logger/Logger.cpp | 31 ++- lib/Logger/LoggerStream.cpp | 17 +- lib/Rest/Version.cpp | 7 +- 33 files changed, 544 insertions(+), 318 deletions(-) create mode 100644 arangod/Cluster/ClusterUpgradeFeature.cpp create mode 100644 arangod/Cluster/ClusterUpgradeFeature.h delete mode 100644 arangod/IResearch/IResearchAnalyzerCollectionFeature.cpp delete mode 100644 arangod/IResearch/IResearchAnalyzerCollectionFeature.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5bcf91b3d3..5e8bbb4938 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -808,7 +808,7 @@ set_property(CACHE USE_IPO PROPERTY STRINGS AUTO ON OFF) set(IPO_ENABLED False) # Determine value if IPO_ENABLED from USE_IPO and CMAKE_BUILD_TYPE -if(USE_IPO STREQUAL "AUTO") +if (USE_IPO STREQUAL "AUTO") # When USE_IPO=AUTO, enable IPO for optimized / release builds. # But to work around a g++ segfault triggered by using both -flto and # -fno-devirtualize-functions, we disable IPO when using google tests, because @@ -825,7 +825,7 @@ if(USE_IPO STREQUAL "AUTO") else() set(IPO_ENABLED False) endif () -elseif(USE_IPO) +elseif (USE_IPO) set(IPO_ENABLED True) else() set(IPO_ENABLED False) @@ -833,6 +833,10 @@ endif() message(STATUS "IPO_ENABLED: ${IPO_ENABLED}") set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ${IPO_ENABLED}) + +if (IPO_ENABLED) + add_definitions("-DARANGODB_USE_IPO=1") +endif() ################################################################################ ## LIBRARY RESOLV diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index cebbfe8ebd..0aa6b7c446 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -52,7 +52,6 @@ add_library(arango_iresearch IResearch/ApplicationServerHelper.h IResearch/ApplicationServerHelper.cpp IResearch/Containers.cpp IResearch/Containers.h IResearch/IResearchAnalyzerFeature.cpp IResearch/IResearchAnalyzerFeature.h - IResearch/IResearchAnalyzerCollectionFeature.cpp IResearch/IResearchCommon.cpp IResearch/IResearchCommon.h IResearch/IResearchKludge.cpp IResearch/IResearchKludge.h IResearch/IResearchLink.cpp IResearch/IResearchLink.h @@ -543,6 +542,7 @@ set(LIB_ARANGOSERVER_SOURCES Cluster/ClusterInfo.cpp Cluster/ClusterRepairDistributeShardsLike.cpp Cluster/ClusterRepairOperations.cpp + Cluster/ClusterUpgradeFeature.cpp Cluster/ClusterTrxMethods.cpp Cluster/ClusterTypes.cpp Cluster/CreateCollection.cpp diff --git a/arangod/Cluster/ClusterFeature.cpp b/arangod/Cluster/ClusterFeature.cpp index e929de13cb..386e69d445 100644 --- a/arangod/Cluster/ClusterFeature.cpp +++ b/arangod/Cluster/ClusterFeature.cpp @@ -547,22 +547,7 @@ void ClusterFeature::start() { void ClusterFeature::beginShutdown() { ClusterComm::instance()->disable(); } void ClusterFeature::stop() { - if (_heartbeatThread != nullptr) { - _heartbeatThread->beginShutdown(); - } - - if (_heartbeatThread != nullptr) { - int counter = 0; - while (_heartbeatThread->isRunning()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - // emit warning after 5 seconds - if (++counter == 10 * 5) { - LOG_TOPIC("acaa9", WARN, arangodb::Logger::CLUSTER) - << "waiting for heartbeat thread to finish"; - } - } - } - + shutdownHeartbeatThread(); ClusterComm::instance()->stopBackgroundThreads(); } @@ -572,9 +557,7 @@ void ClusterFeature::unprepare() { return; } - if (_heartbeatThread != nullptr) { - _heartbeatThread->beginShutdown(); - } + shutdownHeartbeatThread(); // change into shutdown state ServerState::instance()->setState(ServerState::STATE_SHUTDOWN); @@ -667,7 +650,7 @@ void ClusterFeature::setUnregisterOnShutdown(bool unregisterOnShutdown) { /// @brief common routine to start heartbeat with or without cluster active void ClusterFeature::startHeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry, uint64_t interval_ms, uint64_t maxFailsBeforeWarning, - const std::string& endpoints) { + std::string const& endpoints) { _heartbeatThread = std::make_shared(server(), agencyCallbackRegistry, std::chrono::microseconds(interval_ms * 1000), @@ -686,6 +669,24 @@ void ClusterFeature::startHeartbeatThread(AgencyCallbackRegistry* agencyCallback } } +void ClusterFeature::shutdownHeartbeatThread() { + if (_heartbeatThread == nullptr) { + return; + } + + _heartbeatThread->beginShutdown(); + + int counter = 0; + while (_heartbeatThread->isRunning()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // emit warning after 5 seconds + if (++counter == 10 * 5) { + LOG_TOPIC("acaa9", WARN, arangodb::Logger::CLUSTER) + << "waiting for heartbeat thread to finish"; + } + } +} + void ClusterFeature::syncDBServerStatusQuo() { if (_heartbeatThread != nullptr) { _heartbeatThread->syncDBServerStatusQuo(true); diff --git a/arangod/Cluster/ClusterFeature.h b/arangod/Cluster/ClusterFeature.h index 5ef04cdc8c..e757466a1c 100644 --- a/arangod/Cluster/ClusterFeature.h +++ b/arangod/Cluster/ClusterFeature.h @@ -87,7 +87,9 @@ class ClusterFeature : public application_features::ApplicationFeature { protected: void startHeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry, uint64_t interval_ms, uint64_t maxFailsBeforeWarning, - const std::string& endpoints); + std::string const& endpoints); + + void shutdownHeartbeatThread(); private: void reportRole(ServerState::RoleEnum); diff --git a/arangod/Cluster/ClusterUpgradeFeature.cpp b/arangod/Cluster/ClusterUpgradeFeature.cpp new file mode 100644 index 0000000000..fee1a98dfc --- /dev/null +++ b/arangod/Cluster/ClusterUpgradeFeature.cpp @@ -0,0 +1,223 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Jan Steemann +//////////////////////////////////////////////////////////////////////////////// + +#include "Cluster/ClusterUpgradeFeature.h" + +#include "Agency/AgencyComm.h" +#include "Agency/AgencyFeature.h" +#include "ApplicationFeatures/ApplicationServer.h" +#include "Basics/ScopeGuard.h" +#include "Cluster/ServerState.h" +#include "FeaturePhases/FinalFeaturePhase.h" +#include "Logger/LogMacros.h" +#include "ProgramOptions/ProgramOptions.h" +#include "RestServer/DatabaseFeature.h" +#include "RestServer/UpgradeFeature.h" +#include "VocBase/vocbase.h" +#include "VocBase/Methods/Upgrade.h" +#include "VocBase/Methods/Version.h" + +using namespace arangodb; +using namespace arangodb::options; + +namespace { +static std::string const upgradeVersionKey = "ClusterUpgradeVersion"; +static std::string const upgradeExecutedByKey = "ClusterUpgradeExecutedBy"; +} + +ClusterUpgradeFeature::ClusterUpgradeFeature(application_features::ApplicationServer& server) + : ApplicationFeature(server, "ClusterUpgrade"), + _upgradeMode("auto") { + startsAfter(); +} + +void ClusterUpgradeFeature::collectOptions(std::shared_ptr options) { + options->addOption("--cluster.upgrade", + "perform a cluster upgrade if necessary (auto = perform upgrade and shut down only if `--database.auto-upgrade true` is set, disable = never perform upgrade, force = always perform an upgrade and shut down, online = always perform an upgrade but don't shut down)", + new DiscreteValuesParameter(&_upgradeMode, std::unordered_set{"auto", "disable", "force", "online"})); +} + +void ClusterUpgradeFeature::validateOptions(std::shared_ptr options) { + auto& databaseFeature = server().getFeature(); + if (_upgradeMode == "force") { + // always perform an upgrade, regardless of the value of `--database.auto-upgrade`. + // after the upgrade, shut down the server + databaseFeature.enableUpgrade(); + } else if (_upgradeMode == "disable") { + // never perform an upgrade, regardless of the value of `--database.auto-upgrade`. + // don't shut down the server + databaseFeature.disableUpgrade(); + } else if (_upgradeMode == "online") { + // perform an upgrade, but stay online and don't shut down the server. + // disabling the upgrade functionality in the database feature is required for this. + databaseFeature.disableUpgrade(); + } +} + +void ClusterUpgradeFeature::start() { + if (!ServerState::instance()->isCoordinator()) { + return; + } + + // this feature is doing something meaning only in a coordinator, and only + // if the server was started with the option `--database.auto-upgrade true`. + auto& databaseFeature = server().getFeature(); + if (_upgradeMode == "disable" || (!databaseFeature.upgrade() && (_upgradeMode != "online" && _upgradeMode != "force"))) { + return; + } + + tryClusterUpgrade(); + + if (_upgradeMode != "online") { + LOG_TOPIC("d6047", INFO, arangodb::Logger::STARTUP) << "server will now shut down due to upgrade."; + server().beginShutdown(); + } +} + +void ClusterUpgradeFeature::setBootstrapVersion() { + // it is not a fundamental problem if the setValue fails. if it fails, we can't + // store the version number in the agency, so an upgrade we will run all the + // (idempotent) upgrade tasks for the same version again. + VPackBuilder builder; + builder.add(VPackValue(arangodb::methods::Version::current())); + + AgencyComm agency; + agency.setValue(::upgradeVersionKey, builder.slice(), 0); +} + +void ClusterUpgradeFeature::tryClusterUpgrade() { + TRI_ASSERT(ServerState::instance()->isCoordinator()); + + AgencyComm agency; + AgencyCommResult result = agency.getValues(::upgradeVersionKey); + + if (!result.successful()) { + LOG_TOPIC("26104", ERR, arangodb::Logger::CLUSTER) << "unable to fetch cluster upgrade version from agency: " << result.errorMessage(); + return; + } + + uint64_t latestUpgradeVersion = 0; + VPackSlice value = result.slice()[0].get( + std::vector({AgencyCommManager::path(), ::upgradeVersionKey})); + if (value.isNumber()) { + latestUpgradeVersion = value.getNumber(); + LOG_TOPIC("54f69", DEBUG, arangodb::Logger::CLUSTER) << "found previous cluster upgrade version in agency: " << latestUpgradeVersion; + } else { + // key not there yet. + LOG_TOPIC("5b00d", DEBUG, arangodb::Logger::CLUSTER) << "did not find previous cluster upgrade version in agency"; + } + + if (arangodb::methods::Version::current() <= latestUpgradeVersion) { + // nothing to do + return; + } + + std::vector precs; + if (latestUpgradeVersion == 0) { + precs.emplace_back(::upgradeVersionKey, AgencyPrecondition::Type::EMPTY, true); + } else { + precs.emplace_back(::upgradeVersionKey, AgencyPrecondition::Type::VALUE, latestUpgradeVersion); + } + // there must be no other coordinator that performs an upgrade at the same time + precs.emplace_back(::upgradeExecutedByKey, AgencyPrecondition::Type::EMPTY, true); + + // try to register ourselves as responsible for the upgrade + AgencyOperation operation(::upgradeExecutedByKey, AgencyValueOperationType::SET, ServerState::instance()->getId()); + // make the key expire automatically in case we crash + // operation._ttl = TRI_microtime() + 1800.0; + AgencyWriteTransaction transaction(operation, precs); + + result = agency.sendTransactionWithFailover(transaction); + if (result.successful()) { + // we are responsible for the upgrade! + LOG_TOPIC("15ac4", INFO, arangodb::Logger::CLUSTER) + << "running cluster upgrade from " + << (latestUpgradeVersion == 0 ? std::string("an unknown version") : std::string("version ") + std::to_string(latestUpgradeVersion)) + << " to version " << arangodb::methods::Version::current() << "..."; + + bool success = false; + try { + success = upgradeCoordinator(); + } catch (std::exception const& ex) { + LOG_TOPIC("f2a84", ERR, Logger::CLUSTER) << "caught exception during cluster upgrade: " << ex.what(); + TRI_ASSERT(!success); + } + + // now finally remove the upgrading key and store the new version number + std::vector precs; + precs.emplace_back(::upgradeExecutedByKey, AgencyPrecondition::Type::VALUE, ServerState::instance()->getId()); + + std::vector operations; + if (success) { + // upgrade successful - store our current version number + operations.emplace_back(::upgradeVersionKey, AgencyValueOperationType::SET, arangodb::methods::Version::current()); + } + // remove the key that locks out other coordinators from upgrading + operations.emplace_back(::upgradeExecutedByKey, AgencySimpleOperationType::DELETE_OP); + AgencyWriteTransaction transaction(operations, precs); + + result = agency.sendTransactionWithFailover(transaction); + if (result.successful()) { + LOG_TOPIC("853de", INFO, arangodb::Logger::CLUSTER) + << "cluster upgrade to version " << arangodb::methods::Version::current() + << " completed successfully"; + } else { + LOG_TOPIC("a0b4f", ERR, arangodb::Logger::CLUSTER) << "unable to store cluster upgrade information in agency: " << result.errorMessage(); + } + } else if (result.httpCode() != (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { + LOG_TOPIC("482a3", WARN, arangodb::Logger::CLUSTER) << "unable to fetch upgrade information: " << result.errorMessage(); + } else { + // someone else is performing the upgrade + LOG_TOPIC("ab6eb", DEBUG, arangodb::Logger::CLUSTER) << "someone else is running the cluster upgrade right now"; + } +} + +bool ClusterUpgradeFeature::upgradeCoordinator() { + LOG_TOPIC("a2d65", TRACE, arangodb::Logger::FIXME) << "starting coordinator upgrade"; + + bool success = true; + DatabaseFeature& databaseFeature = server().getFeature(); + + for (auto& name : databaseFeature.getDatabaseNames()) { + TRI_vocbase_t* vocbase = databaseFeature.useDatabase(name); + + if (vocbase == nullptr) { + // probably deleted in the meantime... so we can ignore it here + continue; + } + + auto guard = scopeGuard([&vocbase]() { vocbase->release(); }); + + auto res = methods::Upgrade::startupCoordinator(*vocbase); + if (res.fail()) { + LOG_TOPIC("f51b1", ERR, arangodb::Logger::FIXME) + << "Database '" << vocbase->name() << "' upgrade failed (" + << res.errorMessage() << "). " + << "Please inspect the logs from the upgrade procedure" + << " and try starting the server again."; + success = false; + } + } + + LOG_TOPIC("efd49", TRACE, arangodb::Logger::FIXME) << "finished coordinator upgrade"; + return success; +} diff --git a/arangod/Cluster/ClusterUpgradeFeature.h b/arangod/Cluster/ClusterUpgradeFeature.h new file mode 100644 index 0000000000..40460ab857 --- /dev/null +++ b/arangod/Cluster/ClusterUpgradeFeature.h @@ -0,0 +1,55 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Jan Steemann +//////////////////////////////////////////////////////////////////////////////// + +#ifndef APPLICATION_FEATURES_CLUSTER_UPGRADE_FEATURE_H +#define APPLICATION_FEATURES_CLUSTER_UPGRADE_FEATURE_H 1 + +#include "ApplicationFeatures/ApplicationFeature.h" + +namespace arangodb { + +// this feature is responsible for performing a cluster upgrade. +// it is only doing something in a coordinator, and only if the server was started +// with the option `--database.auto-upgrade true`. The feature is late in the +// startup sequence, so it can use the full cluster functionality when run. +// after the feature has executed the upgrade, it will shut down the server. +class ClusterUpgradeFeature final : public application_features::ApplicationFeature { + public: + explicit ClusterUpgradeFeature(application_features::ApplicationServer& server); + + void collectOptions(std::shared_ptr) override final; + void validateOptions(std::shared_ptr) override final; + void start() override final; + + void setBootstrapVersion(); + + private: + void tryClusterUpgrade(); + bool upgradeCoordinator(); + + private: + std::string _upgradeMode; +}; + +} // namespace arangodb + +#endif diff --git a/arangod/IResearch/IResearchAnalyzerCollectionFeature.cpp b/arangod/IResearch/IResearchAnalyzerCollectionFeature.cpp deleted file mode 100644 index 89beeb4f2a..0000000000 --- a/arangod/IResearch/IResearchAnalyzerCollectionFeature.cpp +++ /dev/null @@ -1,73 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2017 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Steemann -//////////////////////////////////////////////////////////////////////////////// - -#include "ApplicationServerHelper.h" -#include "Basics/StaticStrings.h" -#include "Cluster/ServerState.h" -#include "FeaturePhases/ClusterFeaturePhase.h" -#include "FeaturePhases/DatabaseFeaturePhase.h" -#include "FeaturePhases/ServerFeaturePhase.h" -#include "IResearch/IResearchAnalyzerCollectionFeature.h" -#include "IResearch/IResearchCommon.h" -#include "Logger/Logger.h" -#include "Logger/LogMacros.h" -#include "RestServer/BootstrapFeature.h" -#include "RestServer/DatabaseFeature.h" -#include "VocBase/Methods/Collections.h" - -using namespace arangodb; - -IResearchAnalyzerCollectionFeature::IResearchAnalyzerCollectionFeature(application_features::ApplicationServer& server) - : ApplicationFeature(server, "ArangoSearchAnalyzerCollection") { - setOptional(true); - startsAfter(); - // should be relatively late in startup sequence - startsAfter(); - startsAfter(); - startsAfter(); -} - -void IResearchAnalyzerCollectionFeature::start() { - if (ServerState::instance()->isDBServer()) { - // no need to execute this in DB server - return; - } - - DatabaseFeature* databaseFeature = DatabaseFeature::DATABASE; - TRI_ASSERT(databaseFeature != nullptr); - - databaseFeature->enumerateDatabases([](TRI_vocbase_t& vocbase) { - Result res = methods::Collections::lookup(vocbase, StaticStrings::AnalyzersCollection, [](std::shared_ptr const&) { - }); - - if (res.is(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) { - // collection does not yet exist, so let's create it now - auto res = methods::Collections::createSystem(vocbase, StaticStrings::AnalyzersCollection, false); - if (res.first.ok()) { - LOG_TOPIC("c2e33", DEBUG, arangodb::iresearch::TOPIC) << "successfully created '" << StaticStrings::AnalyzersCollection << "' collection in database '" << vocbase.name() << "'"; - } else if (res.first.fail() && !res.first.is(TRI_ERROR_ARANGO_CONFLICT)) { - LOG_TOPIC("ecc23", WARN, arangodb::iresearch::TOPIC) << "unable to create '" << StaticStrings::AnalyzersCollection << "' collection: " << res.first.errorMessage(); - // don't abort startup here. the next startup may fix this - } - } - }); -} diff --git a/arangod/IResearch/IResearchAnalyzerCollectionFeature.h b/arangod/IResearch/IResearchAnalyzerCollectionFeature.h deleted file mode 100644 index 572522b9da..0000000000 --- a/arangod/IResearch/IResearchAnalyzerCollectionFeature.h +++ /dev/null @@ -1,42 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2017 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Steemann -//////////////////////////////////////////////////////////////////////////////// - -#ifndef ARANGOD_IRESEARCH__IRESEARCH_ANALYZER_COLLECTION_FEATURE_H -#define ARANGOD_IRESEARCH__IRESEARCH_ANALYZER_COLLECTION_FEATURE_H 1 - -#include "ApplicationFeatures/ApplicationFeature.h" - -namespace arangodb { - -/// @brief the sole purpose of this feature is to create potentially -/// missing `_analyzers` collection after startup. It can be removed -/// eventually once the entire upgrading logic has been revised -class IResearchAnalyzerCollectionFeature final : public arangodb::application_features::ApplicationFeature { - public: - explicit IResearchAnalyzerCollectionFeature(arangodb::application_features::ApplicationServer& server); - - void start() override; -}; - -} // namespace arangodb - -#endif diff --git a/arangod/IResearch/IResearchAnalyzerFeature.cpp b/arangod/IResearch/IResearchAnalyzerFeature.cpp index 2857ca338e..45bbed5107 100644 --- a/arangod/IResearch/IResearchAnalyzerFeature.cpp +++ b/arangod/IResearch/IResearchAnalyzerFeature.cpp @@ -65,7 +65,6 @@ #include "RestServer/DatabaseFeature.h" #include "RestServer/QueryRegistryFeature.h" #include "RestServer/SystemDatabaseFeature.h" -#include "RestServer/UpgradeFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "Transaction/StandaloneContext.h" diff --git a/arangod/IResearch/IResearchFeature.cpp b/arangod/IResearch/IResearchFeature.cpp index 4cddbe912d..0780fdda58 100644 --- a/arangod/IResearch/IResearchFeature.cpp +++ b/arangod/IResearch/IResearchFeature.cpp @@ -213,43 +213,6 @@ bool upgradeSingleServerArangoSearchView0_1( arangodb::velocypack::Slice const& /*upgradeParams*/) { using arangodb::application_features::ApplicationServer; - // NOTE: during the upgrade 'ClusterFeature' is disabled which means 'ClusterFeature::validateOptions(...)' - // hasn't been called and server role in 'ServerState' is not set properly. - // In order to upgrade ArangoSearch views from version 0 to version 1 we need to - // differentiate between single server and cluster, therefore we temporary set role in 'ServerState', - // actually supplied by a user, only for the duration of task to avoid other upgrade tasks, that - // potentially rely on the original behavior, to be affected. - struct ServerRoleGuard { - ServerRoleGuard() { - auto& server = ApplicationServer::server(); - auto* state = arangodb::ServerState::instance(); - - if (state && server.hasFeature()) { - auto const& clusterFeature = server.getFeature(); - if (!clusterFeature.isEnabled()) { - auto const role = arangodb::ServerState::stringToRole(clusterFeature.myRole()); - - // only for cluster - if (arangodb::ServerState::isClusterRole(role)) { - _originalRole = state->getRole(); - state->setRole(role); - _state = state; - } - } - } - } - - ~ServerRoleGuard() { - if (_state) { - // restore the original server role - _state->setRole(_originalRole); - } - } - - arangodb::ServerState* _state{}; - arangodb::ServerState::RoleEnum _originalRole{arangodb::ServerState::ROLE_UNDEFINED}; - } guard; - if (!arangodb::ServerState::instance()->isSingleServer() && !arangodb::ServerState::instance()->isDBServer()) { return true; // not applicable for other ServerState roles diff --git a/arangod/RestHandler/RestWalAccessHandler.cpp b/arangod/RestHandler/RestWalAccessHandler.cpp index 20b9e7167a..7040e6f0b3 100644 --- a/arangod/RestHandler/RestWalAccessHandler.cpp +++ b/arangod/RestHandler/RestWalAccessHandler.cpp @@ -173,7 +173,7 @@ RestStatus RestWalAccessHandler::execute() { std::vector suffixes = _request->decodedSuffixes(); if (suffixes.empty()) { generateError(ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, - "expected GET _api/wal/[tail|range|lastTick]>"); + "expected GET /_api/wal/[tail|range|lastTick|open-transactions]>"); return RestStatus::DONE; } @@ -195,7 +195,7 @@ RestStatus RestWalAccessHandler::execute() { } else { generateError( ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, - "expected GET _api/wal/[tail|range|lastTick|open-transactions]>"); + "expected GET /_api/wal/[tail|range|lastTick|open-transactions]>"); } return RestStatus::DONE; diff --git a/arangod/RestServer/BootstrapFeature.cpp b/arangod/RestServer/BootstrapFeature.cpp index d2d1f171fb..2993b503a7 100644 --- a/arangod/RestServer/BootstrapFeature.cpp +++ b/arangod/RestServer/BootstrapFeature.cpp @@ -26,6 +26,7 @@ #include "Aql/QueryList.h" #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterInfo.h" +#include "Cluster/ClusterUpgradeFeature.h" #include "Cluster/ServerState.h" #include "FeaturePhases/ServerFeaturePhase.h" #include "GeneralServer/AuthenticationFeature.h" @@ -59,7 +60,9 @@ using namespace arangodb; using namespace arangodb::options; BootstrapFeature::BootstrapFeature(application_features::ApplicationServer& server) - : ApplicationFeature(server, ::FEATURE_NAME), _isReady(false), _bark(false) { + : ApplicationFeature(server, ::FEATURE_NAME), + _isReady(false), + _bark(false) { startsAfter(); startsAfter(); @@ -91,7 +94,7 @@ namespace { /// Initialize certain agency entries, like Plan, system collections /// and various similar things. Only runs through on a SINGLE coordinator. -/// must only return if we are bootstrap lead or bootstrap is done +/// must only return if we are bootstrap lead or bootstrap is done. void raceForClusterBootstrap(BootstrapFeature& feature) { AgencyComm agency; auto& ci = feature.server().getFeature().clusterInfo(); @@ -185,6 +188,12 @@ void raceForClusterBootstrap(BootstrapFeature& feature) { b.add(VPackValue(arangodb::ServerState::instance()->getId() + ": done")); result = agency.setValue(::bootstrapKey, b.slice(), 0); if (result.successful()) { + // store current version number in agency to avoid unnecessary upgrades + // to the same version + if (feature.server().hasFeature()) { + ClusterUpgradeFeature& clusterUpgradeFeature = feature.server().getFeature(); + clusterUpgradeFeature.setBootstrapVersion(); + } return; } @@ -277,6 +286,8 @@ void runActiveFailoverStart(std::string const& myId) { } // namespace void BootstrapFeature::start() { + auto& databaseFeature = server().getFeature(); + arangodb::SystemDatabaseFeature::ptr vocbase = server().hasFeature() ? server().getFeature().use() @@ -284,7 +295,6 @@ void BootstrapFeature::start() { bool v8Enabled = V8DealerFeature::DEALER && V8DealerFeature::DEALER->isEnabled(); TRI_ASSERT(vocbase.get() != nullptr); - auto ss = ServerState::instance(); ServerState::RoleEnum role = ServerState::instance()->getRole(); if (ServerState::isRunningInCluster(role)) { @@ -295,7 +305,7 @@ void BootstrapFeature::start() { LOG_TOPIC("724e0", DEBUG, Logger::STARTUP) << "Racing for cluster bootstrap..."; raceForClusterBootstrap(*this); - if (v8Enabled) { + if (v8Enabled && !databaseFeature.upgrade()) { ::runCoordinatorJS(vocbase.get()); } } else if (ServerState::isDBServer(role)) { @@ -317,7 +327,7 @@ void BootstrapFeature::start() { if (ServerState::isSingleServer(role) && AgencyCommManager::isEnabled()) { ::runActiveFailoverStart(myId); } else { - ss->setFoxxmaster(myId); // could be empty, but set anyway + ServerState::instance()->setFoxxmaster(myId); // could be empty, but set anyway } if (v8Enabled) { // runs the single server bootstrap JS @@ -344,35 +354,17 @@ void BootstrapFeature::start() { } if (ServerState::isCoordinator(role)) { - LOG_TOPIC("4000c", DEBUG, arangodb::Logger::CLUSTER) << "waiting for our health entry to appear in Supervision/Health"; - bool found = false; - AgencyComm agency; - int tries = 0; - while (++tries < 30) { - AgencyCommResult result = agency.getValues(::healthKey); - if (result.successful()) { - VPackSlice value = result.slice()[0].get( - std::vector({AgencyCommManager::path(), "Supervision", "Health", ServerState::instance()->getId(), "Status"})); - if (value.isString() && !value.copyString().empty()) { - found = true; - break; - } - } - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - if (found) { - LOG_TOPIC("b0de6", DEBUG, arangodb::Logger::CLUSTER) << "found our health entry in Supervision/Health"; - } else { - LOG_TOPIC("2c993", INFO, arangodb::Logger::CLUSTER) << "did not find our health entry after 15 s in Supervision/Health"; - } + waitForHealthEntry(); } + if (!databaseFeature.upgrade()) { + LOG_TOPIC("cf3f4", INFO, arangodb::Logger::FIXME) + << "ArangoDB (version " << ARANGODB_VERSION_FULL + << ") is ready for business. Have fun!"; + } - LOG_TOPIC("cf3f4", INFO, arangodb::Logger::FIXME) - << "ArangoDB (version " << ARANGODB_VERSION_FULL - << ") is ready for business. Have fun!"; if (_bark) { - LOG_TOPIC("bb9b7", INFO, arangodb::Logger::FIXME) << "The dog says: wau wau!"; + LOG_TOPIC("bb9b7", INFO, arangodb::Logger::FIXME) << "The dog says: Гав гав"; } _isReady = true; @@ -391,3 +383,27 @@ void BootstrapFeature::unprepare() { } } } + +void BootstrapFeature::waitForHealthEntry() { + LOG_TOPIC("4000c", DEBUG, arangodb::Logger::CLUSTER) << "waiting for our health entry to appear in Supervision/Health"; + bool found = false; + AgencyComm agency; + int tries = 0; + while (++tries < 30) { + AgencyCommResult result = agency.getValues(::healthKey); + if (result.successful()) { + VPackSlice value = result.slice()[0].get( + std::vector({AgencyCommManager::path(), "Supervision", "Health", ServerState::instance()->getId(), "Status"})); + if (value.isString() && !value.copyString().empty()) { + found = true; + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + if (found) { + LOG_TOPIC("b0de6", DEBUG, arangodb::Logger::CLUSTER) << "found our health entry in Supervision/Health"; + } else { + LOG_TOPIC("2c993", INFO, arangodb::Logger::CLUSTER) << "did not find our health entry after 15 s in Supervision/Health"; + } +} diff --git a/arangod/RestServer/BootstrapFeature.h b/arangod/RestServer/BootstrapFeature.h index f7797605fd..89d63f53fc 100644 --- a/arangod/RestServer/BootstrapFeature.h +++ b/arangod/RestServer/BootstrapFeature.h @@ -39,6 +39,9 @@ class BootstrapFeature final : public application_features::ApplicationFeature { bool isReady() const { return _isReady; } + private: + void waitForHealthEntry(); + private: bool _isReady; bool _bark; diff --git a/arangod/RestServer/DatabaseFeature.h b/arangod/RestServer/DatabaseFeature.h index 0f89274ad7..8de4dee58e 100644 --- a/arangod/RestServer/DatabaseFeature.h +++ b/arangod/RestServer/DatabaseFeature.h @@ -143,6 +143,7 @@ class DatabaseFeature : public application_features::ApplicationFeature { void enableCheckVersion() { _checkVersion = true; } void enableUpgrade() { _upgrade = true; } + void disableUpgrade() { _upgrade = false; } bool throwCollectionNotLoadedError() const { return _throwCollectionNotLoadedError.load(std::memory_order_relaxed); } diff --git a/arangod/RestServer/UpgradeFeature.cpp b/arangod/RestServer/UpgradeFeature.cpp index 3934ceb351..5ef201b612 100644 --- a/arangod/RestServer/UpgradeFeature.cpp +++ b/arangod/RestServer/UpgradeFeature.cpp @@ -22,9 +22,14 @@ #include "UpgradeFeature.h" +#include "ApplicationFeatures/DaemonFeature.h" #include "ApplicationFeatures/HttpEndpointProvider.h" +#include "ApplicationFeatures/GreetingsFeature.h" +#include "ApplicationFeatures/SupervisorFeature.h" #include "Basics/application-exit.h" +#include "Basics/ScopeGuard.h" #include "Cluster/ClusterFeature.h" +#include "Cluster/ServerState.h" #include "FeaturePhases/AqlFeaturePhase.h" #include "GeneralServer/AuthenticationFeature.h" #include "Logger/LogMacros.h" @@ -32,6 +37,7 @@ #include "Logger/LoggerStream.h" #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" +#include "Pregel/PregelFeature.h" #include "Replication/ReplicationFeature.h" #include "RestServer/BootstrapFeature.h" #include "RestServer/DatabaseFeature.h" @@ -110,32 +116,40 @@ void UpgradeFeature::validateOptions(std::shared_ptr options) { "'--database.upgrade-check false'"; FATAL_ERROR_EXIT(); } - + if (!_upgrade) { LOG_TOPIC("ed226", TRACE, arangodb::Logger::FIXME) << "executing upgrade check: not disabling server features"; return; } - + LOG_TOPIC("23525", INFO, arangodb::Logger::FIXME) - << "executing upgrade procedure: disabling server features"; + << "executing upgrade procedure: disabling server features"; - server().forceDisableFeatures(_nonServerFeatures); - std::vector otherFeaturesToDisable = { - std::type_index(typeid(BootstrapFeature)), - std::type_index(typeid(HttpEndpointProvider)), - }; - server().forceDisableFeatures(otherFeaturesToDisable); + // if we run the upgrade, we need to disable a few features that may get + // in the way... + if (ServerState::instance()->isCoordinator()) { + std::vector otherFeaturesToDisable = { + std::type_index(typeid(DaemonFeature)), + std::type_index(typeid(GreetingsFeature)), + std::type_index(typeid(pregel::PregelFeature)), + std::type_index(typeid(SupervisorFeature)) + }; + server().forceDisableFeatures(otherFeaturesToDisable); + } else { + server().forceDisableFeatures(_nonServerFeatures); + std::vector otherFeaturesToDisable = { + std::type_index(typeid(BootstrapFeature)), + std::type_index(typeid(HttpEndpointProvider)) + }; + server().forceDisableFeatures(otherFeaturesToDisable); + } ReplicationFeature& replicationFeature = server().getFeature(); replicationFeature.disableReplicationApplier(); DatabaseFeature& database = server().getFeature(); database.enableUpgrade(); - - ClusterFeature& cluster = server().getFeature(); - cluster.forceDisable(); - ServerState::instance()->setRole(ServerState::ROLE_SINGLE); } void UpgradeFeature::prepare() { @@ -149,7 +163,10 @@ void UpgradeFeature::start() { // upgrade the database if (_upgradeCheck) { - upgradeDatabase(); + if (!ServerState::instance()->isCoordinator()) { + // no need to run local upgrades in the coordinator + upgradeLocalDatabase(); + } if (!init.restoreAdmin() && !init.defaultPassword().empty() && um != nullptr) { um->updateUser("root", [&](auth::User& user) { @@ -194,15 +211,20 @@ void UpgradeFeature::start() { *_result = EXIT_SUCCESS; } - LOG_TOPIC("7da27", INFO, arangodb::Logger::STARTUP) - << "server will now shut down due to upgrade, database initialization " - "or admin restoration."; + if (!ServerState::instance()->isCoordinator() || !_upgrade) { + LOG_TOPIC("7da27", INFO, arangodb::Logger::STARTUP) + << "server will now shut down due to upgrade, database initialization " + "or admin restoration."; - server().beginShutdown(); + // in the non-coordinator case, we are already done now and will shut down. + // in the coordinator case, the actual upgrade is performed by the + // ClusterUpgradeFeature, which is way later in the startup sequence. + server().beginShutdown(); + } } } -void UpgradeFeature::upgradeDatabase() { +void UpgradeFeature::upgradeLocalDatabase() { LOG_TOPIC("05dff", TRACE, arangodb::Logger::FIXME) << "starting database init/upgrade"; DatabaseFeature& databaseFeature = server().getFeature(); diff --git a/arangod/RestServer/UpgradeFeature.h b/arangod/RestServer/UpgradeFeature.h index 4b091ff5c6..973f88ff87 100644 --- a/arangod/RestServer/UpgradeFeature.h +++ b/arangod/RestServer/UpgradeFeature.h @@ -28,16 +28,30 @@ namespace arangodb { +// this feature is responsible for performing a database upgrade. +// it is only doing something if the server was started with the option +// `--database.auto-upgrade true` or `--database.check-version true`. +// On a coordinator this feature will *not* perform the actual upgrade, +// because it is too early in the sequence. Coordinator upgrades are +// performed by the ClusterUpgradeFeature, which is way later in the +// startup sequence, so it can use the full cluster functionality when run. +// after this feature has executed the upgrade, it will shut down the server. +// in the coordinator case, this feature will not shut down the server. +// instead, the shutdown is performed by the ClusterUpgradeFeature. class UpgradeFeature final : public application_features::ApplicationFeature { public: UpgradeFeature(application_features::ApplicationServer& server, int* result, std::vector const& nonServerFeatures); - void addTask(methods::Upgrade::Task&& task); void collectOptions(std::shared_ptr) override final; void validateOptions(std::shared_ptr) override final; void prepare() override final; void start() override final; + + void addTask(methods::Upgrade::Task&& task); + + private: + void upgradeLocalDatabase(); private: friend struct methods::Upgrade; // to allow access to '_tasks' @@ -45,8 +59,6 @@ class UpgradeFeature final : public application_features::ApplicationFeature { bool _upgrade; bool _upgradeCheck; - void upgradeDatabase(); - int* _result; std::vector _nonServerFeatures; std::vector _tasks; diff --git a/arangod/RestServer/arangod.cpp b/arangod/RestServer/arangod.cpp index 7abe917efb..2c82d7eb28 100644 --- a/arangod/RestServer/arangod.cpp +++ b/arangod/RestServer/arangod.cpp @@ -52,6 +52,7 @@ #include "Basics/FileUtils.h" #include "Cache/CacheManagerFeature.h" #include "Cluster/ClusterFeature.h" +#include "Cluster/ClusterUpgradeFeature.h" #include "Cluster/MaintenanceFeature.h" #include "Cluster/ReplicationTimeoutFeature.h" #include "FeaturePhases/AgencyFeaturePhase.h" @@ -118,7 +119,6 @@ #endif #include "IResearch/IResearchAnalyzerFeature.h" -#include "IResearch/IResearchAnalyzerCollectionFeature.h" #include "IResearch/IResearchFeature.h" // storage engines @@ -155,6 +155,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) { std::type_index(typeid(GreetingsFeature)), std::type_index(typeid(HttpEndpointProvider)), std::type_index(typeid(LoggerBufferFeature)), + std::type_index(typeid(pregel::PregelFeature)), std::type_index(typeid(ServerFeature)), std::type_index(typeid(SslServerFeature)), std::type_index(typeid(StatisticsFeature)), @@ -184,6 +185,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) { server.addFeature(); server.addFeature(&ret, nonServerFeatures); server.addFeature(); + server.addFeature(); server.addFeature(name); server.addFeature(); server.addFeature(); @@ -259,7 +261,6 @@ static int runServer(int argc, char** argv, ArangoGlobalContext& context) { server.addFeature(); server.addFeature(); - server.addFeature(); // storage engines server.addFeature(); diff --git a/arangod/RocksDBEngine/RocksDBThrottle.cpp b/arangod/RocksDBEngine/RocksDBThrottle.cpp index 373357f109..759db8bb06 100644 --- a/arangod/RocksDBEngine/RocksDBThrottle.cpp +++ b/arangod/RocksDBEngine/RocksDBThrottle.cpp @@ -154,9 +154,6 @@ void RocksDBThrottle::OnFlushBegin(rocksdb::DB* db, const rocksdb::FlushJobInfo& std::chrono::steady_clock::time_point osx_hack = std::chrono::steady_clock::now(); memcpy(gFlushStart, &osx_hack, sizeof(std::chrono::steady_clock::time_point)); AdjustThreadPriority(1); - - return; - } // RocksDBThrottle::OnFlushBegin void RocksDBThrottle::OnFlushCompleted(rocksdb::DB* db, @@ -241,8 +238,6 @@ void RocksDBThrottle::SetThrottleWriteRate(std::chrono::microseconds Micros, LOG_TOPIC("7afe9", DEBUG, arangodb::Logger::ENGINES) << "SetThrottleWriteRate: Micros " << Micros.count() << ", Keys " << Keys << ", Bytes " << Bytes << ", IsLevel0 " << IsLevel0; - - return; } // RocksDBThrottle::SetThrottleWriteRate void RocksDBThrottle::ThreadLoop() { @@ -508,13 +503,10 @@ void RocksDBThrottle::AdjustThreadPriority(int Adjustment) { #ifndef _WIN32 // initialize thread infor if this the first time the thread has ever called if (!gThreadPriority._baseSet) { - pid_t tid; - int ret_val; - - tid = syscall(SYS_gettid); + pid_t tid = syscall(SYS_gettid); if (-1 != (int)tid) { errno = 0; - ret_val = getpriority(PRIO_PROCESS, tid); + int ret_val = getpriority(PRIO_PROCESS, tid); // ret_val could be -1 legally, so double test if (-1 != ret_val || 0 == errno) { gThreadPriority._baseSet = true; diff --git a/arangod/Statistics/StatisticsFeature.cpp b/arangod/Statistics/StatisticsFeature.cpp index ac30c1f431..3f5057d4cb 100644 --- a/arangod/Statistics/StatisticsFeature.cpp +++ b/arangod/Statistics/StatisticsFeature.cpp @@ -31,6 +31,7 @@ #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" #include "RestServer/SystemDatabaseFeature.h" +#include "RestServer/DatabaseFeature.h" #include "Statistics/ConnectionStatistics.h" #include "Statistics/Descriptions.h" #include "Statistics/RequestStatistics.h" @@ -97,6 +98,12 @@ class StatisticsThread final : public Thread { public: void run() override { + auto& databaseFeature = server().getFeature(); + if (databaseFeature.upgrade()) { + // don't start the thread when we are running an upgrade + return; + } + uint64_t const MAX_SLEEP_TIME = 250; uint64_t sleepTime = 100; diff --git a/arangod/StorageEngine/EngineSelectorFeature.cpp b/arangod/StorageEngine/EngineSelectorFeature.cpp index 9c7442eaf5..c4352995e5 100644 --- a/arangod/StorageEngine/EngineSelectorFeature.cpp +++ b/arangod/StorageEngine/EngineSelectorFeature.cpp @@ -82,11 +82,12 @@ void EngineSelectorFeature::prepare() { auto& databasePathFeature = server().getFeature(); auto path = databasePathFeature.directory(); _engineFilePath = basics::FileUtils::buildFilename(path, "ENGINE"); - LOG_TOPIC("98b5c", DEBUG, Logger::STARTUP) - << "looking for previously selected engine in file '" << _engineFilePath << "'"; - // file if engine in file does not match command-line option - if (basics::FileUtils::isRegularFile(_engineFilePath)) { + // fail if engine value in file does not match command-line option + if (!ServerState::instance()->isCoordinator() && + basics::FileUtils::isRegularFile(_engineFilePath)) { + LOG_TOPIC("98b5c", DEBUG, Logger::STARTUP) + << "looking for previously selected engine in file '" << _engineFilePath << "'"; try { std::string content = basics::StringUtils::trim(basics::FileUtils::slurp(_engineFilePath)); @@ -170,7 +171,8 @@ void EngineSelectorFeature::start() { TRI_ASSERT(ENGINE != nullptr); // write engine File - if (!basics::FileUtils::isRegularFile(_engineFilePath)) { + if (!ServerState::instance()->isCoordinator() && + !basics::FileUtils::isRegularFile(_engineFilePath)) { try { basics::FileUtils::spit(_engineFilePath, _engine, true); } catch (std::exception const& ex) { diff --git a/arangod/VocBase/Methods/Tasks.cpp b/arangod/VocBase/Methods/Tasks.cpp index 700602776a..f0d872a775 100644 --- a/arangod/VocBase/Methods/Tasks.cpp +++ b/arangod/VocBase/Methods/Tasks.cpp @@ -193,7 +193,13 @@ void Task::shutdownTasks() { if (++iterations % 10 == 0) { LOG_TOPIC("3966b", INFO, Logger::FIXME) << "waiting for " << size << " task(s) to complete"; + } else if (iterations >= 25) { + LOG_TOPIC("54653", INFO, Logger::FIXME) << "giving up waiting for unfinished tasks"; + MUTEX_LOCKER(guard, _tasksLock); + _tasks.clear(); + break; } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } diff --git a/arangod/VocBase/Methods/Tasks.h b/arangod/VocBase/Methods/Tasks.h index 44d1f0ff17..7b82d56dee 100644 --- a/arangod/VocBase/Methods/Tasks.h +++ b/arangod/VocBase/Methods/Tasks.h @@ -59,6 +59,7 @@ class Task : public std::enable_shared_from_this { private: static Mutex _tasksLock; + // id => [ user, task ] static std::unordered_map>> _tasks; public: diff --git a/arangod/VocBase/Methods/Upgrade.cpp b/arangod/VocBase/Methods/Upgrade.cpp index 50a2f79d89..bfa07b96a1 100644 --- a/arangod/VocBase/Methods/Upgrade.cpp +++ b/arangod/VocBase/Methods/Upgrade.cpp @@ -99,14 +99,23 @@ UpgradeResult Upgrade::createDB(TRI_vocbase_t& vocbase, } UpgradeResult Upgrade::startup(TRI_vocbase_t& vocbase, bool isUpgrade, bool ignoreFileErrors) { - uint32_t clusterFlag = Flags::CLUSTER_LOCAL; - + if (ServerState::instance()->isCoordinator()) { + // coordinators do not have any persistent data, so there is no VERSION file + // available. We don't know the previous version we are upgrading from, so we + // need to pretend no upgrade is necessary + return UpgradeResult(TRI_ERROR_NO_ERROR, methods::VersionResult::VERSION_MATCH); + } + + uint32_t clusterFlag = 0; + if (ServerState::instance()->isSingleServer()) { clusterFlag = Flags::CLUSTER_NONE; + } else { + clusterFlag = Flags::CLUSTER_LOCAL; } uint32_t dbflag = Flags::DATABASE_EXISTING; - auto vinfo = Version::check(&vocbase); + VersionResult vinfo = Version::check(&vocbase); if (vinfo.status == methods::VersionResult::CANNOT_PARSE_VERSION_FILE || vinfo.status == methods::VersionResult::CANNOT_READ_VERSION_FILE) { @@ -202,11 +211,23 @@ UpgradeResult Upgrade::startup(TRI_vocbase_t& vocbase, bool isUpgrade, bool igno return runTasks(vocbase, vinfo, params, clusterFlag, dbflag); } +UpgradeResult methods::Upgrade::startupCoordinator(TRI_vocbase_t& vocbase) { + TRI_ASSERT(ServerState::instance()->isCoordinator()); + + // this will return a hard-coded version result + VersionResult vinfo = Version::check(&vocbase); + + VPackSlice const params = VPackSlice::emptyObjectSlice(); + return runTasks(vocbase, vinfo, params, Flags::CLUSTER_COORDINATOR_GLOBAL, Flags::DATABASE_UPGRADE); +} + /// @brief register tasks, only run once on startup void methods::Upgrade::registerTasks(arangodb::UpgradeFeature& upgradeFeature) { auto& _tasks = upgradeFeature._tasks; TRI_ASSERT(_tasks.empty()); + // note: all tasks here should be idempotent, so that they produce the same + // result when run again addTask(upgradeFeature, "createSystemCollectionsAndIndices", "creates all system collections including their indices", /*system*/ Flags::DATABASE_ALL, @@ -252,10 +273,10 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult& arangodb::velocypack::Slice const& params, uint32_t clusterFlag, uint32_t dbFlag) { auto& upgradeFeature = vocbase.server().getFeature(); - auto& _tasks = upgradeFeature._tasks; + auto& tasks = upgradeFeature._tasks; TRI_ASSERT(clusterFlag != 0 && dbFlag != 0); - TRI_ASSERT(!_tasks.empty()); // forgot to call registerTask!! + TRI_ASSERT(!tasks.empty()); // forgot to call registerTask!! // needs to run in superuser scope, otherwise we get errors ExecContextSuperuserScope scope; // only local should actually write a VERSION file @@ -264,7 +285,7 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult& bool ranOnce = false; // execute all tasks - for (Task const& t : _tasks) { + for (Task const& t : tasks) { // check for system database if (t.systemFlag == DATABASE_SYSTEM && !vocbase.isSystem()) { LOG_TOPIC("bb1ef", DEBUG, Logger::STARTUP) @@ -273,7 +294,7 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult& } if (t.systemFlag == DATABASE_EXCEPT_SYSTEM && vocbase.isSystem()) { LOG_TOPIC("fd4e0", DEBUG, Logger::STARTUP) - << "Upgrade: DB system, Skipping " << t.name; + << "Upgrade: DB system, skipping " << t.name; continue; } @@ -288,7 +309,7 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult& if (it != vinfo.tasks.end()) { if (it->second) { LOG_TOPIC("ffe7f", DEBUG, Logger::STARTUP) - << "Upgrade: Already executed, skipping " << t.name; + << "Upgrade: already executed, skipping " << t.name; continue; } vinfo.tasks.erase(it); // in case we encounter false @@ -306,24 +327,19 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult& continue; } - LOG_TOPIC("15144", DEBUG, Logger::STARTUP) << "Upgrade: Executing " << t.name; + LOG_TOPIC("15144", DEBUG, Logger::STARTUP) << "Upgrade: executing " << t.name; try { bool ranTask = t.action(vocbase, params); if (!ranTask) { std::string msg = - "Executing " + t.name + " (" + t.description + ") failed."; - LOG_TOPIC("0a886", ERR, Logger::STARTUP) << msg << " Aborting procedure."; + "executing " + t.name + " (" + t.description + ") failed."; + LOG_TOPIC("0a886", ERR, Logger::STARTUP) << msg << " aborting upgrade procedure."; return UpgradeResult(TRI_ERROR_INTERNAL, msg, vinfo.status); } - } catch (arangodb::basics::Exception const& e) { - LOG_TOPIC("65ac5", ERR, Logger::STARTUP) - << "Executing " << t.name << " (" << t.description - << ") failed with error: " << e.what() << ". Aborting procedure."; - return UpgradeResult(e.code(), e.what(), vinfo.status); } catch (std::exception const& e) { LOG_TOPIC("022fe", ERR, Logger::STARTUP) - << "Executing " << t.name << " (" << t.description - << ") failed with error: " << e.what() << ". Aborting procedure."; + << "executing " << t.name << " (" << t.description + << ") failed with error: " << e.what() << ". aborting upgrade procedure."; return UpgradeResult(TRI_ERROR_FAILED, e.what(), vinfo.status); } @@ -344,7 +360,7 @@ UpgradeResult methods::Upgrade::runTasks(TRI_vocbase_t& vocbase, VersionResult& if (isLocal) { // no need to write this for cluster bootstrap // save even if no tasks were executed LOG_TOPIC("e5a77", DEBUG, Logger::STARTUP) - << "Upgrade: Writing VERSION file"; + << "Upgrade: writing VERSION file"; auto res = methods::Version::write(&vocbase, vinfo.tasks, /*sync*/ ranOnce); if (res.fail()) { diff --git a/arangod/VocBase/Methods/Upgrade.h b/arangod/VocBase/Methods/Upgrade.h index 8dd4fdb109..8595e4738a 100644 --- a/arangod/VocBase/Methods/Upgrade.h +++ b/arangod/VocBase/Methods/Upgrade.h @@ -96,10 +96,15 @@ struct Upgrade { static UpgradeResult createDB(TRI_vocbase_t& vocbase, arangodb::velocypack::Slice const& users); - /// @brief executed on startup + /// @brief executed on startup for non-coordinators /// @param upgrade Perform an actual upgrade /// Corresponds to upgrade-database.js static UpgradeResult startup(TRI_vocbase_t& vocbase, bool upgrade, bool ignoreFileErrors); + + /// @brief executed on startup for coordinators + /// @param upgrade Perform an actual upgrade + /// Corresponds to upgrade-database.js + static UpgradeResult startupCoordinator(TRI_vocbase_t& vocbase); private: /// @brief register tasks, only run once on startup diff --git a/arangod/VocBase/Methods/UpgradeTasks.cpp b/arangod/VocBase/Methods/UpgradeTasks.cpp index 26f4219740..f50146babd 100644 --- a/arangod/VocBase/Methods/UpgradeTasks.cpp +++ b/arangod/VocBase/Methods/UpgradeTasks.cpp @@ -322,17 +322,17 @@ Result createSystemStatisticsCollections(TRI_vocbase_t& vocbase, return {TRI_ERROR_NO_ERROR}; } -static Result createIndex(std::string const name, Index::IndexType type, +static Result createIndex(std::string const& name, Index::IndexType type, std::vector const& fields, bool unique, bool sparse, std::vector>& collections) { // Static helper function that wraps creating an index. If we fail to // create an index with some indices created, we clean up by removing all // collections later on. Find the collection by name - auto colIt = find_if(collections.begin(), collections.end(), - [name](std::shared_ptr col) { - TRI_ASSERT(col != nullptr); - return col->name() == name; - }); + auto colIt = std::find_if(collections.begin(), collections.end(), + [&name](std::shared_ptr const& col) { + TRI_ASSERT(col != nullptr); + return col->name() == name; + }); if (colIt == collections.end()) { return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, "Collection " + name + " not found"); @@ -363,7 +363,7 @@ Result createSystemStatisticsIndices(TRI_vocbase_t& vocbase, return res; } } - return {TRI_ERROR_NO_ERROR}; + return res; } Result createSystemCollectionsIndices(TRI_vocbase_t& vocbase, @@ -403,19 +403,17 @@ Result createSystemCollectionsIndices(TRI_vocbase_t& vocbase, return res; } - return {TRI_ERROR_NO_ERROR}; + return res; } } // namespace bool UpgradeTasks::createSystemCollectionsAndIndices(TRI_vocbase_t& vocbase, arangodb::velocypack::Slice const& slice) { - Result res; - - // This vector should after the call to ::createSystemCollections contain + // after the call to ::createSystemCollections this vector should contain // a LogicalCollection for *every* (required) system collection. std::vector> presentSystemCollections; - res = ::createSystemCollections(vocbase, presentSystemCollections); + Result res = ::createSystemCollections(vocbase, presentSystemCollections); // TODO: Maybe check or assert that all collections are present (i.e. were // present or created), raise an error if not? diff --git a/arangod/VocBase/Methods/Version.cpp b/arangod/VocBase/Methods/Version.cpp index 69b7502e4d..bc1e227f5b 100644 --- a/arangod/VocBase/Methods/Version.cpp +++ b/arangod/VocBase/Methods/Version.cpp @@ -25,6 +25,7 @@ #include "Basics/FileUtils.h" #include "Basics/VelocyPackHelper.h" #include "Basics/files.h" +#include "Cluster/ServerState.h" #include "Logger/LogMacros.h" #include "Logger/Logger.h" #include "Logger/LoggerStream.h" @@ -90,6 +91,16 @@ VersionResult::StatusCode Version::compare(uint64_t current, uint64_t other) { } VersionResult Version::check(TRI_vocbase_t* vocbase) { + uint64_t lastVersion = UINT64_MAX; + uint64_t serverVersion = Version::current(); + std::map tasks; + + if (ServerState::instance()->isCoordinator()) { + // in a coordinator, we don't have any persistent data, so there is no VERSION + // file available. In this case we don't know the previous version we are + // upgrading from, so we can't do anything sensible here. + return VersionResult{VersionResult::VERSION_MATCH, serverVersion, serverVersion, tasks}; + } StorageEngine* engine = EngineSelectorFeature::ENGINE; TRI_ASSERT(engine != nullptr); @@ -106,10 +117,6 @@ VersionResult Version::check(TRI_vocbase_t* vocbase) { return VersionResult{VersionResult::CANNOT_READ_VERSION_FILE, 0, 0, {}}; } - uint64_t lastVersion = UINT64_MAX; - uint64_t serverVersion = Version::current(); - std::map tasks; - try { std::shared_ptr parsed = velocypack::Parser::fromJson(versionInfo); VPackSlice versionVals = parsed->slice(); diff --git a/lib/ApplicationFeatures/ApplicationServer.cpp b/lib/ApplicationFeatures/ApplicationServer.cpp index 193a9c5c82..fd6c765a35 100644 --- a/lib/ApplicationFeatures/ApplicationServer.cpp +++ b/lib/ApplicationFeatures/ApplicationServer.cpp @@ -78,7 +78,7 @@ ApplicationServer& ApplicationServer::server() { } ApplicationServer::ApplicationServer(std::shared_ptr options, - const char* binaryPath) + char const* binaryPath) : _state(State::UNINITIALIZED), _options(options), _binaryPath(binaryPath) { diff --git a/lib/Basics/StringUtils.cpp b/lib/Basics/StringUtils.cpp index 0506c7f66b..0c635a9794 100644 --- a/lib/Basics/StringUtils.cpp +++ b/lib/Basics/StringUtils.cpp @@ -488,13 +488,12 @@ std::vector split(std::string const& source, std::string const& del std::string trim(std::string const& sourceStr, std::string const& trimStr) { size_t s = sourceStr.find_first_not_of(trimStr); - size_t e = sourceStr.find_last_not_of(trimStr); if (s == std::string::npos) { return std::string(); - } else { - return std::string(sourceStr, s, e - s + 1); } + size_t e = sourceStr.find_last_not_of(trimStr); + return std::string(sourceStr, s, e - s + 1); } void trimInPlace(std::string& str, std::string const& trimStr) { @@ -517,9 +516,8 @@ std::string lTrim(std::string const& str, std::string const& trimStr) { if (s == std::string::npos) { return std::string(); - } else { - return std::string(str, s); - } + } + return std::string(str, s); } std::string rTrim(std::string const& sourceStr, std::string const& trimStr) { diff --git a/lib/Logger/LogThread.cpp b/lib/Logger/LogThread.cpp index a081d62e70..42d2e6b527 100644 --- a/lib/Logger/LogThread.cpp +++ b/lib/Logger/LogThread.cpp @@ -44,12 +44,14 @@ LogThread::~LogThread() { shutdown(); } -void LogThread::log(std::unique_ptr& message) { +bool LogThread::log(std::unique_ptr& message) { if (MESSAGES->push(message.get())) { // only release message if adding to the queue succeeded // otherwise we would leak here message.release(); + return true; } + return false; } void LogThread::flush() { diff --git a/lib/Logger/LogThread.h b/lib/Logger/LogThread.h index acc75ec59a..de1cb4d06b 100644 --- a/lib/Logger/LogThread.h +++ b/lib/Logger/LogThread.h @@ -41,7 +41,7 @@ struct LogMessage; class LogThread final : public Thread { public: - static void log(std::unique_ptr&); + static bool log(std::unique_ptr&); // flush all pending log messages static void flush(); diff --git a/lib/Logger/Logger.cpp b/lib/Logger/Logger.cpp index cbc96a3d00..e6c857c20d 100644 --- a/lib/Logger/Logger.cpp +++ b/lib/Logger/Logger.cpp @@ -317,12 +317,6 @@ void Logger::log(char const* function, char const* file, int line, } #endif - if (!_active.load(std::memory_order_relaxed)) { - LogAppenderStdStream::writeLogMessage(STDERR_FILENO, (isatty(STDERR_FILENO) == 1), - level, message.data(), message.size(), true); - return; - } - std::stringstream out; LogTimeFormats::writeTime(out, _timeFormat); out << ' '; @@ -385,25 +379,36 @@ void Logger::log(char const* function, char const* file, int line, // generate the complete message out << message; std::string ostreamContent = out.str(); + + if (!_active.load(std::memory_order_relaxed)) { + LogAppenderStdStream::writeLogMessage(STDERR_FILENO, (isatty(STDERR_FILENO) == 1), + level, ostreamContent.data(), ostreamContent.size(), true); + return; + } + size_t offset = ostreamContent.size() - message.size(); auto msg = std::make_unique(level, topicId, std::move(ostreamContent), offset); // now either queue or output the message + bool handled = false; if (_threaded) { try { - _loggingThread->log(msg); - bool const isDirectLogLevel = - (level == LogLevel::FATAL || level == LogLevel::ERR || level == LogLevel::WARN); - if (isDirectLogLevel) { - _loggingThread->flush(); + handled = _loggingThread->log(msg); + if (handled) { + bool const isDirectLogLevel = + (level == LogLevel::FATAL || level == LogLevel::ERR || level == LogLevel::WARN); + if (isDirectLogLevel) { + _loggingThread->flush(); + } } - return; } catch (...) { // fall-through to non-threaded logging } } - LogAppender::log(msg.get()); + if (!handled) { + LogAppender::log(msg.get()); + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/Logger/LoggerStream.cpp b/lib/Logger/LoggerStream.cpp index 65b04b85a5..beb3432791 100644 --- a/lib/Logger/LoggerStream.cpp +++ b/lib/Logger/LoggerStream.cpp @@ -48,8 +48,6 @@ LoggerStream::~LoggerStream() { // print a hex representation of the binary data LoggerStream& LoggerStream::operator<<(Logger::BINARY const& binary) { try { - std::ostringstream tmp; - uint8_t const* ptr = static_cast(binary.baseAddress); uint8_t const* end = ptr + binary.size; @@ -59,11 +57,10 @@ LoggerStream& LoggerStream::operator<<(Logger::BINARY const& binary) { uint8_t n1 = n >> 4; uint8_t n2 = n & 0x0F; - tmp << "\\x" << static_cast((n1 < 10) ? ('0' + n1) : ('A' + n1 - 10)) - << static_cast((n2 < 10) ? ('0' + n2) : ('A' + n2 - 10)); + _out << "\\x" << static_cast((n1 < 10) ? ('0' + n1) : ('A' + n1 - 10)) + << static_cast((n2 < 10) ? ('0' + n2) : ('A' + n2 - 10)); ++ptr; } - _out << tmp.str(); } catch (...) { // ignore any errors here. logging should not have side effects } @@ -84,12 +81,10 @@ LoggerStream& LoggerStream::operator<<(Logger::CHARS const& data) { LoggerStream& LoggerStream::operator<<(Logger::RANGE const& range) { try { - std::ostringstream tmp; - tmp << range.baseAddress << " - " - << static_cast(static_cast(range.baseAddress) + - range.size) - << " (" << range.size << " bytes)"; - _out << tmp.str(); + _out << range.baseAddress << " - " + << static_cast(static_cast(range.baseAddress) + + range.size) + << " (" << range.size << " bytes)"; } catch (...) { // ignore any errors here. logging should not have side effects } diff --git a/lib/Rest/Version.cpp b/lib/Rest/Version.cpp index 4a401eb475..a3f328a390 100644 --- a/lib/Rest/Version.cpp +++ b/lib/Rest/Version.cpp @@ -107,12 +107,17 @@ void Version::initialize() { #else Values["debug"] = "false"; #endif +#ifdef ARANGODB_USE_IPO + Values["ipo"] = "true"; +#else + Values["ipo"] = "false"; +#endif #ifdef NDEBUG Values["ndebug"] = "true"; #else Values["ndebug"] = "false"; #endif -#if defined(ARCHITECTURE_OPTIMIZATIONS) +#ifdef ARCHITECTURE_OPTIMIZATIONS Values["optimization-flags"] = std::string(ARCHITECTURE_OPTIMIZATIONS); #endif Values["endianness"] = getEndianness();