From 883c11ea45428cbf1dd74b9a1f4a0e96461a8433 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Tue, 7 Feb 2017 15:31:40 +0100 Subject: [PATCH 1/3] Handle the case that ClusterComm is already shut down gracefully. This touches every single place where ClusterComm is being used. --- arangod/Agency/AgencyComm.cpp | 10 +- arangod/Agency/Agent.cpp | 14 +- arangod/Agency/AgentActivator.cpp | 14 +- arangod/Agency/Constituent.cpp | 13 +- arangod/Agency/Inception.cpp | 29 +- arangod/Agency/Store.cpp | 14 +- arangod/Aql/ClusterBlocks.cpp | 48 ++- arangod/Aql/ExecutionEngine.cpp | 188 ++++---- arangod/Aql/TraversalBlock.cpp | 35 +- arangod/Cluster/ClusterMethods.cpp | 48 +++ arangod/Cluster/HeartbeatThread.cpp | 3 +- arangod/Cluster/v8-cluster.cpp | 24 +- .../RestHandler/RestReplicationHandler.cpp | 6 + arangod/Utils/Transaction.cpp | 405 +++++++++--------- arangod/V8Server/v8-actions.cpp | 3 + arangod/V8Server/v8-replication.cpp | 6 +- arangod/V8Server/v8-vocbase.cpp | 44 +- 17 files changed, 525 insertions(+), 379 deletions(-) diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index f2a867a6c4..baf8836495 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -1529,7 +1529,15 @@ AgencyCommResult AgencyComm::send( << "': " << body; arangodb::httpclient::SimpleHttpClient client(connection, timeout, false); - client.setJwt(ClusterComm::instance()->jwt()); + auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr only happens during controlled shutdown + result._message = "could not send request to agency because of shutdown"; + LOG_TOPIC(TRACE, Logger::AGENCYCOMM) << "could not send request to agency"; + + return result; + } + client.setJwt(cc->jwt()); client.keepConnectionOnDestruction(true); // set up headers diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 5520624117..65a286d927 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -313,6 +313,11 @@ bool 
Agent::recvAppendEntriesRPC( /// Leader's append entries void Agent::sendAppendEntriesRPC() { + auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr only happens during controlled shutdown + return; + } // _lastSent, _lastHighest and _confirmed only accessed in main thread std::string const myid = id(); @@ -387,7 +392,7 @@ void Agent::sendAppendEntriesRPC() { // Send request auto headerFields = std::make_unique>(); - arangodb::ClusterComm::instance()->asyncRequest( + cc->asyncRequest( "1", 1, _config.poolAt(followerId), arangodb::rest::RequestType::POST, path.str(), std::make_shared(builder.toJson()), headerFields, @@ -1002,6 +1007,11 @@ TimePoint const& Agent::leaderSince() const { // Notify inactive pool members of configuration change() void Agent::notifyInactive() const { + auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr only happens during controlled shutdown + return; + } std::map pool = _config.pool(); std::string path = "/_api/agency_priv/inform"; @@ -1023,7 +1033,7 @@ void Agent::notifyInactive() const { auto headerFields = std::make_unique>(); - arangodb::ClusterComm::instance()->asyncRequest( + cc->asyncRequest( "1", 1, p.second, arangodb::rest::RequestType::POST, path, std::make_shared(out.toJson()), headerFields, nullptr, 1.0, true); diff --git a/arangod/Agency/AgentActivator.cpp b/arangod/Agency/AgentActivator.cpp index 54d78ace2a..688e16bc46 100644 --- a/arangod/Agency/AgentActivator.cpp +++ b/arangod/Agency/AgentActivator.cpp @@ -66,11 +66,15 @@ void AgentActivator::run() { auto headerFields = std::make_unique>(); - arangodb::ClusterComm::instance()->asyncRequest( - "1", 1, endpoint, rest::RequestType::POST, path, - std::make_shared(allLogs->toJson()), headerFields, - std::make_shared(_agent, _failed, _replacement), - 5.0, true, 1.0); + auto cc = arangodb::ClusterComm::instance(); + if (cc != nullptr) { + // nullptr only happens on controlled shutdown + cc->asyncRequest( + "1", 1, endpoint, 
rest::RequestType::POST, path, + std::make_shared(allLogs->toJson()), headerFields, + std::make_shared(_agent, _failed, _replacement), + 5.0, true, 1.0); + } _cv.wait(10000000); // 10 sec diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index d2a03cc8dc..941878fef9 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -389,11 +389,17 @@ void Constituent::callElection() { << "&prevLogTerm=" << _agent->lastLog().term; // Ask everyone for their vote + auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // only happens on controlled shutdown + follow(_term); + return; + } for (auto const& i : active) { if (i != _id) { auto headerFields = std::make_unique>(); - ClusterComm::instance()->asyncRequest( + cc->asyncRequest( "", coordinatorTransactionID, _agent->config().poolAt(i), rest::RequestType::GET, path.str(), std::make_shared(body), headerFields, @@ -419,8 +425,7 @@ void Constituent::callElection() { break; } - auto res = ClusterComm::instance()->wait( - "", coordinatorTransactionID, 0, "", + auto res = cc->wait("", coordinatorTransactionID, 0, "", duration(steady_clock::now()-timeout).count()); if (res.status == CL_COMM_SENT) { @@ -461,7 +466,7 @@ void Constituent::callElection() { << (yea >= majority ? "yeas" : "nays") << " have it."; // Clean up - ClusterComm::instance()->drop("", coordinatorTransactionID, 0, ""); + cc->drop("", coordinatorTransactionID, 0, ""); } diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index f355f789f5..b5dc46aae4 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -49,6 +49,11 @@ Inception::~Inception() { shutdown(); } /// - Create outgoing gossip. 
/// - Send to all peers void Inception::gossip() { + auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr only happens during controlled shutdown + return; + } LOG_TOPIC(INFO, Logger::AGENCY) << "Entering gossip phase ..."; using namespace std::chrono; @@ -93,7 +98,7 @@ void Inception::gossip() { std::make_unique>(); LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: " << out->toJson() << " to peer " << clientid; - arangodb::ClusterComm::instance()->asyncRequest( + cc->asyncRequest( clientid, 1, p, rest::RequestType::POST, path, std::make_shared(out->toJson()), hf, std::make_shared(_agent, version), 1.0, true, 0.5); @@ -116,7 +121,7 @@ void Inception::gossip() { std::make_unique>(); LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: " << out->toJson() << " to pool member " << clientid; - arangodb::ClusterComm::instance()->asyncRequest( + cc->asyncRequest( clientid, 1, pair.second, rest::RequestType::POST, path, std::make_shared(out->toJson()), hf, std::make_shared(_agent, version), 1.0, true, 0.5); @@ -156,6 +161,11 @@ void Inception::gossip() { bool Inception::restartingActiveAgent() { + auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr only happens during controlled shutdown + return false; + } LOG_TOPIC(INFO, Logger::AGENCY) << "Restarting agent from persistence ..."; @@ -200,7 +210,7 @@ bool Inception::restartingActiveAgent() { std::vector informed; for (auto& p : gp) { - auto comres = arangodb::ClusterComm::instance()->syncRequest( + auto comres = cc->syncRequest( clientId, 1, p, rest::RequestType::POST, path, greetstr, std::unordered_map(), 2.0); if (comres->status == CL_COMM_SENT) { @@ -224,7 +234,7 @@ bool Inception::restartingActiveAgent() { if (p.first != myConfig.id() && p.first != "") { - auto comres = arangodb::ClusterComm::instance()->syncRequest( + auto comres = cc->syncRequest( clientId, 1, p.second, rest::RequestType::POST, path, greetstr, std::unordered_map(), 2.0); @@ -249,7 +259,7 @@ 
bool Inception::restartingActiveAgent() { // Contact leader to update endpoint if (theirLeaderId != theirId) { - comres = arangodb::ClusterComm::instance()->syncRequest( + comres = cc->syncRequest( clientId, 1, theirLeaderEp, rest::RequestType::POST, path, greetstr, std::unordered_map(), 2.0); // Failed to contact leader move on until we do. This way at @@ -365,6 +375,11 @@ void Inception::reportVersionForEp(std::string const& endpoint, size_t version) bool Inception::estimateRAFTInterval() { + auto cc = arangodb::ClusterComm::instance(); + if (cc == nullptr) { + // nullptr only happens during controlled shutdown + return false; + } using namespace std::chrono; LOG_TOPIC(INFO, Logger::AGENCY) << "Estimating RAFT timeouts ..."; @@ -382,7 +397,7 @@ bool Inception::estimateRAFTInterval() { std::string clientid = peer.first + std::to_string(i); auto hf = std::make_unique>(); - arangodb::ClusterComm::instance()->asyncRequest( + cc->asyncRequest( clientid, 1, peer.second, rest::RequestType::GET, path, std::make_shared(), hf, std::make_shared(this, peer.second, timeStamp()), @@ -448,7 +463,7 @@ bool Inception::estimateRAFTInterval() { for (auto const& peer : config.pool()) { if (peer.first != myid) { auto clientId = "1"; - auto comres = arangodb::ClusterComm::instance()->syncRequest( + auto comres = cc->syncRequest( clientId, 1, peer.second, rest::RequestType::POST, path, measjson, std::unordered_map(), 5.0); } diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index fcd606999e..def645c030 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -352,11 +352,15 @@ std::vector Store::apply( auto headerFields = std::make_unique>(); - arangodb::ClusterComm::instance()->asyncRequest( - "1", 1, endpoint, rest::RequestType::POST, path, - std::make_shared(body.toString()), headerFields, - std::make_shared(path, body.toJson()), 1.0, true, 0.01); - + auto cc = ClusterComm::instance(); + if (cc != nullptr) { + // nullptr only happens on controlled 
shutdown + cc->asyncRequest( + "1", 1, endpoint, rest::RequestType::POST, path, + std::make_shared(body.toString()), headerFields, + std::make_shared(path, body.toJson()), 1.0, true, + 0.01); + } } else { LOG_TOPIC(WARN, Logger::AGENCY) << "Malformed URL " << url; } diff --git a/arangod/Aql/ClusterBlocks.cpp b/arangod/Aql/ClusterBlocks.cpp index f34672e158..4034e87958 100644 --- a/arangod/Aql/ClusterBlocks.cpp +++ b/arangod/Aql/ClusterBlocks.cpp @@ -1233,30 +1233,34 @@ std::unique_ptr RemoteBlock::sendRequest( std::string const& body) const { DEBUG_BEGIN_BLOCK(); auto cc = ClusterComm::instance(); + if (cc != nullptr) { + // nullptr only happens on controlled shutdown - // Later, we probably want to set these sensibly: - ClientTransactionID const clientTransactionId = "AQL"; - CoordTransactionID const coordTransactionId = TRI_NewTickServer(); - std::unordered_map headers; - if (!_ownName.empty()) { - headers.emplace("Shard-Id", _ownName); - } - - ++_engine->_stats.httpRequests; - { - JobGuard guard(SchedulerFeature::SCHEDULER); - guard.block(); - - auto result = - cc->syncRequest(clientTransactionId, coordTransactionId, _server, type, - std::string("/_db/") + - arangodb::basics::StringUtils::urlEncode( - _engine->getQuery()->trx()->vocbase()->name()) + - urlPart + _queryId, - body, headers, defaultTimeOut); - - return result; + // Later, we probably want to set these sensibly: + ClientTransactionID const clientTransactionId = "AQL"; + CoordTransactionID const coordTransactionId = TRI_NewTickServer(); + std::unordered_map headers; + if (!_ownName.empty()) { + headers.emplace("Shard-Id", _ownName); + } + + ++_engine->_stats.httpRequests; + { + JobGuard guard(SchedulerFeature::SCHEDULER); + guard.block(); + + auto result = + cc->syncRequest(clientTransactionId, coordTransactionId, _server, type, + std::string("/_db/") + + arangodb::basics::StringUtils::urlEncode( + _engine->getQuery()->trx()->vocbase()->name()) + + urlPart + _queryId, + body, headers, defaultTimeOut); 
+ + return result; + } } + return std::make_unique(); // cppcheck-suppress style DEBUG_END_BLOCK(); diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index d171e22fd9..8e2b232315 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -582,16 +582,19 @@ struct CoordinatorInstanciator : public WalkerWorker { // << "\n"; auto cc = arangodb::ClusterComm::instance(); + if (cc != nullptr) { + // nullptr only happens on controlled shutdown - std::string const url("/_db/" - + arangodb::basics::StringUtils::urlEncode(collection->vocbase->name()) + - "/_api/aql/instantiate"); + std::string const url("/_db/" + + arangodb::basics::StringUtils::urlEncode(collection->vocbase->name()) + + "/_api/aql/instantiate"); - auto headers = std::make_unique>(); - (*headers)["X-Arango-Nolock"] = shardId; // Prevent locking - cc->asyncRequest("", coordTransactionID, "shard:" + shardId, - arangodb::rest::RequestType::POST, - url, body, headers, nullptr, 30.0); + auto headers = std::make_unique>(); + (*headers)["X-Arango-Nolock"] = shardId; // Prevent locking + cc->asyncRequest("", coordTransactionID, "shard:" + shardId, + arangodb::rest::RequestType::POST, + url, body, headers, nullptr, 30.0); + } } /// @brief aggregateQueryIds, get answers for all shards in a Scatter/Gather @@ -670,28 +673,29 @@ struct CoordinatorInstanciator : public WalkerWorker { // now send the plan to the remote servers arangodb::CoordTransactionID coordTransactionID = TRI_NewTickServer(); auto cc = arangodb::ClusterComm::instance(); - TRI_ASSERT(cc != nullptr); + if (cc != nullptr) { + // nullptr only happens on controlled shutdown + // iterate over all shards of the collection + size_t nr = 0; + auto shardIds = collection->shardIds(); + for (auto const& shardId : *shardIds) { + // inject the current shard id into the collection + VPackBuilder b; + collection->setCurrentShard(shardId); + generatePlanForOneShard(b, nr++, info, connectedId, shardId, true); - // 
iterate over all shards of the collection - size_t nr = 0; - auto shardIds = collection->shardIds(); - for (auto const& shardId : *shardIds) { - // inject the current shard id into the collection - VPackBuilder b; - collection->setCurrentShard(shardId); - generatePlanForOneShard(b, nr++, info, connectedId, shardId, true); + distributePlanToShard(coordTransactionID, info, + connectedId, shardId, + b.slice()); + } + collection->resetCurrentShard(); + for (auto const& auxiliaryCollection: auxiliaryCollections) { + TRI_ASSERT(auxiliaryCollection->shardIds()->size() == 1); + auxiliaryCollection->resetCurrentShard(); + } - distributePlanToShard(coordTransactionID, info, - connectedId, shardId, - b.slice()); + aggregateQueryIds(info, cc, coordTransactionID, collection); } - collection->resetCurrentShard(); - for (auto const& auxiliaryCollection: auxiliaryCollections) { - TRI_ASSERT(auxiliaryCollection->shardIds()->size() == 1); - auxiliaryCollection->resetCurrentShard(); - } - - aggregateQueryIds(info, cc, coordTransactionID, collection); } /// @brief buildEngineCoordinator, for a single piece @@ -931,6 +935,10 @@ struct CoordinatorInstanciator : public WalkerWorker { query->vocbase()->name()) + "/_internal/traverser"); auto cc = arangodb::ClusterComm::instance(); + if (cc == nullptr) { + // nullptr only happens on controlled shutdown + return; + } bool hasVars = false; VPackBuilder varInfo; std::vector vars; @@ -1229,6 +1237,11 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan( // Lock shard on DBserver: arangodb::CoordTransactionID coordTransactionID = TRI_NewTickServer(); auto cc = arangodb::ClusterComm::instance(); + if (cc == nullptr) { + // nullptr only happens on controlled shutdown + THROW_ARANGO_EXCEPTION( TRI_ERROR_SHUTTING_DOWN); + } + TRI_vocbase_t* vocbase = query->vocbase(); std::unique_ptr res; std::unordered_map headers; @@ -1263,70 +1276,73 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan( // the DBservers via HTTP: TRI_vocbase_t* vocbase = 
query->vocbase(); auto cc = arangodb::ClusterComm::instance(); - for (auto& q : inst.get()->queryIds) { - std::string theId = q.first; - std::string queryId = q.second; - auto pos = theId.find(':'); - if (pos != std::string::npos) { - // So this is a remote one on a DBserver: - std::string shardId = theId.substr(pos + 1); - // Remove query from DBserver: - arangodb::CoordTransactionID coordTransactionID = - TRI_NewTickServer(); - if (queryId.back() == '*') { - queryId.pop_back(); + if (cc != nullptr) { + // nullptr only happens during controlled shutdown + for (auto& q : inst.get()->queryIds) { + std::string theId = q.first; + std::string queryId = q.second; + auto pos = theId.find(':'); + if (pos != std::string::npos) { + // So this is a remote one on a DBserver: + std::string shardId = theId.substr(pos + 1); + // Remove query from DBserver: + arangodb::CoordTransactionID coordTransactionID = + TRI_NewTickServer(); + if (queryId.back() == '*') { + queryId.pop_back(); + } + std::string const url( + "/_db/" + + arangodb::basics::StringUtils::urlEncode(vocbase->name()) + + "/_api/aql/shutdown/" + queryId); + std::unordered_map headers; + auto res = + cc->syncRequest("", coordTransactionID, "shard:" + shardId, + arangodb::rest::RequestType::PUT, + url, "{\"code\": 0}", headers, 120.0); + // Ignore result, we need to try to remove all. + // However, log the incident if we have an errorMessage. + if (!res->errorMessage.empty()) { + std::string msg("while trying to unregister query "); + msg += queryId + ": " + res->stringifyErrorMessage(); + LOG(WARN) << msg; + } + } else { + // Remove query from registry: + try { + queryRegistry->destroy( + vocbase, arangodb::basics::StringUtils::uint64(queryId), + TRI_ERROR_INTERNAL); + } catch (...) 
{ + // Ignore problems + } } + } + // Also we need to destroy all traverser engines that have been pushed to DBServers + { + std::string const url( "/_db/" + arangodb::basics::StringUtils::urlEncode(vocbase->name()) + - "/_api/aql/shutdown/" + queryId); - std::unordered_map headers; - auto res = - cc->syncRequest("", coordTransactionID, "shard:" + shardId, - arangodb::rest::RequestType::PUT, - url, "{\"code\": 0}", headers, 120.0); - // Ignore result, we need to try to remove all. - // However, log the incident if we have an errorMessage. - if (!res->errorMessage.empty()) { - std::string msg("while trying to unregister query "); - msg += queryId + ": " + res->stringifyErrorMessage(); - LOG(WARN) << msg; - } - } else { - // Remove query from registry: - try { - queryRegistry->destroy( - vocbase, arangodb::basics::StringUtils::uint64(queryId), - TRI_ERROR_INTERNAL); - } catch (...) { - // Ignore problems - } - } - } - // Also we need to destroy all traverser engines that have been pushed to DBServers - { + "/_internal/traverser/"); + for (auto& te : inst.get()->traverserEngines) { + std::string traverserId = arangodb::basics::StringUtils::itoa(te.first); + arangodb::CoordTransactionID coordTransactionID = + TRI_NewTickServer(); + std::unordered_map headers; + // NOTE: te.second is the list of shards. So we just send delete + // to the first of those shards + auto res = cc->syncRequest( + "", coordTransactionID, "shard:" + *(te.second.begin()), + RequestType::DELETE_REQ, url + traverserId, "", headers, 30.0); - std::string const url( - "/_db/" + - arangodb::basics::StringUtils::urlEncode(vocbase->name()) + - "/_internal/traverser/"); - for (auto& te : inst.get()->traverserEngines) { - std::string traverserId = arangodb::basics::StringUtils::itoa(te.first); - arangodb::CoordTransactionID coordTransactionID = - TRI_NewTickServer(); - std::unordered_map headers; - // NOTE: te.second is the list of shards. 
So we just send delete - // to the first of those shards - auto res = cc->syncRequest( - "", coordTransactionID, "shard:" + *(te.second.begin()), - RequestType::DELETE_REQ, url + traverserId, "", headers, 30.0); - - // Ignore result, we need to try to remove all. - // However, log the incident if we have an errorMessage. - if (!res->errorMessage.empty()) { - std::string msg("while trying to unregister traverser engine "); - msg += traverserId + ": " + res->stringifyErrorMessage(); - LOG(WARN) << msg; + // Ignore result, we need to try to remove all. + // However, log the incident if we have an errorMessage. + if (!res->errorMessage.empty()) { + std::string msg("while trying to unregister traverser engine "); + msg += traverserId + ": " + res->stringifyErrorMessage(); + LOG(WARN) << msg; + } } } } diff --git a/arangod/Aql/TraversalBlock.cpp b/arangod/Aql/TraversalBlock.cpp index 613e33cbc9..96213ba2ec 100644 --- a/arangod/Aql/TraversalBlock.cpp +++ b/arangod/Aql/TraversalBlock.cpp @@ -192,23 +192,26 @@ int TraversalBlock::shutdown(int errorCode) { // We have to clean up the engines in Coordinator Case. 
if (arangodb::ServerState::instance()->isCoordinator()) { auto cc = arangodb::ClusterComm::instance(); - std::string const url( - "/_db/" + arangodb::basics::StringUtils::urlEncode(_trx->vocbase()->name()) + - "/_internal/traverser/"); - for (auto const& it : *_engines) { - arangodb::CoordTransactionID coordTransactionID = TRI_NewTickServer(); - std::unordered_map headers; - auto res = cc->syncRequest( - "", coordTransactionID, "server:" + it.first, RequestType::DELETE_REQ, - url + arangodb::basics::StringUtils::itoa(it.second), "", headers, - 30.0); - if (res->status != CL_COMM_SENT) { - // Note If there was an error on server side we do not have CL_COMM_SENT - std::string message("Could not destroy all traversal engines"); - if (!res->errorMessage.empty()) { - message += std::string(": ") + res->errorMessage; + if (cc != nullptr) { + // nullptr only happens on controlled server shutdown + std::string const url( + "/_db/" + arangodb::basics::StringUtils::urlEncode(_trx->vocbase()->name()) + + "/_internal/traverser/"); + for (auto const& it : *_engines) { + arangodb::CoordTransactionID coordTransactionID = TRI_NewTickServer(); + std::unordered_map headers; + auto res = cc->syncRequest( + "", coordTransactionID, "server:" + it.first, RequestType::DELETE_REQ, + url + arangodb::basics::StringUtils::itoa(it.second), "", headers, + 30.0); + if (res->status != CL_COMM_SENT) { + // Note If there was an error on server side we do not have CL_COMM_SENT + std::string message("Could not destroy all traversal engines"); + if (!res->errorMessage.empty()) { + message += std::string(": ") + res->errorMessage; + } + LOG(ERR) << message; } - LOG(ERR) << message; } } } diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index cf1c364aa9..4fac2f0e6c 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -562,6 +562,10 @@ int revisionOnCoordinator(std::string const& dbname, // Set a few variables needed for our 
work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // First determine the collection ID from the name: std::shared_ptr collinfo; @@ -636,6 +640,10 @@ int figuresOnCoordinator(std::string const& dbname, std::string const& collname, // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // First determine the collection ID from the name: std::shared_ptr collinfo; @@ -701,6 +709,10 @@ int countOnCoordinator(std::string const& dbname, std::string const& collname, // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } result.clear(); @@ -771,6 +783,10 @@ int createDocumentOnCoordinator( // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // First determine the collection ID from the name: std::shared_ptr collinfo; @@ -906,6 +922,10 @@ int deleteDocumentOnCoordinator( // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // First determine the collection ID from the name: std::shared_ptr collinfo; @@ -1135,6 +1155,10 @@ int truncateCollectionOnCoordinator(std::string const& dbname, // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == 
nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // First determine the collection ID from the name: std::shared_ptr collinfo; @@ -1191,6 +1215,10 @@ int getDocumentOnCoordinator( // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // First determine the collection ID from the name: std::shared_ptr collinfo; @@ -1462,6 +1490,10 @@ int fetchEdgesFromEngines( size_t& filtered, size_t& read) { auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // TODO map id => ServerID if possible // And go fast-path @@ -1546,6 +1578,10 @@ void fetchVerticesFromEngines( result, VPackBuilder& builder) { auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return; + } // TODO map id => ServerID if possible // And go fast-path @@ -1636,6 +1672,10 @@ int getFilteredEdgesOnCoordinator( // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // First determine the collection ID from the name: std::shared_ptr collinfo = @@ -1755,6 +1795,10 @@ int modifyDocumentOnCoordinator( // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } // First determine the collection ID from the name: std::shared_ptr collinfo = @@ -2005,6 +2049,10 @@ int modifyDocumentOnCoordinator( int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector) { ClusterInfo* ci = 
ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + return TRI_ERROR_SHUTTING_DOWN; + } std::vector DBservers = ci->getCurrentDBServers(); CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::string url = std::string("/_admin/wal/flush?waitForSync=") + diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index df3947c55a..3f0ee447f5 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -745,7 +745,8 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) { ClusterInfo::instance()->flush(); // turn on error logging now - if (!ClusterComm::instance()->enableConnectionErrorLogging(true)) { + auto cc = ClusterComm::instance(); + if (cc != nullptr && !cc->enableConnectionErrorLogging(true)) { LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "created coordinator databases for the first time"; } diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index b9adef19f2..0b418ca770 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -1811,8 +1811,8 @@ static void JS_AsyncRequest(v8::FunctionCallbackInfo const& args) { auto cc = ClusterComm::instance(); if (cc == nullptr) { - TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, - "clustercomm object not found"); + TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_SHUTTING_DOWN, + "clustercomm object not found (JS_AsyncRequest)"); } arangodb::rest::RequestType reqType; @@ -1878,7 +1878,7 @@ static void JS_SyncRequest(v8::FunctionCallbackInfo const& args) { auto cc = ClusterComm::instance(); if (cc == nullptr) { - TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_SHUTTING_DOWN, "clustercomm object not found"); } @@ -1931,7 +1931,7 @@ static void JS_Enquire(v8::FunctionCallbackInfo const& args) { if (cc == nullptr) { 
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, - "clustercomm object not found"); + "clustercomm object not found (JS_Enquire)"); } OperationID operationID = TRI_ObjectToUInt64(args[0], true); @@ -1967,8 +1967,8 @@ static void JS_Wait(v8::FunctionCallbackInfo const& args) { auto cc = ClusterComm::instance(); if (cc == nullptr) { - TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, - "clustercomm object not found"); + TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_SHUTTING_DOWN, + "clustercomm object not found (JS_Wait)"); } ClientTransactionID myclientTransactionID = ""; @@ -2038,7 +2038,7 @@ static void JS_Drop(v8::FunctionCallbackInfo const& args) { if (cc == nullptr) { TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, - "clustercomm object not found"); + "clustercomm object not found (JS_Drop)"); } ClientTransactionID myclientTransactionID = ""; @@ -2116,9 +2116,13 @@ static void JS_ClusterDownload(v8::FunctionCallbackInfo const& args) } options->Set(TRI_V8_ASCII_STRING("headers"), headers); - std::string const authorization = "bearer " + ClusterComm::instance()->jwt(); - v8::Handle v8Authorization = TRI_V8_STD_STRING(authorization); - headers->Set(TRI_V8_ASCII_STRING("Authorization"), v8Authorization); + auto cc = ClusterComm::instance(); + if (cc != nullptr) { + // nullptr happens only during controlled shutdown + std::string authorization = "bearer " + cc->jwt(); + v8::Handle v8Authorization = TRI_V8_STD_STRING(authorization); + headers->Set(TRI_V8_ASCII_STRING("Authorization"), v8Authorization); + } args[2] = options; } TRI_V8_TRY_CATCH_END diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index e285350e25..ef510f996b 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -775,6 +775,12 @@ void RestReplicationHandler::handleTrampolineCoordinator() { // Set a few variables needed for our work: auto cc = 
ClusterComm::instance(); + if (cc == nullptr) { + // nullptr happens only during controlled shutdown + generateError(rest::ResponseCode::BAD, TRI_ERROR_SHUTTING_DOWN, + "shutting down server"); + return; + } std::unique_ptr res; if (!useVpp) { diff --git a/arangod/Utils/Transaction.cpp b/arangod/Utils/Transaction.cpp index ab486f84b8..67ed0b95db 100644 --- a/arangod/Utils/Transaction.cpp +++ b/arangod/Utils/Transaction.cpp @@ -1917,30 +1917,33 @@ OperationResult Transaction::insertLocal(std::string const& collectionName, path, body); } auto cc = arangodb::ClusterComm::instance(); - size_t nrDone = 0; - size_t nrGood = cc->performRequests(requests, chooseTimeout(count), - nrDone, Logger::REPLICATION); - if (nrGood < followers->size()) { - // we drop all followers that were not successful: - for (size_t i = 0; i < followers->size(); ++i) { - bool replicationWorked - = requests[i].done && - requests[i].result.status == CL_COMM_RECEIVED && - (requests[i].result.answer_code == - rest::ResponseCode::ACCEPTED || - requests[i].result.answer_code == - rest::ResponseCode::CREATED); - if (replicationWorked) { - bool found; - requests[i].result.answer->header(StaticStrings::ErrorCodes, found); - replicationWorked = !found; - } - if (!replicationWorked) { - auto const& followerInfo = collection->followers(); - followerInfo->remove((*followers)[i]); - LOG_TOPIC(ERR, Logger::REPLICATION) - << "insertLocal: dropping follower " - << (*followers)[i] << " for shard " << collectionName; + if (cc != nullptr) { + // nullptr only happens on controlled shutdown + size_t nrDone = 0; + size_t nrGood = cc->performRequests(requests, chooseTimeout(count), + nrDone, Logger::REPLICATION); + if (nrGood < followers->size()) { + // we drop all followers that were not successful: + for (size_t i = 0; i < followers->size(); ++i) { + bool replicationWorked + = requests[i].done && + requests[i].result.status == CL_COMM_RECEIVED && + (requests[i].result.answer_code == + rest::ResponseCode::ACCEPTED || + 
requests[i].result.answer_code == + rest::ResponseCode::CREATED); + if (replicationWorked) { + bool found; + requests[i].result.answer->header(StaticStrings::ErrorCodes, found); + replicationWorked = !found; + } + if (!replicationWorked) { + auto const& followerInfo = collection->followers(); + followerInfo->remove((*followers)[i]); + LOG_TOPIC(ERR, Logger::REPLICATION) + << "insertLocal: dropping follower " + << (*followers)[i] << " for shard " << collectionName; + } } } } @@ -2175,82 +2178,84 @@ OperationResult Transaction::modifyLocal( // Now replicate the good operations on all followers: auto cc = arangodb::ClusterComm::instance(); + if (cc != nullptr) { + // nullptr only happens on controlled shutdown + std::string path + = "/_db/" + + arangodb::basics::StringUtils::urlEncode(_vocbase->name()) + + "/_api/document/" + + arangodb::basics::StringUtils::urlEncode(collection->name()) + + "?isRestore=true"; - std::string path - = "/_db/" + - arangodb::basics::StringUtils::urlEncode(_vocbase->name()) + - "/_api/document/" + - arangodb::basics::StringUtils::urlEncode(collection->name()) - + "?isRestore=true"; + VPackBuilder payload; - VPackBuilder payload; + auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) { + VPackObjectBuilder guard(&payload); + VPackSlice s = result.get(StaticStrings::KeyString); + payload.add(StaticStrings::KeyString, s); + s = result.get(StaticStrings::RevString); + payload.add(StaticStrings::RevString, s); + TRI_SanitizeObject(doc, payload); + }; - auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) { - VPackObjectBuilder guard(&payload); - VPackSlice s = result.get(StaticStrings::KeyString); - payload.add(StaticStrings::KeyString, s); - s = result.get(StaticStrings::RevString); - payload.add(StaticStrings::RevString, s); - TRI_SanitizeObject(doc, payload); - }; - - VPackSlice ourResult = resultBuilder.slice(); - size_t count = 0; - if (multiCase) { - VPackArrayBuilder guard(&payload); - VPackArrayIterator 
itValue(newValue); - VPackArrayIterator itResult(ourResult); - while (itValue.valid() && itResult.valid()) { - TRI_ASSERT((*itResult).isObject()); - if (!(*itResult).hasKey("error")) { - doOneDoc(itValue.value(), itResult.value()); - count++; - } - itValue.next(); - itResult.next(); - } - } else { - VPackArrayBuilder guard(&payload); - doOneDoc(newValue, ourResult); - count++; - } - if (count > 0) { - auto body = std::make_shared(); - *body = payload.slice().toJson(); - - // Now prepare the requests: - std::vector requests; - for (auto const& f : *followers) { - requests.emplace_back("server:" + f, - operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ? - arangodb::rest::RequestType::PUT : - arangodb::rest::RequestType::PATCH, - path, body); - } - size_t nrDone = 0; - size_t nrGood = cc->performRequests(requests, chooseTimeout(count), - nrDone, Logger::REPLICATION); - if (nrGood < followers->size()) { - // we drop all followers that were not successful: - for (size_t i = 0; i < followers->size(); ++i) { - bool replicationWorked - = requests[i].done && - requests[i].result.status == CL_COMM_RECEIVED && - (requests[i].result.answer_code == - rest::ResponseCode::ACCEPTED || - requests[i].result.answer_code == - rest::ResponseCode::OK); - if (replicationWorked) { - bool found; - requests[i].result.answer->header(StaticStrings::ErrorCodes, found); - replicationWorked = !found; + VPackSlice ourResult = resultBuilder.slice(); + size_t count = 0; + if (multiCase) { + VPackArrayBuilder guard(&payload); + VPackArrayIterator itValue(newValue); + VPackArrayIterator itResult(ourResult); + while (itValue.valid() && itResult.valid()) { + TRI_ASSERT((*itResult).isObject()); + if (!(*itResult).hasKey("error")) { + doOneDoc(itValue.value(), itResult.value()); + count++; } - if (!replicationWorked) { - auto const& followerInfo = collection->followers(); - followerInfo->remove((*followers)[i]); - LOG_TOPIC(ERR, Logger::REPLICATION) - << "modifyLocal: dropping follower " - << 
(*followers)[i] << " for shard " << collectionName; + itValue.next(); + itResult.next(); + } + } else { + VPackArrayBuilder guard(&payload); + doOneDoc(newValue, ourResult); + count++; + } + if (count > 0) { + auto body = std::make_shared(); + *body = payload.slice().toJson(); + + // Now prepare the requests: + std::vector requests; + for (auto const& f : *followers) { + requests.emplace_back("server:" + f, + operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ? + arangodb::rest::RequestType::PUT : + arangodb::rest::RequestType::PATCH, + path, body); + } + size_t nrDone = 0; + size_t nrGood = cc->performRequests(requests, chooseTimeout(count), + nrDone, Logger::REPLICATION); + if (nrGood < followers->size()) { + // we drop all followers that were not successful: + for (size_t i = 0; i < followers->size(); ++i) { + bool replicationWorked + = requests[i].done && + requests[i].result.status == CL_COMM_RECEIVED && + (requests[i].result.answer_code == + rest::ResponseCode::ACCEPTED || + requests[i].result.answer_code == + rest::ResponseCode::OK); + if (replicationWorked) { + bool found; + requests[i].result.answer->header(StaticStrings::ErrorCodes, found); + replicationWorked = !found; + } + if (!replicationWorked) { + auto const& followerInfo = collection->followers(); + followerInfo->remove((*followers)[i]); + LOG_TOPIC(ERR, Logger::REPLICATION) + << "modifyLocal: dropping follower " + << (*followers)[i] << " for shard " << collectionName; + } } } } @@ -2417,80 +2422,83 @@ OperationResult Transaction::removeLocal(std::string const& collectionName, // Now replicate the good operations on all followers: auto cc = arangodb::ClusterComm::instance(); + if (cc != nullptr) { + // nullptr only happens on controlled shutdown - std::string path - = "/_db/" + - arangodb::basics::StringUtils::urlEncode(_vocbase->name()) + - "/_api/document/" + - arangodb::basics::StringUtils::urlEncode(collection->name()) - + "?isRestore=true"; + std::string path + = "/_db/" +
arangodb::basics::StringUtils::urlEncode(_vocbase->name()) + + "/_api/document/" + + arangodb::basics::StringUtils::urlEncode(collection->name()) + + "?isRestore=true"; - VPackBuilder payload; + VPackBuilder payload; - auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) { - VPackObjectBuilder guard(&payload); - VPackSlice s = result.get(StaticStrings::KeyString); - payload.add(StaticStrings::KeyString, s); - s = result.get(StaticStrings::RevString); - payload.add(StaticStrings::RevString, s); - TRI_SanitizeObject(doc, payload); - }; + auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) { + VPackObjectBuilder guard(&payload); + VPackSlice s = result.get(StaticStrings::KeyString); + payload.add(StaticStrings::KeyString, s); + s = result.get(StaticStrings::RevString); + payload.add(StaticStrings::RevString, s); + TRI_SanitizeObject(doc, payload); + }; - VPackSlice ourResult = resultBuilder.slice(); - size_t count = 0; - if (value.isArray()) { - VPackArrayBuilder guard(&payload); - VPackArrayIterator itValue(value); - VPackArrayIterator itResult(ourResult); - while (itValue.valid() && itResult.valid()) { - TRI_ASSERT((*itResult).isObject()); - if (!(*itResult).hasKey("error")) { - doOneDoc(itValue.value(), itResult.value()); - count++; - } - itValue.next(); - itResult.next(); - } - } else { - VPackArrayBuilder guard(&payload); - doOneDoc(value, ourResult); - count++; - } - if (count > 0) { - auto body = std::make_shared(); - *body = payload.slice().toJson(); - - // Now prepare the requests: - std::vector requests; - for (auto const& f : *followers) { - requests.emplace_back("server:" + f, - arangodb::rest::RequestType::DELETE_REQ, - path, body); - } - size_t nrDone = 0; - size_t nrGood = cc->performRequests(requests, chooseTimeout(count), - nrDone, Logger::REPLICATION); - if (nrGood < followers->size()) { - // we drop all followers that were not successful: - for (size_t i = 0; i < followers->size(); ++i) { - bool replicationWorked - = 
requests[i].done && - requests[i].result.status == CL_COMM_RECEIVED && - (requests[i].result.answer_code == - rest::ResponseCode::ACCEPTED || - requests[i].result.answer_code == - rest::ResponseCode::OK); - if (replicationWorked) { - bool found; - requests[i].result.answer->header(StaticStrings::ErrorCodes, found); - replicationWorked = !found; + VPackSlice ourResult = resultBuilder.slice(); + size_t count = 0; + if (value.isArray()) { + VPackArrayBuilder guard(&payload); + VPackArrayIterator itValue(value); + VPackArrayIterator itResult(ourResult); + while (itValue.valid() && itResult.valid()) { + TRI_ASSERT((*itResult).isObject()); + if (!(*itResult).hasKey("error")) { + doOneDoc(itValue.value(), itResult.value()); + count++; } - if (!replicationWorked) { - auto const& followerInfo = collection->followers(); - followerInfo->remove((*followers)[i]); - LOG_TOPIC(ERR, Logger::REPLICATION) - << "removeLocal: dropping follower " - << (*followers)[i] << " for shard " << collectionName; + itValue.next(); + itResult.next(); + } + } else { + VPackArrayBuilder guard(&payload); + doOneDoc(value, ourResult); + count++; + } + if (count > 0) { + auto body = std::make_shared(); + *body = payload.slice().toJson(); + + // Now prepare the requests: + std::vector requests; + for (auto const& f : *followers) { + requests.emplace_back("server:" + f, + arangodb::rest::RequestType::DELETE_REQ, + path, body); + } + size_t nrDone = 0; + size_t nrGood = cc->performRequests(requests, chooseTimeout(count), + nrDone, Logger::REPLICATION); + if (nrGood < followers->size()) { + // we drop all followers that were not successful: + for (size_t i = 0; i < followers->size(); ++i) { + bool replicationWorked + = requests[i].done && + requests[i].result.status == CL_COMM_RECEIVED && + (requests[i].result.answer_code == + rest::ResponseCode::ACCEPTED || + requests[i].result.answer_code == + rest::ResponseCode::OK); + if (replicationWorked) { + bool found; + 
requests[i].result.answer->header(StaticStrings::ErrorCodes, found); + replicationWorked = !found; + } + if (!replicationWorked) { + auto const& followerInfo = collection->followers(); + followerInfo->remove((*followers)[i]); + LOG_TOPIC(ERR, Logger::REPLICATION) + << "removeLocal: dropping follower " + << (*followers)[i] << " for shard " << collectionName; + } } } } @@ -2667,44 +2675,45 @@ OperationResult Transaction::truncateLocal(std::string const& collectionName, // Now replicate the good operations on all followers: auto cc = arangodb::ClusterComm::instance(); + if (cc != nullptr) { + // nullptr only happens on controlled shutdown + std::string path + = "/_db/" + + arangodb::basics::StringUtils::urlEncode(_vocbase->name()) + + "/_api/collection/" + collectionName + "/truncate"; - std::string path - = "/_db/" + - arangodb::basics::StringUtils::urlEncode(_vocbase->name()) + - "/_api/collection/" + collectionName + "/truncate"; + auto body = std::make_shared(); - auto body = std::make_shared(); - - // Now prepare the requests: - std::vector requests; - for (auto const& f : *followers) { - requests.emplace_back("server:" + f, - arangodb::rest::RequestType::PUT, - path, body); - } - size_t nrDone = 0; - size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT, - nrDone, Logger::REPLICATION); - if (nrGood < followers->size()) { - // we drop all followers that were not successful: - for (size_t i = 0; i < followers->size(); ++i) { - bool replicationWorked - = requests[i].done && - requests[i].result.status == CL_COMM_RECEIVED && - (requests[i].result.answer_code == - rest::ResponseCode::ACCEPTED || - requests[i].result.answer_code == - rest::ResponseCode::OK); - if (!replicationWorked) { - auto const& followerInfo = collection->followers(); - followerInfo->remove((*followers)[i]); - LOG_TOPIC(ERR, Logger::REPLICATION) - << "truncateLocal: dropping follower " - << (*followers)[i] << " for shard " << collectionName; + // Now prepare the requests: + 
std::vector requests; + for (auto const& f : *followers) { + requests.emplace_back("server:" + f, + arangodb::rest::RequestType::PUT, + path, body); + } + size_t nrDone = 0; + size_t nrGood = cc->performRequests(requests, TRX_FOLLOWER_TIMEOUT, + nrDone, Logger::REPLICATION); + if (nrGood < followers->size()) { + // we drop all followers that were not successful: + for (size_t i = 0; i < followers->size(); ++i) { + bool replicationWorked + = requests[i].done && + requests[i].result.status == CL_COMM_RECEIVED && + (requests[i].result.answer_code == + rest::ResponseCode::ACCEPTED || + requests[i].result.answer_code == + rest::ResponseCode::OK); + if (!replicationWorked) { + auto const& followerInfo = collection->followers(); + followerInfo->remove((*followers)[i]); + LOG_TOPIC(ERR, Logger::REPLICATION) + << "truncateLocal: dropping follower " + << (*followers)[i] << " for shard " << collectionName; + } } } } - } } diff --git a/arangod/V8Server/v8-actions.cpp b/arangod/V8Server/v8-actions.cpp index ff33103596..9cb74f9953 100644 --- a/arangod/V8Server/v8-actions.cpp +++ b/arangod/V8Server/v8-actions.cpp @@ -1348,6 +1348,9 @@ static bool clusterSendToAllServers( arangodb::rest::RequestType const& method, std::string const& body) { ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); + if (cc == nullptr) { + return TRI_ERROR_SHUTTING_DOWN; + } std::string url = "/_db/" + StringUtils::urlEncode(dbname) + "/" + path; // Have to propagate to DB Servers diff --git a/arangod/V8Server/v8-replication.cpp b/arangod/V8Server/v8-replication.cpp index ebe8e0954f..34504c8cf8 100644 --- a/arangod/V8Server/v8-replication.cpp +++ b/arangod/V8Server/v8-replication.cpp @@ -209,7 +209,11 @@ void addReplicationAuthentication(v8::Isolate* isolate, if (!hasUsernamePassword) { auto cluster = application_features::ApplicationServer::getFeature("Cluster"); if (cluster->isEnabled()) { - config._jwt = ClusterComm::instance()->jwt(); + auto cc = ClusterComm::instance(); + 
if (cc != nullptr) { + // nullptr happens only during controlled shutdown + config._jwt = ClusterComm::instance()->jwt(); + } } } } diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index c00df86185..a1ee079c7f 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -2138,31 +2138,33 @@ static void ListDatabasesCoordinator( if (!DBServers.empty()) { ServerID sid = DBServers[0]; auto cc = ClusterComm::instance(); + if (cc != nullptr) { + // nullptr happens only during controlled shutdown + std::unordered_map headers; + headers["Authentication"] = TRI_ObjectToString(args[2]); + auto res = cc->syncRequest( + "", 0, "server:" + sid, arangodb::rest::RequestType::GET, + "/_api/database/user", std::string(), headers, 0.0); - std::unordered_map headers; - headers["Authentication"] = TRI_ObjectToString(args[2]); - auto res = cc->syncRequest( - "", 0, "server:" + sid, arangodb::rest::RequestType::GET, - "/_api/database/user", std::string(), headers, 0.0); + if (res->status == CL_COMM_SENT) { + // We got an array back as JSON, let's parse it and build a v8 + StringBuffer& body = res->result->getBody(); - if (res->status == CL_COMM_SENT) { - // We got an array back as JSON, let's parse it and build a v8 - StringBuffer& body = res->result->getBody(); + std::shared_ptr builder = + VPackParser::fromJson(body.c_str(), body.length()); + VPackSlice resultSlice = builder->slice(); - std::shared_ptr builder = - VPackParser::fromJson(body.c_str(), body.length()); - VPackSlice resultSlice = builder->slice(); - - if (resultSlice.isObject()) { - VPackSlice r = resultSlice.get("result"); - if (r.isArray()) { - uint32_t i = 0; - v8::Handle result = v8::Array::New(isolate); - for (auto const& it : VPackArrayIterator(r)) { - std::string v = it.copyString(); - result->Set(i++, TRI_V8_STD_STRING(v)); + if (resultSlice.isObject()) { + VPackSlice r = resultSlice.get("result"); + if (r.isArray()) { + uint32_t i = 0; + v8::Handle result = 
v8::Array::New(isolate); + for (auto const& it : VPackArrayIterator(r)) { + std::string v = it.copyString(); + result->Set(i++, TRI_V8_STD_STRING(v)); + } + TRI_V8_RETURN(result); } - TRI_V8_RETURN(result); } } } From 9386a1b2d535809f2c8438334ac281db78d735b3 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Tue, 7 Feb 2017 17:43:30 +0100 Subject: [PATCH 2/3] symlinks for windows! --- .../Books/Manual/GettingStarted/Installing/Compiling.mdpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Documentation/Books/Manual/GettingStarted/Installing/Compiling.mdpp b/Documentation/Books/Manual/GettingStarted/Installing/Compiling.mdpp index 1e3f0e4381..50d5de7c8f 100644 --- a/Documentation/Books/Manual/GettingStarted/Installing/Compiling.mdpp +++ b/Documentation/Books/Manual/GettingStarted/Installing/Compiling.mdpp @@ -12,6 +12,7 @@ By default, cloning the github repository will checkout **devel**. This version contains the development version of the ArangoDB. Use this branch if you want to make changes to the ArangoDB source. +On Windows you first [need to allow and enable symlinks for your user](https://github.com/git-for-windows/git/wiki/Symbolic-Links#allowing-non-administrators-to-create-symbolic-links). We now use [git submodules](https://git-scm.com/docs/git-submodule) for V8 and Rocksdb. Since the V8 git repository also requires external addons to be present, we end up with recursive submodules. Thus a clone command now has to look like: @@ -21,5 +22,8 @@ up with recursive submodules. Thus a clone command now has to look like: git submodule update --recursive git submodule update --init --recursive + + + Please checkout the [cookbook](https://docs.arangodb.com/cookbook) on how to compile ArangoDB. From cbcda7932c3d49389da3cb2ac03636da80dc40b9 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Tue, 7 Feb 2017 22:37:38 +0100 Subject: [PATCH 3/3] Change undocumented behaviour in case of invalid rev in If-Match headers.
An invalid rev should lead to a 412 PRECONDITION FAILED rather than a 400 BAD error. This is more intuitive, in particular since we have never documented what valid rev strings are. Also adjust tests and CHANGELOG. --- CHANGELOG | 3 +++ .../HttpInterface/api-document-delete-spec.rb | 12 +++++------ .../HttpInterface/api-document-read-spec.rb | 2 +- .../HttpInterface/api-document-update-spec.rb | 12 +++++------ arangod/RestHandler/RestDocumentHandler.cpp | 20 ++++++------------- 5 files changed, 22 insertions(+), 27 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index b3991a849e..3f059e2316 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ devel ----- +* change undocumented behaviour in case of invalid revision ids in + If-Match and If-None-Match headers from 400 (BAD) to 412 (PRECONDITION + FAILED). v3.2.alpha1 (2017-02-05) ------------------------ diff --git a/UnitTests/HttpInterface/api-document-delete-spec.rb b/UnitTests/HttpInterface/api-document-delete-spec.rb index c3dc052d92..d9609c047e 100644 --- a/UnitTests/HttpInterface/api-document-delete-spec.rb +++ b/UnitTests/HttpInterface/api-document-delete-spec.rb @@ -190,20 +190,20 @@ describe ArangoDB do hdr = { "if-match" => "\"*abcd\"" } doc = ArangoDB.log_delete("#{prefix}-rev-invalid", cmd, :headers => hdr ) - doc.code.should eq(400) + doc.code.should eq(412) doc.parsed_response['error'].should eq(true) - doc.parsed_response['errorNum'].should eq(400) - doc.parsed_response['code'].should eq(400) + doc.parsed_response['errorNum'].should eq(1200) + doc.parsed_response['code'].should eq(412) # delete document, invalid revision cmd = "/_api/document/#{did}" hdr = { "if-match" => "'*abcd'" } doc = ArangoDB.log_delete("#{prefix}-rev-invalid", cmd, :headers => hdr) - doc.code.should eq(400) + doc.code.should eq(412) doc.parsed_response['error'].should eq(true) - doc.parsed_response['errorNum'].should eq(400) - doc.parsed_response['code'].should eq(400) + doc.parsed_response['errorNum'].should eq(1200) +
doc.parsed_response['code'].should eq(412) # delete document, correct revision cmd = "/_api/document/#{did}" diff --git a/UnitTests/HttpInterface/api-document-read-spec.rb b/UnitTests/HttpInterface/api-document-read-spec.rb index 2e31cb8a05..107dbafe51 100644 --- a/UnitTests/HttpInterface/api-document-read-spec.rb +++ b/UnitTests/HttpInterface/api-document-read-spec.rb @@ -567,7 +567,7 @@ describe ArangoDB do hdr = { "if-match" => "'*abcd'" } doc = ArangoDB.log_head("#{prefix}-head-rev-invalid", cmd, :headers => hdr) - doc.code.should eq(400) + doc.code.should eq(412) end end diff --git a/UnitTests/HttpInterface/api-document-update-spec.rb b/UnitTests/HttpInterface/api-document-update-spec.rb index 6ecdc12fb5..b36fed20c1 100644 --- a/UnitTests/HttpInterface/api-document-update-spec.rb +++ b/UnitTests/HttpInterface/api-document-update-spec.rb @@ -266,20 +266,20 @@ describe ArangoDB do hdr = { "if-match" => "\"*abcd\"" } doc = ArangoDB.log_put("#{prefix}-rev-invalid", cmd, :headers => hdr, :body => body) - doc.code.should eq(400) + doc.code.should eq(412) doc.parsed_response['error'].should eq(true) - doc.parsed_response['errorNum'].should eq(400) - doc.parsed_response['code'].should eq(400) + doc.parsed_response['errorNum'].should eq(1200) + doc.parsed_response['code'].should eq(412) # update document, invalid revision cmd = "/_api/document/#{did}" hdr = { "if-match" => "'*abcd'" } doc = ArangoDB.log_put("#{prefix}-rev-invalid", cmd, :headers => hdr, :body => body) - doc.code.should eq(400) + doc.code.should eq(412) doc.parsed_response['error'].should eq(true) - doc.parsed_response['errorNum'].should eq(400) - doc.parsed_response['code'].should eq(400) + doc.parsed_response['errorNum'].should eq(1200) + doc.parsed_response['code'].should eq(412) # update document, correct revision cmd = "/_api/document/#{did}" diff --git a/arangod/RestHandler/RestDocumentHandler.cpp b/arangod/RestHandler/RestDocumentHandler.cpp index 6a1e75e2b2..110eca6167 100644 --- 
a/arangod/RestHandler/RestDocumentHandler.cpp +++ b/arangod/RestHandler/RestDocumentHandler.cpp @@ -203,23 +203,19 @@ bool RestDocumentHandler::readSingleDocument(bool generateBody) { // check for an etag bool isValidRevision; - TRI_voc_rid_t const ifNoneRid = + TRI_voc_rid_t ifNoneRid = extractRevision("if-none-match", isValidRevision); if (!isValidRevision) { - generateError(rest::ResponseCode::BAD, - TRI_ERROR_HTTP_BAD_PARAMETER, "invalid revision number"); - return false; + ifNoneRid = 1; // an impossible rev, so precondition failed will happen } OperationOptions options; options.ignoreRevs = true; - TRI_voc_rid_t const ifRid = + TRI_voc_rid_t ifRid = extractRevision("if-match", isValidRevision); if (!isValidRevision) { - generateError(rest::ResponseCode::BAD, - TRI_ERROR_HTTP_BAD_PARAMETER, "invalid revision number"); - return false; + ifRid = 1; // an impossible rev, so precondition failed will happen } VPackBuilder builder; @@ -396,9 +392,7 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) { bool isValidRevision; revision = extractRevision("if-match", isValidRevision); if (!isValidRevision) { - generateError(rest::ResponseCode::BAD, - TRI_ERROR_HTTP_BAD_PARAMETER, "invalid revision number"); - return false; + revision = 1; // an impossible revision, so precondition failed } VPackSlice keyInBody = body.get(StaticStrings::KeyString); if ((revision != 0 && TRI_ExtractRevisionId(body) != revision) || @@ -502,9 +496,7 @@ bool RestDocumentHandler::deleteDocument() { bool isValidRevision = false; revision = extractRevision("if-match", isValidRevision); if (!isValidRevision) { - generateError(rest::ResponseCode::BAD, - TRI_ERROR_HTTP_BAD_PARAMETER, "invalid revision number"); - return false; + revision = 1; // an impossible revision, so precondition failed } }