//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann //////////////////////////////////////////////////////////////////////////////// #include "ClusterFeature.h" #include "Basics/FileUtils.h" #include "Basics/VelocyPackHelper.h" #include "Basics/files.h" #include "Cluster/ClusterComm.h" #include "Cluster/ClusterInfo.h" #include "Cluster/HeartbeatThread.h" #include "Endpoint/Endpoint.h" #include "GeneralServer/AuthenticationFeature.h" #include "Logger/Logger.h" #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" #include "RestServer/DatabaseFeature.h" #include "Scheduler/Scheduler.h" #include "Scheduler/SchedulerFeature.h" using namespace arangodb; using namespace arangodb::application_features; using namespace arangodb::basics; using namespace arangodb::options; ClusterFeature::ClusterFeature(application_features::ApplicationServer& server) : ApplicationFeature(server, "Cluster"), _unregisterOnShutdown(false), _enableCluster(false), _requirePersistedId(false), _heartbeatThread(nullptr), _heartbeatInterval(0), _agencyCallbackRegistry(nullptr), _requestedRole(ServerState::RoleEnum::ROLE_UNDEFINED) { setOptional(true); startsAfter("DatabasePhase"); startsAfter("CommunicationPhase"); } ClusterFeature::~ClusterFeature() { if (_enableCluster) { AgencyCommManager::shutdown(); } } void ClusterFeature::collectOptions(std::shared_ptr options) { options->addSection("cluster", "Configure the cluster"); options->addObsoleteOption("--cluster.username", "username used for cluster-internal communication", true); options->addObsoleteOption("--cluster.password", "password used for cluster-internal communication", true); options->addObsoleteOption("--cluster.disable-dispatcher-kickstarter", "The dispatcher feature isn't available anymore; " "Use ArangoDBStarter for this now!", true); options->addObsoleteOption("--cluster.disable-dispatcher-frontend", "The dispatcher feature isn't available anymore; " "Use ArangoDBStarter for this now!", true); options->addObsoleteOption( "--cluster.dbserver-config", "The dbserver-config is not available anymore, Use ArangoDBStarter", true); options->addObsoleteOption( "--cluster.coordinator-config", "The coordinator-config is not available anymore, Use ArangoDBStarter", true); options->addObsoleteOption("--cluster.data-path", "path to cluster database directory", true); options->addObsoleteOption("--cluster.log-path", "path to log directory for the cluster", true); options->addObsoleteOption("--cluster.arangod-path", "path to the arangod for the cluster", true); options->addOption( "--cluster.require-persisted-id", "if set to true, then the instance will only start if a UUID file is " "found in the database on startup. Setting this option will make sure " "the instance is started using an already existing database directory " "and not a new one. For the first start, the UUID file must either be " "created manually or the option must be set to false for the initial " "startup", new BooleanParameter(&_requirePersistedId)); options->addOption("--cluster.agency-endpoint", "agency endpoint to connect to", new VectorParameter(&_agencyEndpoints)); options->addOption("--cluster.agency-prefix", "agency prefix", new StringParameter(&_agencyPrefix), arangodb::options::makeFlags(arangodb::options::Flags::Hidden)); options->addObsoleteOption("--cluster.my-local-info", "this server's local info", false); options->addObsoleteOption("--cluster.my-id", "this server's id", false); options->addOption("--cluster.my-role", "this server's role", new StringParameter(&_myRole)); options->addOption("--cluster.my-address", "this server's endpoint (cluster internal)", new StringParameter(&_myEndpoint)); options->addOption("--cluster.my-advertised-endpoint", "this server's advertised endpoint (e.g. external IP " "address or load balancer, optional)", new StringParameter(&_myAdvertisedEndpoint)); options->addOption("--cluster.default-replication-factor", "default replication factor for new collections", new UInt32Parameter(&_defaultReplicationFactor)).setIntroducedIn(30501); options->addOption("--cluster.system-replication-factor", "replication factor for system collections", new UInt32Parameter(&_systemReplicationFactor)); options->addOption("--cluster.min-replication-factor", "minimum replication factor for new collections", new UInt32Parameter(&_minReplicationFactor)).setIntroducedIn(30501); options->addOption("--cluster.max-replication-factor", "maximum replication factor for new collections (0 = unrestricted)", new UInt32Parameter(&_maxReplicationFactor)).setIntroducedIn(30501); options->addOption( "--cluster.create-waits-for-sync-replication", "active coordinator will wait for all replicas to create collection", new BooleanParameter(&_createWaitsForSyncReplication), arangodb::options::makeFlags(arangodb::options::Flags::Hidden)); options->addOption("--cluster.max-number-of-shards", "maximum number of shards when creating new collections (0 = unrestricted)", new UInt32Parameter(&_maxNumberOfShards)).setIntroducedIn(30501); options->addOption( "--cluster.index-create-timeout", "amount of time (in seconds) the coordinator will wait for an index to " "be created before giving up", new DoubleParameter(&_indexCreationTimeout), arangodb::options::makeFlags(arangodb::options::Flags::Hidden)); } void ClusterFeature::validateOptions(std::shared_ptr options) { if (options->processingResult().touched( "cluster.disable-dispatcher-kickstarter") || options->processingResult().touched( "cluster.disable-dispatcher-frontend")) { LOG_TOPIC("33707", FATAL, arangodb::Logger::CLUSTER) << "The dispatcher feature isn't available anymore. Use " << "ArangoDBStarter for this now! See " << "https://github.com/arangodb-helper/ArangoDBStarter/ for more " << "details."; FATAL_ERROR_EXIT(); } if (_minReplicationFactor == 0) { // min replication factor must not be 0 LOG_TOPIC("2fbdd", FATAL, arangodb::Logger::CLUSTER) << "Invalid value for `--cluster.min-replication-factor`. The value must be at least 1"; FATAL_ERROR_EXIT(); } if (_maxReplicationFactor > 10) { // 10 is a hard-coded limit for the replication factor LOG_TOPIC("886c6", FATAL, arangodb::Logger::CLUSTER) << "Invalid value for `--cluster.max-replication-factor`. The value must not exceed 10"; FATAL_ERROR_EXIT(); } TRI_ASSERT(_minReplicationFactor > 0); if (!options->processingResult().touched("cluster.default-replication-factor")) { // no default replication factor set. now use the minimum value, which is // guaranteed to be at least 1 _defaultReplicationFactor = _minReplicationFactor; } if (!options->processingResult().touched("cluster.system-replication-factor")) { // no system replication factor set. now make sure it is between min and max if (_systemReplicationFactor > _maxReplicationFactor) { _systemReplicationFactor = _maxReplicationFactor; } else if (_systemReplicationFactor < _minReplicationFactor) { _systemReplicationFactor = _minReplicationFactor; } } if (_defaultReplicationFactor == 0) { // default replication factor must not be 0 LOG_TOPIC("fc8a9", FATAL, arangodb::Logger::CLUSTER) << "Invalid value for `--cluster.default-replication-factor`. The value must be at least 1"; FATAL_ERROR_EXIT(); } if (_systemReplicationFactor == 0) { // default replication factor must not be 0 LOG_TOPIC("46935", FATAL, arangodb::Logger::CLUSTER) << "Invalid value for `--cluster.system-replication-factor`. The value must be at least 1"; FATAL_ERROR_EXIT(); } if (_defaultReplicationFactor > 0 && _maxReplicationFactor > 0 && _defaultReplicationFactor > _maxReplicationFactor) { LOG_TOPIC("5af7e", FATAL, arangodb::Logger::CLUSTER) << "Invalid value for `--cluster.default-replication-factor`. Must not be higher than `--cluster.max-replication-factor`"; FATAL_ERROR_EXIT(); } if (_defaultReplicationFactor > 0 && _defaultReplicationFactor < _minReplicationFactor) { LOG_TOPIC("b9aea", FATAL, arangodb::Logger::CLUSTER) << "Invalid value for `--cluster.default-replication-factor`. Must not be lower than `--cluster.min-replication-factor`"; FATAL_ERROR_EXIT(); } if (_systemReplicationFactor > 0 && _maxReplicationFactor > 0 && _systemReplicationFactor > _maxReplicationFactor) { LOG_TOPIC("6cf0c", FATAL, arangodb::Logger::CLUSTER) << "Invalid value for `--cluster.system-replication-factor`. Must not be higher than `--cluster.max-replication-factor`"; FATAL_ERROR_EXIT(); } if (_systemReplicationFactor > 0 && _systemReplicationFactor < _minReplicationFactor) { LOG_TOPIC("dfc38", FATAL, arangodb::Logger::CLUSTER) << "Invalid value for `--cluster.system-replication-factor`. Must not be lower than `--cluster.min-replication-factor`"; FATAL_ERROR_EXIT(); } // check if the cluster is enabled _enableCluster = !_agencyEndpoints.empty(); if (!_enableCluster) { _requestedRole = ServerState::ROLE_SINGLE; ServerState::instance()->setRole(ServerState::ROLE_SINGLE); ServerState::instance()->findHost("localhost"); return; } // validate --cluster.agency-endpoint if (_agencyEndpoints.empty()) { LOG_TOPIC("d283a", FATAL, Logger::CLUSTER) << "must at least specify one endpoint in --cluster.agency-endpoint"; FATAL_ERROR_EXIT(); } // validate --cluster.my-address if (_myEndpoint.empty()) { LOG_TOPIC("c1532", FATAL, arangodb::Logger::CLUSTER) << "unable to determine internal address for server '" << ServerState::instance()->getId() << "'. Please specify --cluster.my-address or configure the " "address for this server in the agency."; FATAL_ERROR_EXIT(); } // now we can validate --cluster.my-address if (Endpoint::unifiedForm(_myEndpoint).empty()) { LOG_TOPIC("41256", FATAL, arangodb::Logger::CLUSTER) << "invalid endpoint '" << _myEndpoint << "' specified for --cluster.my-address"; FATAL_ERROR_EXIT(); } if (!_myAdvertisedEndpoint.empty() && Endpoint::unifiedForm(_myAdvertisedEndpoint).empty()) { LOG_TOPIC("ece6a", FATAL, arangodb::Logger::CLUSTER) << "invalid endpoint '" << _myAdvertisedEndpoint << "' specified for --cluster.my-advertised-endpoint"; FATAL_ERROR_EXIT(); } // validate if (_agencyPrefix.empty()) { _agencyPrefix = "arango"; } // validate --cluster.agency-prefix size_t found = _agencyPrefix.find_first_not_of( "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789/"); if (found != std::string::npos || _agencyPrefix.empty()) { LOG_TOPIC("7259b", FATAL, arangodb::Logger::CLUSTER) << "invalid value specified for --cluster.agency-prefix"; FATAL_ERROR_EXIT(); } // validate system-replication-factor if (_systemReplicationFactor == 0) { LOG_TOPIC("cb945", FATAL, arangodb::Logger::CLUSTER) << "system replication factor must be greater 0"; FATAL_ERROR_EXIT(); } std::string fallback = _myEndpoint; // Now extract the hostname/IP: auto pos = fallback.find("://"); if (pos != std::string::npos) { fallback = fallback.substr(pos + 3); } pos = fallback.rfind(':'); if (pos != std::string::npos) { fallback = fallback.substr(0, pos); } auto ss = ServerState::instance(); ss->findHost(fallback); if (!_myRole.empty()) { _requestedRole = ServerState::stringToRole(_myRole); std::vector const disallowedRoles = { /*ServerState::ROLE_SINGLE,*/ ServerState::ROLE_AGENT, ServerState::ROLE_UNDEFINED}; if (std::find(disallowedRoles.begin(), disallowedRoles.end(), _requestedRole) != disallowedRoles.end()) { LOG_TOPIC("198c3", FATAL, arangodb::Logger::CLUSTER) << "Invalid role provided for `--cluster.my-role`. Possible values: " "DBSERVER, PRIMARY, COORDINATOR"; FATAL_ERROR_EXIT(); } ServerState::instance()->setRole(_requestedRole); } } void ClusterFeature::reportRole(arangodb::ServerState::RoleEnum role) { std::string roleString(ServerState::roleToString(role)); if (role == ServerState::ROLE_UNDEFINED) { roleString += ". Determining real role from agency"; } LOG_TOPIC("3bb7d", INFO, arangodb::Logger::CLUSTER) << "Starting up with role " << roleString; } void ClusterFeature::prepare() { if (_enableCluster && _requirePersistedId && !ServerState::instance()->hasPersistedId()) { LOG_TOPIC("d2194", FATAL, arangodb::Logger::CLUSTER) << "required persisted UUID file '" << ServerState::instance()->getUuidFilename() << "' not found. Please make sure this instance is started using an " "already existing database directory"; FATAL_ERROR_EXIT(); } // create callback registery _agencyCallbackRegistry.reset(new AgencyCallbackRegistry(agencyCallbacksPath())); // Initialize ClusterInfo library: ClusterInfo::createInstance(_agencyCallbackRegistry.get()); // create an instance (this will not yet create a thread) ClusterComm::instance(); if (ServerState::instance()->isAgent() || _enableCluster) { AuthenticationFeature* af = AuthenticationFeature::instance(); // nullptr happens only during shutdown if (af->isActive() && !af->hasUserdefinedJwt()) { LOG_TOPIC("6e615", FATAL, arangodb::Logger::CLUSTER) << "Cluster authentication enabled but JWT not set via command line. " "Please" << " provide --server.jwt-secret which is used throughout the " "cluster."; FATAL_ERROR_EXIT(); } } // return if cluster is disabled if (!_enableCluster) { reportRole(ServerState::instance()->getRole()); return; } else { reportRole(_requestedRole); } // register the prefix with the communicator AgencyCommManager::initialize(_agencyPrefix); TRI_ASSERT(AgencyCommManager::MANAGER != nullptr); for (size_t i = 0; i < _agencyEndpoints.size(); ++i) { std::string const unified = Endpoint::unifiedForm(_agencyEndpoints[i]); if (unified.empty()) { LOG_TOPIC("1b759", FATAL, arangodb::Logger::CLUSTER) << "invalid endpoint '" << _agencyEndpoints[i] << "' specified for --cluster.agency-endpoint"; FATAL_ERROR_EXIT(); } AgencyCommManager::MANAGER->addEndpoint(unified); } // disable error logging for a while ClusterComm::instance()->enableConnectionErrorLogging(false); // perform an initial connect to the agency if (!AgencyCommManager::MANAGER->start()) { LOG_TOPIC("54560", FATAL, arangodb::Logger::CLUSTER) << "Could not connect to any agency endpoints (" << AgencyCommManager::MANAGER->endpointsString() << ")"; FATAL_ERROR_EXIT(); } if (!ServerState::instance()->integrateIntoCluster(_requestedRole, _myEndpoint, _myAdvertisedEndpoint)) { LOG_TOPIC("fea1e", FATAL, Logger::STARTUP) << "Couldn't integrate into cluster."; FATAL_ERROR_EXIT(); } auto role = ServerState::instance()->getRole(); auto endpoints = AgencyCommManager::MANAGER->endpointsString(); if (role == ServerState::ROLE_UNDEFINED) { // no role found LOG_TOPIC("613f4", FATAL, arangodb::Logger::CLUSTER) << "unable to determine unambiguous role for server '" << ServerState::instance()->getId() << "'. No role configured in agency (" << endpoints << ")"; FATAL_ERROR_EXIT(); } // if nonempty, it has already been set above // If we are a coordinator, we wait until at least one DBServer is there, // otherwise we can do very little, in particular, we cannot create // any collection: if (role == ServerState::ROLE_COORDINATOR) { auto ci = ClusterInfo::instance(); double start = TRI_microtime(); #ifdef ARANGODB_ENABLE_MAINTAINER_MODE // in maintainer mode, a developer does not want to spend that much time // waiting for extra nodes to start up constexpr double waitTime = 5.0; #else constexpr double waitTime = 15.0; #endif while (true) { LOG_TOPIC("d4db4", INFO, arangodb::Logger::CLUSTER) << "Waiting for DBservers to show up..."; ci->loadCurrentDBServers(); std::vector DBServers = ci->getCurrentDBServers(); if (DBServers.size() >= 1 && (DBServers.size() > 1 || TRI_microtime() - start > waitTime)) { LOG_TOPIC("22f55", INFO, arangodb::Logger::CLUSTER) << "Found " << DBServers.size() << " DBservers."; break; } std::this_thread::sleep_for(std::chrono::seconds(1)); } } } void ClusterFeature::start() { if (ServerState::instance()->isAgent() || _enableCluster) { ClusterComm::initialize(); } // return if cluster is disabled if (!_enableCluster) { startHeartbeatThread(nullptr, 5000, 5, std::string()); return; } ServerState::instance()->setState(ServerState::STATE_STARTUP); // the agency about our state AgencyComm comm; comm.sendServerState(0.0); std::string const version = comm.version(); ServerState::instance()->setInitialized(); std::string const endpoints = AgencyCommManager::MANAGER->endpointsString(); ServerState::RoleEnum role = ServerState::instance()->getRole(); std::string myId = ServerState::instance()->getId(); LOG_TOPIC("b6826", INFO, arangodb::Logger::CLUSTER) << "Cluster feature is turned on. Agency version: " << version << ", Agency endpoints: " << endpoints << ", server id: '" << myId << "', internal endpoint / address: " << _myEndpoint << "', advertised endpoint: " << _myAdvertisedEndpoint << ", role: " << role; AgencyCommResult result = comm.getValues("Sync/HeartbeatIntervalMs"); if (result.successful()) { velocypack::Slice HeartbeatIntervalMs = result.slice()[0].get(std::vector( {AgencyCommManager::path(), "Sync", "HeartbeatIntervalMs"})); if (HeartbeatIntervalMs.isInteger()) { try { _heartbeatInterval = HeartbeatIntervalMs.getUInt(); LOG_TOPIC("805b2", INFO, arangodb::Logger::CLUSTER) << "using heartbeat interval value '" << _heartbeatInterval << " ms' from agency"; } catch (...) { // Ignore if it is not a small int or uint } } } // no value set in agency. use default if (_heartbeatInterval == 0) { _heartbeatInterval = 5000; // 1/s LOG_TOPIC("3d871", WARN, arangodb::Logger::CLUSTER) << "unable to read heartbeat interval from agency. Using " << "default value '" << _heartbeatInterval << " ms'"; } startHeartbeatThread(_agencyCallbackRegistry.get(), _heartbeatInterval, 5, endpoints); comm.increment("Current/Version"); ServerState::instance()->setState(ServerState::STATE_SERVING); } void ClusterFeature::beginShutdown() { ClusterComm::instance()->disable(); } void ClusterFeature::stop() { if (_heartbeatThread != nullptr) { _heartbeatThread->beginShutdown(); } if (_heartbeatThread != nullptr) { int counter = 0; while (_heartbeatThread->isRunning()) { std::this_thread::sleep_for(std::chrono::microseconds(100000)); // emit warning after 5 seconds if (++counter == 10 * 5) { LOG_TOPIC("acaa9", WARN, arangodb::Logger::CLUSTER) << "waiting for heartbeat thread to finish"; } } } ClusterComm::instance()->stopBackgroundThreads(); } void ClusterFeature::unprepare() { if (!_enableCluster) { ClusterComm::cleanup(); return; } if (_heartbeatThread != nullptr) { _heartbeatThread->beginShutdown(); } // change into shutdown state ServerState::instance()->setState(ServerState::STATE_SHUTDOWN); AgencyComm comm; comm.sendServerState(0.0); if (_heartbeatThread != nullptr) { int counter = 0; while (_heartbeatThread->isRunning()) { std::this_thread::sleep_for(std::chrono::microseconds(100000)); // emit warning after 5 seconds if (++counter == 10 * 5) { LOG_TOPIC("26835", WARN, arangodb::Logger::CLUSTER) << "waiting for heartbeat thread to finish"; } } } if (_unregisterOnShutdown) { ServerState::instance()->unregister(); } comm.sendServerState(0.0); // Try only once to unregister because maybe the agencycomm // is shutting down as well... // Remove from role list ServerState::RoleEnum role = ServerState::instance()->getRole(); std::string alk = ServerState::roleToAgencyListKey(role); std::string me = ServerState::instance()->getId(); AgencyWriteTransaction unreg; unreg.operations.push_back(AgencyOperation("Current/" + alk + "/" + me, AgencySimpleOperationType::DELETE_OP)); // Unregister unreg.operations.push_back(AgencyOperation("Current/ServersRegistered/" + me, AgencySimpleOperationType::DELETE_OP)); unreg.operations.push_back( AgencyOperation("Current/Version", AgencySimpleOperationType::INCREMENT_OP)); comm.sendTransactionWithFailover(unreg, 120.0); while (_heartbeatThread->isRunning()) { std::this_thread::sleep_for(std::chrono::microseconds(50000)); } AgencyCommManager::MANAGER->stop(); ClusterInfo::cleanup(); } void ClusterFeature::setUnregisterOnShutdown(bool unregisterOnShutdown) { _unregisterOnShutdown = unregisterOnShutdown; } /// @brief common routine to start heartbeat with or without cluster active void ClusterFeature::startHeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry, uint64_t interval_ms, uint64_t maxFailsBeforeWarning, const std::string& endpoints) { _heartbeatThread = std::make_shared(agencyCallbackRegistry, std::chrono::microseconds(interval_ms * 1000), maxFailsBeforeWarning); if (!_heartbeatThread->init() || !_heartbeatThread->start()) { // failure only occures in cluster mode. LOG_TOPIC("7e050", FATAL, arangodb::Logger::CLUSTER) << "heartbeat could not connect to agency endpoints (" << endpoints << ")"; FATAL_ERROR_EXIT(); } while (!_heartbeatThread->isReady()) { // wait until heartbeat is ready std::this_thread::sleep_for(std::chrono::microseconds(10000)); } } void ClusterFeature::syncDBServerStatusQuo() { if (_heartbeatThread != nullptr) { _heartbeatThread->syncDBServerStatusQuo(true); } }