//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann //////////////////////////////////////////////////////////////////////////////// #include "Cluster/ClusterUpgradeFeature.h" #include "Agency/AgencyComm.h" #include "Agency/AgencyFeature.h" #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/ScopeGuard.h" #include "Cluster/ServerState.h" #include "FeaturePhases/FinalFeaturePhase.h" #include "Logger/LogMacros.h" #include "ProgramOptions/ProgramOptions.h" #include "RestServer/DatabaseFeature.h" #include "RestServer/UpgradeFeature.h" #include "VocBase/vocbase.h" #include "VocBase/Methods/Upgrade.h" #include "VocBase/Methods/Version.h" using namespace arangodb; using namespace arangodb::options; namespace { static std::string const upgradeVersionKey = "ClusterUpgradeVersion"; static std::string const upgradeExecutedByKey = "ClusterUpgradeExecutedBy"; } ClusterUpgradeFeature::ClusterUpgradeFeature(application_features::ApplicationServer& server) : ApplicationFeature(server, "ClusterUpgrade"), _upgradeMode("auto") { startsAfter(); } void ClusterUpgradeFeature::collectOptions(std::shared_ptr options) { options->addOption("--cluster.upgrade", "perform a cluster upgrade if necessary (auto = perform upgrade and shut down only if `--database.auto-upgrade true` is set, disable = never perform upgrade, force = always perform an upgrade and shut down, online = always perform an upgrade but don't shut down)", new DiscreteValuesParameter(&_upgradeMode, std::unordered_set{"auto", "disable", "force", "online"})); } void ClusterUpgradeFeature::validateOptions(std::shared_ptr options) { auto& databaseFeature = server().getFeature(); if (_upgradeMode == "force") { // always perform an upgrade, regardless of the value of `--database.auto-upgrade`. // after the upgrade, shut down the server databaseFeature.enableUpgrade(); } else if (_upgradeMode == "disable") { // never perform an upgrade, regardless of the value of `--database.auto-upgrade`. // don't shut down the server databaseFeature.disableUpgrade(); } else if (_upgradeMode == "online") { // perform an upgrade, but stay online and don't shut down the server. // disabling the upgrade functionality in the database feature is required for this. databaseFeature.disableUpgrade(); } } void ClusterUpgradeFeature::start() { if (!ServerState::instance()->isCoordinator()) { return; } // this feature is doing something meaning only in a coordinator, and only // if the server was started with the option `--database.auto-upgrade true`. auto& databaseFeature = server().getFeature(); if (_upgradeMode == "disable" || (!databaseFeature.upgrade() && (_upgradeMode != "online" && _upgradeMode != "force"))) { return; } tryClusterUpgrade(); if (_upgradeMode != "online") { LOG_TOPIC("d6047", INFO, arangodb::Logger::STARTUP) << "server will now shut down due to upgrade."; server().beginShutdown(); } } void ClusterUpgradeFeature::setBootstrapVersion() { // it is not a fundamental problem if the setValue fails. if it fails, we can't // store the version number in the agency, so an upgrade we will run all the // (idempotent) upgrade tasks for the same version again. VPackBuilder builder; builder.add(VPackValue(arangodb::methods::Version::current())); AgencyComm agency; agency.setValue(::upgradeVersionKey, builder.slice(), 0); } void ClusterUpgradeFeature::tryClusterUpgrade() { TRI_ASSERT(ServerState::instance()->isCoordinator()); AgencyComm agency; AgencyCommResult result = agency.getValues(::upgradeVersionKey); if (!result.successful()) { LOG_TOPIC("26104", ERR, arangodb::Logger::CLUSTER) << "unable to fetch cluster upgrade version from agency: " << result.errorMessage(); return; } uint64_t latestUpgradeVersion = 0; VPackSlice value = result.slice()[0].get( std::vector({AgencyCommManager::path(), ::upgradeVersionKey})); if (value.isNumber()) { latestUpgradeVersion = value.getNumber(); LOG_TOPIC("54f69", DEBUG, arangodb::Logger::CLUSTER) << "found previous cluster upgrade version in agency: " << latestUpgradeVersion; } else { // key not there yet. LOG_TOPIC("5b00d", DEBUG, arangodb::Logger::CLUSTER) << "did not find previous cluster upgrade version in agency"; } if (arangodb::methods::Version::current() <= latestUpgradeVersion) { // nothing to do return; } std::vector precs; if (latestUpgradeVersion == 0) { precs.emplace_back(::upgradeVersionKey, AgencyPrecondition::Type::EMPTY, true); } else { precs.emplace_back(::upgradeVersionKey, AgencyPrecondition::Type::VALUE, latestUpgradeVersion); } // there must be no other coordinator that performs an upgrade at the same time precs.emplace_back(::upgradeExecutedByKey, AgencyPrecondition::Type::EMPTY, true); // try to register ourselves as responsible for the upgrade AgencyOperation operation(::upgradeExecutedByKey, AgencyValueOperationType::SET, ServerState::instance()->getId()); // make the key expire automatically in case we crash // operation._ttl = TRI_microtime() + 1800.0; AgencyWriteTransaction transaction(operation, precs); result = agency.sendTransactionWithFailover(transaction); if (result.successful()) { // we are responsible for the upgrade! LOG_TOPIC("15ac4", INFO, arangodb::Logger::CLUSTER) << "running cluster upgrade from " << (latestUpgradeVersion == 0 ? std::string("an unknown version") : std::string("version ") + std::to_string(latestUpgradeVersion)) << " to version " << arangodb::methods::Version::current() << "..."; bool success = false; try { success = upgradeCoordinator(); } catch (std::exception const& ex) { LOG_TOPIC("f2a84", ERR, Logger::CLUSTER) << "caught exception during cluster upgrade: " << ex.what(); TRI_ASSERT(!success); } // now finally remove the upgrading key and store the new version number std::vector precs; precs.emplace_back(::upgradeExecutedByKey, AgencyPrecondition::Type::VALUE, ServerState::instance()->getId()); std::vector operations; if (success) { // upgrade successful - store our current version number operations.emplace_back(::upgradeVersionKey, AgencyValueOperationType::SET, arangodb::methods::Version::current()); } // remove the key that locks out other coordinators from upgrading operations.emplace_back(::upgradeExecutedByKey, AgencySimpleOperationType::DELETE_OP); AgencyWriteTransaction transaction(operations, precs); result = agency.sendTransactionWithFailover(transaction); if (result.successful()) { LOG_TOPIC("853de", INFO, arangodb::Logger::CLUSTER) << "cluster upgrade to version " << arangodb::methods::Version::current() << " completed successfully"; } else { LOG_TOPIC("a0b4f", ERR, arangodb::Logger::CLUSTER) << "unable to store cluster upgrade information in agency: " << result.errorMessage(); } } else if (result.httpCode() != (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { LOG_TOPIC("482a3", WARN, arangodb::Logger::CLUSTER) << "unable to fetch upgrade information: " << result.errorMessage(); } else { // someone else is performing the upgrade LOG_TOPIC("ab6eb", DEBUG, arangodb::Logger::CLUSTER) << "someone else is running the cluster upgrade right now"; } } bool ClusterUpgradeFeature::upgradeCoordinator() { LOG_TOPIC("a2d65", TRACE, arangodb::Logger::FIXME) << "starting coordinator upgrade"; bool success = true; DatabaseFeature& databaseFeature = server().getFeature(); for (auto& name : databaseFeature.getDatabaseNames()) { TRI_vocbase_t* vocbase = databaseFeature.useDatabase(name); if (vocbase == nullptr) { // probably deleted in the meantime... so we can ignore it here continue; } auto guard = scopeGuard([&vocbase]() { vocbase->release(); }); auto res = methods::Upgrade::startupCoordinator(*vocbase); if (res.fail()) { LOG_TOPIC("f51b1", ERR, arangodb::Logger::FIXME) << "Database '" << vocbase->name() << "' upgrade failed (" << res.errorMessage() << "). " << "Please inspect the logs from the upgrade procedure" << " and try starting the server again."; success = false; } } LOG_TOPIC("efd49", TRACE, arangodb::Logger::FIXME) << "finished coordinator upgrade"; return success; }