From 288f42c5316b99ee428151b1ea7a487683a38c0a Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Wed, 7 Sep 2016 11:52:21 +0200 Subject: [PATCH] constituent starting anyway without inception --- arangod/Agency/Agent.cpp | 3 +- arangod/Agency/Constituent.cpp | 2 +- arangod/Cluster/ClusterComm.cpp | 119 ++++++++++++++++++-------------- 3 files changed, 69 insertions(+), 55 deletions(-) diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index a0135acdf1..00b3e54fa1 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -137,7 +137,6 @@ void Agent::startConstituent() { auto database = ApplicationServer::getFeature("Database"); auto vocbase = database->vocbase(); auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY; - _constituent.start(vocbase, queryRegistry); } // Waits here for confirmation of log's commits up to index. @@ -401,8 +400,8 @@ bool Agent::load() { TRI_ASSERT(queryRegistry != nullptr); if (size() == 1) { activateAgency(); - _constituent.start(vocbase, queryRegistry); } + _constituent.start(vocbase, queryRegistry); if (_config.supervision()) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting cluster sanity facilities"; diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 37ae4e1aa5..7978cdd714 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -230,7 +230,7 @@ std::string Constituent::endpoint(std::string id) const { /// @brief Vote bool Constituent::vote(term_t term, std::string id, index_t prevLogIndex, term_t prevLogTerm, bool appendEntries) { - if(!_vocbase) { + if(_vocbase==nullptr) { return false; } diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index ee47cfa655..9ddecbdff0 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -69,16 +69,16 @@ void ClusterCommResult::setDestination(std::string const& dest, serverID = ""; status = CL_COMM_BACKEND_UNAVAILABLE; if (logConnectionErrors) { - LOG(ERR) << "cannot find responsible server for shard '" - << shardID << "'"; + LOG_TOPIC(ERR, Logger::CLUSTER) + << "cannot find responsible server for shard '" << shardID << "'"; } else { - LOG(INFO) << "cannot find responsible server for shard '" - << shardID << "'"; + LOG_TOPIC(INFO, Logger::CLUSTER) + << "cannot find responsible server for shard '" << shardID << "'"; } return; } } - LOG(DEBUG) << "Responsible server: " << serverID; + LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Responsible server: " << serverID; } else if (dest.substr(0, 7) == "server:") { shardID = ""; serverID = dest.substr(7); @@ -94,9 +94,11 @@ void ClusterCommResult::setDestination(std::string const& dest, status = CL_COMM_BACKEND_UNAVAILABLE; errorMessage = "did not understand destination'" + dest + "'"; if (logConnectionErrors) { - LOG(ERR) << "did not understand destination '" << dest << "'"; + LOG_TOPIC(ERR, Logger::CLUSTER) + << "did not understand destination '" << dest << "'"; } else { - LOG(INFO) << "did not understand destination '" << dest << "'"; + LOG_TOPIC(INFO, Logger::CLUSTER) + << "did not understand destination '" << dest << "'"; } return; } @@ -107,11 +109,11 @@ void ClusterCommResult::setDestination(std::string const& dest, status = CL_COMM_BACKEND_UNAVAILABLE; errorMessage = "did not find endpoint of server '" + serverID + "'"; if (logConnectionErrors) { - LOG(ERR) << "did not find endpoint of server '" << serverID - << "'"; + LOG_TOPIC(ERR, Logger::CLUSTER) + << "did not find endpoint of server '" << serverID << "'"; } else { - LOG(INFO) << "did not find endpoint of server '" << serverID - << "'"; + LOG_TOPIC(INFO, Logger::CLUSTER) + << "did not find endpoint of server '" << serverID << "'"; } } } @@ -254,7 +256,8 @@ void ClusterComm::startBackgroundThread() { _backgroundThread = new ClusterCommThread(); if (!_backgroundThread->start()) { - LOG(FATAL) << "ClusterComm background thread does not work"; + LOG_TOPIC(FATAL, Logger::CLUSTER) + << "ClusterComm background thread does not work"; FATAL_ERROR_EXIT(); } } @@ -352,8 +355,9 @@ OperationID ClusterComm::asyncRequest( if (op->result.status == CL_COMM_BACKEND_UNAVAILABLE) { // We put it into the received queue right away for error reporting: ClusterCommResult const resCopy(op->result); - LOG(DEBUG) << "In asyncRequest, putting failed request " - << resCopy.operationID << " directly into received queue."; + LOG_TOPIC(DEBUG, Logger::CLUSTER) + << "In asyncRequest, putting failed request " << resCopy.operationID + << " directly into received queue."; CONDITION_LOCKER(locker, somethingReceived); received.push_back(op.get()); op.release(); @@ -415,8 +419,8 @@ OperationID ClusterComm::asyncRequest( // LOCKING-DEBUG // std::cout << "asyncRequest: sending " << - // arangodb::rest::HttpRequest::translateMethod(reqtype) << " request to DB - // server '" << op->serverID << ":" << path << "\n" << *(body.get()) << "\n"; + // arangodb::rest::HttpRequest::translateMethod(reqtype, Logger::CLUSTER) << " request to DB + // server '" << op->serverID << ":" << path << "\n" << *(body.get(), Logger::CLUSTER) << "\n"; // for (auto& h : *(op->headerFields)) { // std::cout << h.first << ":" << h.second << std::endl; // } @@ -430,7 +434,8 @@ OperationID ClusterComm::asyncRequest( std::list::iterator i = toSend.end(); toSendByOpID[opId] = --i; } - LOG(DEBUG) << "In asyncRequest, put into queue " << opId; + LOG_TOPIC(DEBUG, Logger::CLUSTER) + << "In asyncRequest, put into queue " << opId; somethingToSend.signal(); return opId; @@ -505,20 +510,22 @@ std::unique_ptr ClusterComm::syncRequest( res->errorMessage = "cannot create connection to server '" + res->serverID + "'"; if (logConnectionErrors()) { - LOG(ERR) << "cannot create connection to server '" - << res->serverID << "' at endpoint '" << res->endpoint << "'"; + LOG_TOPIC(ERR, Logger::CLUSTER) + << "cannot create connection to server '" << res->serverID + << "' at endpoint '" << res->endpoint << "'"; } else { - LOG(INFO) << "cannot create connection to server '" - << res->serverID << "' at endpoint '" << res->endpoint << "'"; + LOG_TOPIC(INFO, Logger::CLUSTER) + << "cannot create connection to server '" << res->serverID + << "' at endpoint '" << res->endpoint << "'"; } } else { - LOG(DEBUG) << "sending " - << arangodb::HttpRequest::translateMethod(reqtype) - << " request to DB server '" << res->serverID << "' at endpoint '" << res->endpoint - << "': " << body; + LOG_TOPIC(DEBUG, Logger::CLUSTER) + << "sending " << arangodb::HttpRequest::translateMethod(reqtype) + << " request to DB server '" << res->serverID << "' at endpoint '" + << res->endpoint << "': " << body; // LOCKING-DEBUG // std::cout << "syncRequest: sending " << - // arangodb::rest::HttpRequest::translateMethod(reqtype) << " request to + // arangodb::rest::HttpRequest::translateMethod(reqtype, Logger::CLUSTER) << " request to // DB server '" << res->serverID << ":" << path << "\n" << body << "\n"; // for (auto& h : headersCopy) { // std::cout << h.first << ":" << h.second << std::endl; @@ -892,11 +899,13 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader, size_t start = 0; size_t pos; - LOG(DEBUG) << "In asyncAnswer, seeing " << coordinatorHeader; + LOG_TOPIC(DEBUG, Logger::CLUSTER) + << "In asyncAnswer, seeing " << coordinatorHeader; pos = coordinatorHeader.find(":", start); if (pos == std::string::npos) { - LOG(ERR) << "Could not find coordinator ID in X-Arango-Coordinator"; + LOG_TOPIC(ERR, Logger::CLUSTER) + << "Could not find coordinator ID in X-Arango-Coordinator"; return; } @@ -909,11 +918,13 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader, if (endpoint == "") { if (logConnectionErrors()) { - LOG(ERR) << "asyncAnswer: cannot find endpoint for server '" - << coordinatorID << "'"; + LOG_TOPIC(ERR, Logger::CLUSTER) + << "asyncAnswer: cannot find endpoint for server '" + << coordinatorID << "'"; } else { - LOG(INFO) << "asyncAnswer: cannot find endpoint for server '" - << coordinatorID << "'"; + LOG_TOPIC(INFO, Logger::CLUSTER) + << "asyncAnswer: cannot find endpoint for server '" + << coordinatorID << "'"; } return; } @@ -922,12 +933,14 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader, cm->leaseConnection(endpoint); if (nullptr == connection) { - LOG(ERR) << "asyncAnswer: cannot create connection to server '" - << coordinatorID << "'"; + LOG_TOPIC(ERR, Logger::CLUSTER) + << "asyncAnswer: cannot create connection to server '" + << coordinatorID << "'"; return; } - std::unordered_map headers = responseToSend->headers(); + std::unordered_map headers = + responseToSend->headers(); headers["X-Arango-Coordinator"] = coordinatorHeader; headers["X-Arango-Response-Code"] = responseToSend->responseString(responseToSend->responseCode()); @@ -939,8 +952,9 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader, char const* body = responseToSend->body().c_str(); size_t len = responseToSend->body().length(); - LOG(DEBUG) << "asyncAnswer: sending PUT request to DB server '" - << coordinatorID << "'"; + LOG_TOPIC(DEBUG, Logger::CLUSTER) + << "asyncAnswer: sending PUT request to DB server '" + << coordinatorID << "'"; auto client = std::make_unique( connection->_connection, 3600.0, false); @@ -981,7 +995,8 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader, size_t start = 0; size_t pos; - LOG(DEBUG) << "In processAnswer, seeing " << coordinatorHeader; + LOG_TOPIC(DEBUG, Logger::CLUSTER) + << "In processAnswer, seeing " << coordinatorHeader; pos = coordinatorHeader.find(":", start); if (pos == std::string::npos) { @@ -1062,7 +1077,7 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader, //////////////////////////////////////////////////////////////////////////////// bool ClusterComm::moveFromSendToReceived(OperationID operationID) { - LOG(DEBUG) << "In moveFromSendToReceived " << operationID; + LOG_TOPIC(DEBUG, Logger::CLUSTER) << "In moveFromSendToReceived " << operationID; CONDITION_LOCKER(locker, somethingReceived); CONDITION_LOCKER(sendLocker, somethingToSend); @@ -1258,7 +1273,7 @@ size_t ClusterComm::performRequests(std::vector& requests, auto it = opIDtoIndex.find(res.operationID); if (it == opIDtoIndex.end()) { // Ooops, we got a response to which we did not send the request - LOG(ERR) << "Received ClusterComm response for a request we did not send!"; + LOG_TOPIC(ERR, Logger::CLUSTER) << "Received ClusterComm response for a request we did not send!"; continue; } size_t index = it->second; @@ -1271,7 +1286,7 @@ size_t ClusterComm::performRequests(std::vector& requests, res.answer_code == GeneralResponse::ResponseCode::ACCEPTED) { nrGood++; } - LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: " + LOG_TOPIC(TRACE, Logger::CLUSTER) << "ClusterComm::performRequests: " << "got answer from " << requests[index].destination << ":" << requests[index].path << " with return code " << (int) res.answer_code; @@ -1290,7 +1305,7 @@ size_t ClusterComm::performRequests(std::vector& requests, if (dueTime[index] < actionNeeded) { actionNeeded = dueTime[index]; } - LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: " + LOG_TOPIC(TRACE, Logger::CLUSTER) << "ClusterComm::performRequests: " << "got BACKEND_UNAVAILABLE or TIMEOUT from " << requests[index].destination << ":" << requests[index].path; @@ -1298,7 +1313,7 @@ size_t ClusterComm::performRequests(std::vector& requests, requests[index].result = res; requests[index].done = true; nrDone++; - LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: " + LOG_TOPIC(TRACE, Logger::CLUSTER) << "ClusterComm::performRequests: " << "got no answer from " << requests[index].destination << ":" << requests[index].path << " with error " << res.status; } @@ -1309,14 +1324,14 @@ size_t ClusterComm::performRequests(std::vector& requests, } } } catch (...) { - LOG_TOPIC(ERR, logTopic) << "ClusterComm::performRequests: " + LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: " << "caught exception, ignoring..."; } // We only get here if the global timeout was triggered, not all // requests are marked by done! - LOG_TOPIC(DEBUG, logTopic) << "ClusterComm::performRequests: " + LOG_TOPIC(DEBUG, Logger::CLUSTER) << "ClusterComm::performRequests: " << "got timeout, this will be reported..."; // Forget about @@ -1405,7 +1420,7 @@ void ClusterCommThread::run() { ClusterCommOperation* op; ClusterComm* cc = ClusterComm::instance(); - LOG(DEBUG) << "starting ClusterComm thread"; + LOG_TOPIC(DEBUG, Logger::CLUSTER) << "starting ClusterComm thread"; while (!isStopping()) { // First check the sending queue, as long as it is not empty, we send @@ -1421,7 +1436,7 @@ void ClusterCommThread::run() { if (cc->toSend.empty()) { break; } else { - LOG(DEBUG) << "Noticed something to send"; + LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Noticed something to send"; op = cc->toSend.front(); TRI_ASSERT(op->result.status == CL_COMM_SUBMITTED); op->result.status = CL_COMM_SENDING; @@ -1448,15 +1463,15 @@ void ClusterCommThread::run() { op->result.errorMessage = "cannot create connection to server: "; op->result.errorMessage += op->result.serverID; if (cc->logConnectionErrors()) { - LOG(ERR) << "cannot create connection to server '" + LOG_TOPIC(ERR, Logger::CLUSTER) << "cannot create connection to server '" << op->result.serverID << "' at endpoint '" << op->result.endpoint << "'"; } else { - LOG(INFO) << "cannot create connection to server '" + LOG_TOPIC(INFO, Logger::CLUSTER) << "cannot create connection to server '" << op->result.serverID << "' at endpoint '" << op->result.endpoint << "'"; } } else { if (nullptr != op->body.get()) { - LOG(DEBUG) << "sending " + LOG_TOPIC(DEBUG, Logger::CLUSTER) << "sending " << arangodb::HttpRequest::translateMethod( op->reqtype) .c_str() @@ -1464,7 +1479,7 @@ void ClusterCommThread::run() { << op->result.serverID << "' at endpoint '" << op->result.endpoint << "': " << op->body->c_str(); } else { - LOG(DEBUG) << "sending " + LOG_TOPIC(DEBUG, Logger::CLUSTER) << "sending " << arangodb::HttpRequest::translateMethod( op->reqtype) .c_str() @@ -1586,5 +1601,5 @@ void ClusterCommThread::run() { } } - LOG(DEBUG) << "stopped ClusterComm thread"; + LOG_TOPIC(DEBUG, Logger::CLUSTER) << "stopped ClusterComm thread"; }