1
0
Fork 0

constituent starting anyway without inception

This commit is contained in:
Kaveh Vahedipour 2016-09-07 11:52:21 +02:00
parent 2499c3e2e8
commit 288f42c531
3 changed files with 69 additions and 55 deletions

View File

@ -137,7 +137,6 @@ void Agent::startConstituent() {
auto database = ApplicationServer::getFeature<DatabaseFeature>("Database"); auto database = ApplicationServer::getFeature<DatabaseFeature>("Database");
auto vocbase = database->vocbase(); auto vocbase = database->vocbase();
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY; auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
_constituent.start(vocbase, queryRegistry);
} }
// Waits here for confirmation of log's commits up to index. // Waits here for confirmation of log's commits up to index.
@ -401,8 +400,8 @@ bool Agent::load() {
TRI_ASSERT(queryRegistry != nullptr); TRI_ASSERT(queryRegistry != nullptr);
if (size() == 1) { if (size() == 1) {
activateAgency(); activateAgency();
_constituent.start(vocbase, queryRegistry);
} }
_constituent.start(vocbase, queryRegistry);
if (_config.supervision()) { if (_config.supervision()) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting cluster sanity facilities"; LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting cluster sanity facilities";

View File

@ -230,7 +230,7 @@ std::string Constituent::endpoint(std::string id) const {
/// @brief Vote /// @brief Vote
bool Constituent::vote(term_t term, std::string id, index_t prevLogIndex, bool Constituent::vote(term_t term, std::string id, index_t prevLogIndex,
term_t prevLogTerm, bool appendEntries) { term_t prevLogTerm, bool appendEntries) {
if(!_vocbase) { if(_vocbase==nullptr) {
return false; return false;
} }

View File

@ -69,16 +69,16 @@ void ClusterCommResult::setDestination(std::string const& dest,
serverID = ""; serverID = "";
status = CL_COMM_BACKEND_UNAVAILABLE; status = CL_COMM_BACKEND_UNAVAILABLE;
if (logConnectionErrors) { if (logConnectionErrors) {
LOG(ERR) << "cannot find responsible server for shard '" LOG_TOPIC(ERR, Logger::CLUSTER)
<< shardID << "'"; << "cannot find responsible server for shard '" << shardID << "'";
} else { } else {
LOG(INFO) << "cannot find responsible server for shard '" LOG_TOPIC(INFO, Logger::CLUSTER)
<< shardID << "'"; << "cannot find responsible server for shard '" << shardID << "'";
} }
return; return;
} }
} }
LOG(DEBUG) << "Responsible server: " << serverID; LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Responsible server: " << serverID;
} else if (dest.substr(0, 7) == "server:") { } else if (dest.substr(0, 7) == "server:") {
shardID = ""; shardID = "";
serverID = dest.substr(7); serverID = dest.substr(7);
@ -94,9 +94,11 @@ void ClusterCommResult::setDestination(std::string const& dest,
status = CL_COMM_BACKEND_UNAVAILABLE; status = CL_COMM_BACKEND_UNAVAILABLE;
errorMessage = "did not understand destination'" + dest + "'"; errorMessage = "did not understand destination'" + dest + "'";
if (logConnectionErrors) { if (logConnectionErrors) {
LOG(ERR) << "did not understand destination '" << dest << "'"; LOG_TOPIC(ERR, Logger::CLUSTER)
<< "did not understand destination '" << dest << "'";
} else { } else {
LOG(INFO) << "did not understand destination '" << dest << "'"; LOG_TOPIC(INFO, Logger::CLUSTER)
<< "did not understand destination '" << dest << "'";
} }
return; return;
} }
@ -107,11 +109,11 @@ void ClusterCommResult::setDestination(std::string const& dest,
status = CL_COMM_BACKEND_UNAVAILABLE; status = CL_COMM_BACKEND_UNAVAILABLE;
errorMessage = "did not find endpoint of server '" + serverID + "'"; errorMessage = "did not find endpoint of server '" + serverID + "'";
if (logConnectionErrors) { if (logConnectionErrors) {
LOG(ERR) << "did not find endpoint of server '" << serverID LOG_TOPIC(ERR, Logger::CLUSTER)
<< "'"; << "did not find endpoint of server '" << serverID << "'";
} else { } else {
LOG(INFO) << "did not find endpoint of server '" << serverID LOG_TOPIC(INFO, Logger::CLUSTER)
<< "'"; << "did not find endpoint of server '" << serverID << "'";
} }
} }
} }
@ -254,7 +256,8 @@ void ClusterComm::startBackgroundThread() {
_backgroundThread = new ClusterCommThread(); _backgroundThread = new ClusterCommThread();
if (!_backgroundThread->start()) { if (!_backgroundThread->start()) {
LOG(FATAL) << "ClusterComm background thread does not work"; LOG_TOPIC(FATAL, Logger::CLUSTER)
<< "ClusterComm background thread does not work";
FATAL_ERROR_EXIT(); FATAL_ERROR_EXIT();
} }
} }
@ -352,8 +355,9 @@ OperationID ClusterComm::asyncRequest(
if (op->result.status == CL_COMM_BACKEND_UNAVAILABLE) { if (op->result.status == CL_COMM_BACKEND_UNAVAILABLE) {
// We put it into the received queue right away for error reporting: // We put it into the received queue right away for error reporting:
ClusterCommResult const resCopy(op->result); ClusterCommResult const resCopy(op->result);
LOG(DEBUG) << "In asyncRequest, putting failed request " LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< resCopy.operationID << " directly into received queue."; << "In asyncRequest, putting failed request " << resCopy.operationID
<< " directly into received queue.";
CONDITION_LOCKER(locker, somethingReceived); CONDITION_LOCKER(locker, somethingReceived);
received.push_back(op.get()); received.push_back(op.get());
op.release(); op.release();
@ -415,8 +419,8 @@ OperationID ClusterComm::asyncRequest(
// LOCKING-DEBUG // LOCKING-DEBUG
// std::cout << "asyncRequest: sending " << // std::cout << "asyncRequest: sending " <<
// arangodb::rest::HttpRequest::translateMethod(reqtype) << " request to DB // arangodb::rest::HttpRequest::translateMethod(reqtype, Logger::CLUSTER) << " request to DB
// server '" << op->serverID << ":" << path << "\n" << *(body.get()) << "\n"; // server '" << op->serverID << ":" << path << "\n" << *(body.get(), Logger::CLUSTER) << "\n";
// for (auto& h : *(op->headerFields)) { // for (auto& h : *(op->headerFields)) {
// std::cout << h.first << ":" << h.second << std::endl; // std::cout << h.first << ":" << h.second << std::endl;
// } // }
@ -430,7 +434,8 @@ OperationID ClusterComm::asyncRequest(
std::list<ClusterCommOperation*>::iterator i = toSend.end(); std::list<ClusterCommOperation*>::iterator i = toSend.end();
toSendByOpID[opId] = --i; toSendByOpID[opId] = --i;
} }
LOG(DEBUG) << "In asyncRequest, put into queue " << opId; LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "In asyncRequest, put into queue " << opId;
somethingToSend.signal(); somethingToSend.signal();
return opId; return opId;
@ -505,20 +510,22 @@ std::unique_ptr<ClusterCommResult> ClusterComm::syncRequest(
res->errorMessage = res->errorMessage =
"cannot create connection to server '" + res->serverID + "'"; "cannot create connection to server '" + res->serverID + "'";
if (logConnectionErrors()) { if (logConnectionErrors()) {
LOG(ERR) << "cannot create connection to server '" LOG_TOPIC(ERR, Logger::CLUSTER)
<< res->serverID << "' at endpoint '" << res->endpoint << "'"; << "cannot create connection to server '" << res->serverID
<< "' at endpoint '" << res->endpoint << "'";
} else { } else {
LOG(INFO) << "cannot create connection to server '" LOG_TOPIC(INFO, Logger::CLUSTER)
<< res->serverID << "' at endpoint '" << res->endpoint << "'"; << "cannot create connection to server '" << res->serverID
<< "' at endpoint '" << res->endpoint << "'";
} }
} else { } else {
LOG(DEBUG) << "sending " LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< arangodb::HttpRequest::translateMethod(reqtype) << "sending " << arangodb::HttpRequest::translateMethod(reqtype)
<< " request to DB server '" << res->serverID << "' at endpoint '" << res->endpoint << " request to DB server '" << res->serverID << "' at endpoint '"
<< "': " << body; << res->endpoint << "': " << body;
// LOCKING-DEBUG // LOCKING-DEBUG
// std::cout << "syncRequest: sending " << // std::cout << "syncRequest: sending " <<
// arangodb::rest::HttpRequest::translateMethod(reqtype) << " request to // arangodb::rest::HttpRequest::translateMethod(reqtype, Logger::CLUSTER) << " request to
// DB server '" << res->serverID << ":" << path << "\n" << body << "\n"; // DB server '" << res->serverID << ":" << path << "\n" << body << "\n";
// for (auto& h : headersCopy) { // for (auto& h : headersCopy) {
// std::cout << h.first << ":" << h.second << std::endl; // std::cout << h.first << ":" << h.second << std::endl;
@ -892,11 +899,13 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
size_t start = 0; size_t start = 0;
size_t pos; size_t pos;
LOG(DEBUG) << "In asyncAnswer, seeing " << coordinatorHeader; LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "In asyncAnswer, seeing " << coordinatorHeader;
pos = coordinatorHeader.find(":", start); pos = coordinatorHeader.find(":", start);
if (pos == std::string::npos) { if (pos == std::string::npos) {
LOG(ERR) << "Could not find coordinator ID in X-Arango-Coordinator"; LOG_TOPIC(ERR, Logger::CLUSTER)
<< "Could not find coordinator ID in X-Arango-Coordinator";
return; return;
} }
@ -909,11 +918,13 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
if (endpoint == "") { if (endpoint == "") {
if (logConnectionErrors()) { if (logConnectionErrors()) {
LOG(ERR) << "asyncAnswer: cannot find endpoint for server '" LOG_TOPIC(ERR, Logger::CLUSTER)
<< coordinatorID << "'"; << "asyncAnswer: cannot find endpoint for server '"
<< coordinatorID << "'";
} else { } else {
LOG(INFO) << "asyncAnswer: cannot find endpoint for server '" LOG_TOPIC(INFO, Logger::CLUSTER)
<< coordinatorID << "'"; << "asyncAnswer: cannot find endpoint for server '"
<< coordinatorID << "'";
} }
return; return;
} }
@ -922,12 +933,14 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
cm->leaseConnection(endpoint); cm->leaseConnection(endpoint);
if (nullptr == connection) { if (nullptr == connection) {
LOG(ERR) << "asyncAnswer: cannot create connection to server '" LOG_TOPIC(ERR, Logger::CLUSTER)
<< coordinatorID << "'"; << "asyncAnswer: cannot create connection to server '"
<< coordinatorID << "'";
return; return;
} }
std::unordered_map<std::string, std::string> headers = responseToSend->headers(); std::unordered_map<std::string, std::string> headers =
responseToSend->headers();
headers["X-Arango-Coordinator"] = coordinatorHeader; headers["X-Arango-Coordinator"] = coordinatorHeader;
headers["X-Arango-Response-Code"] = headers["X-Arango-Response-Code"] =
responseToSend->responseString(responseToSend->responseCode()); responseToSend->responseString(responseToSend->responseCode());
@ -939,8 +952,9 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
char const* body = responseToSend->body().c_str(); char const* body = responseToSend->body().c_str();
size_t len = responseToSend->body().length(); size_t len = responseToSend->body().length();
LOG(DEBUG) << "asyncAnswer: sending PUT request to DB server '" LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< coordinatorID << "'"; << "asyncAnswer: sending PUT request to DB server '"
<< coordinatorID << "'";
auto client = std::make_unique<arangodb::httpclient::SimpleHttpClient>( auto client = std::make_unique<arangodb::httpclient::SimpleHttpClient>(
connection->_connection, 3600.0, false); connection->_connection, 3600.0, false);
@ -981,7 +995,8 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
size_t start = 0; size_t start = 0;
size_t pos; size_t pos;
LOG(DEBUG) << "In processAnswer, seeing " << coordinatorHeader; LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "In processAnswer, seeing " << coordinatorHeader;
pos = coordinatorHeader.find(":", start); pos = coordinatorHeader.find(":", start);
if (pos == std::string::npos) { if (pos == std::string::npos) {
@ -1062,7 +1077,7 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool ClusterComm::moveFromSendToReceived(OperationID operationID) { bool ClusterComm::moveFromSendToReceived(OperationID operationID) {
LOG(DEBUG) << "In moveFromSendToReceived " << operationID; LOG_TOPIC(DEBUG, Logger::CLUSTER) << "In moveFromSendToReceived " << operationID;
CONDITION_LOCKER(locker, somethingReceived); CONDITION_LOCKER(locker, somethingReceived);
CONDITION_LOCKER(sendLocker, somethingToSend); CONDITION_LOCKER(sendLocker, somethingToSend);
@ -1258,7 +1273,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
auto it = opIDtoIndex.find(res.operationID); auto it = opIDtoIndex.find(res.operationID);
if (it == opIDtoIndex.end()) { if (it == opIDtoIndex.end()) {
// Ooops, we got a response to which we did not send the request // Ooops, we got a response to which we did not send the request
LOG(ERR) << "Received ClusterComm response for a request we did not send!"; LOG_TOPIC(ERR, Logger::CLUSTER) << "Received ClusterComm response for a request we did not send!";
continue; continue;
} }
size_t index = it->second; size_t index = it->second;
@ -1271,7 +1286,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
res.answer_code == GeneralResponse::ResponseCode::ACCEPTED) { res.answer_code == GeneralResponse::ResponseCode::ACCEPTED) {
nrGood++; nrGood++;
} }
LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: " LOG_TOPIC(TRACE, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "got answer from " << requests[index].destination << ":" << "got answer from " << requests[index].destination << ":"
<< requests[index].path << " with return code " << requests[index].path << " with return code "
<< (int) res.answer_code; << (int) res.answer_code;
@ -1290,7 +1305,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
if (dueTime[index] < actionNeeded) { if (dueTime[index] < actionNeeded) {
actionNeeded = dueTime[index]; actionNeeded = dueTime[index];
} }
LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: " LOG_TOPIC(TRACE, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "got BACKEND_UNAVAILABLE or TIMEOUT from " << "got BACKEND_UNAVAILABLE or TIMEOUT from "
<< requests[index].destination << ":" << requests[index].destination << ":"
<< requests[index].path; << requests[index].path;
@ -1298,7 +1313,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
requests[index].result = res; requests[index].result = res;
requests[index].done = true; requests[index].done = true;
nrDone++; nrDone++;
LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: " LOG_TOPIC(TRACE, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "got no answer from " << requests[index].destination << ":" << "got no answer from " << requests[index].destination << ":"
<< requests[index].path << " with error " << res.status; << requests[index].path << " with error " << res.status;
} }
@ -1309,14 +1324,14 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
} }
} }
} catch (...) { } catch (...) {
LOG_TOPIC(ERR, logTopic) << "ClusterComm::performRequests: " LOG_TOPIC(ERR, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "caught exception, ignoring..."; << "caught exception, ignoring...";
} }
// We only get here if the global timeout was triggered, not all // We only get here if the global timeout was triggered, not all
// requests are marked by done! // requests are marked by done!
LOG_TOPIC(DEBUG, logTopic) << "ClusterComm::performRequests: " LOG_TOPIC(DEBUG, Logger::CLUSTER) << "ClusterComm::performRequests: "
<< "got timeout, this will be reported..."; << "got timeout, this will be reported...";
// Forget about // Forget about
@ -1405,7 +1420,7 @@ void ClusterCommThread::run() {
ClusterCommOperation* op; ClusterCommOperation* op;
ClusterComm* cc = ClusterComm::instance(); ClusterComm* cc = ClusterComm::instance();
LOG(DEBUG) << "starting ClusterComm thread"; LOG_TOPIC(DEBUG, Logger::CLUSTER) << "starting ClusterComm thread";
while (!isStopping()) { while (!isStopping()) {
// First check the sending queue, as long as it is not empty, we send // First check the sending queue, as long as it is not empty, we send
@ -1421,7 +1436,7 @@ void ClusterCommThread::run() {
if (cc->toSend.empty()) { if (cc->toSend.empty()) {
break; break;
} else { } else {
LOG(DEBUG) << "Noticed something to send"; LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Noticed something to send";
op = cc->toSend.front(); op = cc->toSend.front();
TRI_ASSERT(op->result.status == CL_COMM_SUBMITTED); TRI_ASSERT(op->result.status == CL_COMM_SUBMITTED);
op->result.status = CL_COMM_SENDING; op->result.status = CL_COMM_SENDING;
@ -1448,15 +1463,15 @@ void ClusterCommThread::run() {
op->result.errorMessage = "cannot create connection to server: "; op->result.errorMessage = "cannot create connection to server: ";
op->result.errorMessage += op->result.serverID; op->result.errorMessage += op->result.serverID;
if (cc->logConnectionErrors()) { if (cc->logConnectionErrors()) {
LOG(ERR) << "cannot create connection to server '" LOG_TOPIC(ERR, Logger::CLUSTER) << "cannot create connection to server '"
<< op->result.serverID << "' at endpoint '" << op->result.endpoint << "'"; << op->result.serverID << "' at endpoint '" << op->result.endpoint << "'";
} else { } else {
LOG(INFO) << "cannot create connection to server '" LOG_TOPIC(INFO, Logger::CLUSTER) << "cannot create connection to server '"
<< op->result.serverID << "' at endpoint '" << op->result.endpoint << "'"; << op->result.serverID << "' at endpoint '" << op->result.endpoint << "'";
} }
} else { } else {
if (nullptr != op->body.get()) { if (nullptr != op->body.get()) {
LOG(DEBUG) << "sending " LOG_TOPIC(DEBUG, Logger::CLUSTER) << "sending "
<< arangodb::HttpRequest::translateMethod( << arangodb::HttpRequest::translateMethod(
op->reqtype) op->reqtype)
.c_str() .c_str()
@ -1464,7 +1479,7 @@ void ClusterCommThread::run() {
<< op->result.serverID << "' at endpoint '" << op->result.endpoint << op->result.serverID << "' at endpoint '" << op->result.endpoint
<< "': " << op->body->c_str(); << "': " << op->body->c_str();
} else { } else {
LOG(DEBUG) << "sending " LOG_TOPIC(DEBUG, Logger::CLUSTER) << "sending "
<< arangodb::HttpRequest::translateMethod( << arangodb::HttpRequest::translateMethod(
op->reqtype) op->reqtype)
.c_str() .c_str()
@ -1586,5 +1601,5 @@ void ClusterCommThread::run() {
} }
} }
LOG(DEBUG) << "stopped ClusterComm thread"; LOG_TOPIC(DEBUG, Logger::CLUSTER) << "stopped ClusterComm thread";
} }