From 0d96ba4f4b46084a68c7a9be140f21963f1f6b7d Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Wed, 26 Apr 2017 12:06:43 +0200 Subject: [PATCH] merge from 3.1 --- arangod/Cluster/ClusterFeature.cpp | 54 ++++++++++++++++-------------- arangod/Cluster/ClusterInfo.cpp | 50 +++++++++++++-------------- 2 files changed, 53 insertions(+), 51 deletions(-) diff --git a/arangod/Cluster/ClusterFeature.cpp b/arangod/Cluster/ClusterFeature.cpp index 33773b7207..d793800572 100644 --- a/arangod/Cluster/ClusterFeature.cpp +++ b/arangod/Cluster/ClusterFeature.cpp @@ -95,7 +95,7 @@ void ClusterFeature::collectOptions(std::shared_ptr options) { "agency endpoint to connect to", new VectorParameter(&_agencyEndpoints)); - options->addOption("--cluster.agency-prefix", "agency prefix", + options->addHiddenOption("--cluster.agency-prefix", "agency prefix", new StringParameter(&_agencyPrefix)); options->addOption("--cluster.my-local-info", "this server's local info", @@ -134,7 +134,11 @@ void ClusterFeature::collectOptions(std::shared_ptr options) { "replication factor for system collections", new UInt32Parameter(&_systemReplicationFactor)); - options->addOption("--cluster.create-waits-for-sync-replication", +options->addOption("--cluster.create-waits-for-sync-replication", + "active coordinator will wait for all replicas to create collection", + new BooleanParameter(&_createWaitsForSyncReplication)); + +options->addOption("--cluster.create-waits-for-sync-replication", "active coordinator will wait for all replicas to create collection", new BooleanParameter(&_createWaitsForSyncReplication)); } @@ -144,7 +148,7 @@ void ClusterFeature::validateOptions(std::shared_ptr options) { if (options->processingResult().touched("cluster.disable-dispatcher-kickstarter") || options->processingResult().touched("cluster.disable-dispatcher-frontend")) { LOG_TOPIC(FATAL, arangodb::Logger::FIXME) - << "The dispatcher feature isn't available anymore. Use ArangoDBStarter for this now! See https://github.com/arangodb-helper/ArangoDBStarter/ for more details."; + << "The dispatcher feature isn't available anymore. Use ArangoDBStarter for this now! See https://github.com/arangodb-helper/ArangoDBStarter/ for more details."; FATAL_ERROR_EXIT(); } @@ -158,7 +162,7 @@ void ClusterFeature::validateOptions(std::shared_ptr options) { // validate --cluster.agency-endpoint (currently a noop) if (_agencyEndpoints.empty()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) + LOG_TOPIC(FATAL, Logger::CLUSTER) << "must at least specify one endpoint in --cluster.agency-endpoint"; FATAL_ERROR_EXIT(); } @@ -173,13 +177,13 @@ void ClusterFeature::validateOptions(std::shared_ptr options) { "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789/"); if (found != std::string::npos || _agencyPrefix.empty()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value specified for --cluster.agency-prefix"; + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "invalid value specified for --cluster.agency-prefix"; FATAL_ERROR_EXIT(); } // validate system-replication-factor if (_systemReplicationFactor == 0) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "system replication factor must be greater 0"; + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "system replication factor must be greater 0"; FATAL_ERROR_EXIT(); } @@ -190,7 +194,7 @@ void ClusterFeature::validateOptions(std::shared_ptr options) { _requestedRole == ServerState::ROLE_AGENT || _requestedRole == ServerState::ROLE_UNDEFINED ) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Invalid role provided. Possible values: PRIMARY, " + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "Invalid role provided. Possible values: PRIMARY, " "SECONDARY, COORDINATOR"; FATAL_ERROR_EXIT(); } @@ -202,7 +206,7 @@ void ClusterFeature::reportRole(arangodb::ServerState::RoleEnum role) { if (role == ServerState::ROLE_UNDEFINED) { roleString += ". Determining real role from agency"; } - LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "Starting up with role " << roleString; + LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Starting up with role " << roleString; } void ClusterFeature::prepare() { @@ -247,7 +251,7 @@ void ClusterFeature::prepare() { "Authentication"); if (authenticationFeature->isEnabled() && !authenticationFeature->hasUserdefinedJwt()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Cluster authentication enabled but jwt not set via command line. Please" + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "Cluster authentication enabled but jwt not set via command line. Please" << " provide --server.jwt-secret which is used throughout the cluster."; FATAL_ERROR_EXIT(); } @@ -275,7 +279,7 @@ void ClusterFeature::prepare() { std::string const unified = Endpoint::unifiedForm(_agencyEndpoints[i]); if (unified.empty()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid endpoint '" << _agencyEndpoints[i] + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "invalid endpoint '" << _agencyEndpoints[i] << "' specified for --cluster.agency-endpoint"; FATAL_ERROR_EXIT(); } @@ -293,7 +297,7 @@ void ClusterFeature::prepare() { ClusterComm::instance()->enableConnectionErrorLogging(false); // perform an initial connect to the agency if (!AgencyCommManager::MANAGER->start()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "Could not connect to any agency endpoints (" + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "Could not connect to any agency endpoints (" << AgencyCommManager::MANAGER->endpointsString() << ")"; FATAL_ERROR_EXIT(); } @@ -310,13 +314,13 @@ void ClusterFeature::prepare() { if (role == ServerState::ROLE_UNDEFINED) { // no role found - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "unable to determine unambiguous role for server '" << _myId + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "unable to determine unambiguous role for server '" << _myId << "'. No role configured in agency (" << endpoints << ")"; FATAL_ERROR_EXIT(); } if (role == ServerState::ROLE_SINGLE) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "determined single-server role for server '" << _myId + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "determined single-server role for server '" << _myId << "'. Please check the configurarion in the agency (" << endpoints << ")"; FATAL_ERROR_EXIT(); @@ -343,12 +347,12 @@ void ClusterFeature::prepare() { double start = TRI_microtime(); while (true) { - LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "Waiting for DBservers to show up..."; + LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Waiting for DBservers to show up..."; ci->loadCurrentDBServers(); std::vector DBServers = ci->getCurrentDBServers(); if (DBServers.size() >= 1 && (DBServers.size() > 1 || TRI_microtime() - start > 15.0)) { - LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "Found " << DBServers.size() << " DBservers."; + LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Found " << DBServers.size() << " DBservers."; break; } sleep(1); @@ -357,7 +361,7 @@ void ClusterFeature::prepare() { } if (_myAddress.empty()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "unable to determine internal address for server '" << _myId + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "unable to determine internal address for server '" << _myId << "'. Please specify --cluster.my-address or configure the " "address for this server in the agency."; FATAL_ERROR_EXIT(); @@ -365,7 +369,7 @@ void ClusterFeature::prepare() { // now we can validate --cluster.my-address if (Endpoint::unifiedForm(_myAddress).empty()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid endpoint '" << _myAddress + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "invalid endpoint '" << _myAddress << "' specified for --cluster.my-address"; FATAL_ERROR_EXIT(); } @@ -396,7 +400,7 @@ void ClusterFeature::start() { ServerState::RoleEnum role = ServerState::instance()->getRole(); - LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "Cluster feature is turned on. Agency version: " << version + LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Cluster feature is turned on. Agency version: " << version << ", Agency endpoints: " << endpoints << ", server id: '" << _myId << "', internal address: " << _myAddress << ", role: " << ServerState::roleToString(role); @@ -412,7 +416,7 @@ void ClusterFeature::start() { if (HeartbeatIntervalMs.isInteger()) { try { _heartbeatInterval = HeartbeatIntervalMs.getUInt(); - LOG_TOPIC(INFO, arangodb::Logger::FIXME) << "using heartbeat interval value '" << _heartbeatInterval + LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "using heartbeat interval value '" << _heartbeatInterval << " ms' from agency"; } catch (...) { // Ignore if it is not a small int or uint @@ -424,7 +428,7 @@ void ClusterFeature::start() { if (_heartbeatInterval == 0) { _heartbeatInterval = 5000; // 1/s - LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "unable to read heartbeat interval from agency. Using " + LOG_TOPIC(WARN, arangodb::Logger::CLUSTER) << "unable to read heartbeat interval from agency. Using " << "default value '" << _heartbeatInterval << " ms'"; } @@ -434,7 +438,7 @@ void ClusterFeature::start() { SchedulerFeature::SCHEDULER->ioService()); if (!_heartbeatThread->init() || !_heartbeatThread->start()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "heartbeat could not connect to agency endpoints (" + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "heartbeat could not connect to agency endpoints (" << endpoints << ")"; FATAL_ERROR_EXIT(); } @@ -453,7 +457,7 @@ void ClusterFeature::start() { VPackObjectBuilder b(&builder); builder.add("endpoint", VPackValue(_myAddress)); } catch (...) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "out of memory"; + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "out of memory"; FATAL_ERROR_EXIT(); } @@ -461,7 +465,7 @@ void ClusterFeature::start() { builder.slice(), 0.0); if (!result.successful()) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "unable to register server in agency: http code: " + LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "unable to register server in agency: http code: " << result.httpCode() << ", body: " << result.body(); FATAL_ERROR_EXIT(); } else { @@ -494,7 +498,7 @@ void ClusterFeature::stop() { usleep(100000); // emit warning after 5 seconds if (++counter == 10 * 5) { - LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "waiting for heartbeat thread to finish"; + LOG_TOPIC(WARN, arangodb::Logger::CLUSTER) << "waiting for heartbeat thread to finish"; } } } @@ -521,7 +525,7 @@ void ClusterFeature::unprepare() { usleep(100000); // emit warning after 5 seconds if (++counter == 10 * 5) { - LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "waiting for heartbeat thread to finish"; + LOG_TOPIC(WARN, arangodb::Logger::CLUSTER) << "waiting for heartbeat thread to finish"; } } } diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index a170b323e0..5f847e9c86 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1101,10 +1101,19 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, [=](VPackSlice const& result) { if (result.isObject() && result.length() == (size_t)numberOfShards) { std::string tmpMsg = ""; + bool tmpHaveError = false; for (auto const& p : VPackObjectIterator(result)) { + if (replicationFactor == 0) { + VPackSlice servers = p.value.get("servers"); + if (!servers.isArray() || servers.length() < dbServers.size()) { + return true; + } + } + if (arangodb::basics::VelocyPackHelper::getBooleanValue( p.value, "error", false)) { + tmpHaveError = true; tmpMsg += " shardID:" + p.key.copyString() + ":"; tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue( p.value, "errorMessage", ""); @@ -1117,25 +1126,14 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, tmpMsg += ")"; } } - *errMsg = "Error in creation of collection:" + tmpMsg + " " - + __FILE__ + std::to_string(__LINE__); - *dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; - return true; - } - - // wait that all followers have created our new collection - if (waitForReplication) { - uint64_t mutableReplicationFactor = replicationFactor; - if (mutableReplicationFactor == 0) { - mutableReplicationFactor = dbServers.size(); - } - - VPackSlice servers = p.value.get("servers"); - if (!servers.isArray() || servers.length() < mutableReplicationFactor) { - return true; - } } } + if (tmpHaveError) { + *errMsg = "Error in creation of collection:" + tmpMsg + " " + + __FILE__ + std::to_string(__LINE__); + *dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; + return true; + } *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); } return true; @@ -1315,17 +1313,17 @@ int ClusterInfo::dropCollectionCoordinator( clones.push_back(p->name()); } } - if (!clones.empty()){ - errorMsg += "Collection must not be dropped while it is sharding " - "prototype for collection[s]"; - for (auto const& i : clones) { + + if (!clones.empty()){ + errorMsg += "Collection must not be dropped while it is sharding " + "prototype for collection[s]"; + for (auto const& i : clones) { errorMsg += std::string(" ") + i; - } - errorMsg += "."; - return TRI_ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE; - } - + errorMsg += "."; + return TRI_ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE; + } + double const realTimeout = getTimeout(timeout); double const endTime = TRI_microtime() + realTimeout; double const interval = getPollInterval();