diff --git a/arangod/Aql/ClusterBlocks.cpp b/arangod/Aql/ClusterBlocks.cpp index 50b6a0a79b..7e687e6ae2 100644 --- a/arangod/Aql/ClusterBlocks.cpp +++ b/arangod/Aql/ClusterBlocks.cpp @@ -1091,18 +1091,19 @@ static bool throwExceptionAfterBadSyncRequest(ClusterCommResult* res, THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_TIMEOUT, errorMessage); } + if (res->status == CL_COMM_BACKEND_UNAVAILABLE) { + // there is no result + std::string errorMessage = + std::string("Empty result in communication with shard '") + + std::string(res->shardID) + std::string("' on cluster node '") + + std::string(res->serverID) + std::string("'"); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CONNECTION_LOST, + errorMessage); + } + if (res->status == CL_COMM_ERROR) { std::string errorMessage; - // This could be a broken connection or an Http error: - if (res->result == nullptr || !res->result->isComplete()) { - // there is no result - errorMessage += - std::string("Empty result in communication with shard '") + - std::string(res->shardID) + std::string("' on cluster node '") + - std::string(res->serverID) + std::string("'"); - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CONNECTION_LOST, - errorMessage); - } + TRI_ASSERT(nullptr != res->result); StringBuffer const& responseBodyBuf(res->result->getBody()); diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index 14a4e13b2d..f527f7068c 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -65,7 +65,7 @@ void ClusterCommResult::setDestination(std::string const& dest, serverID = (*resp)[0]; } else { serverID = ""; - status = CL_COMM_ERROR; + status = CL_COMM_BACKEND_UNAVAILABLE; if (logConnectionErrors) { LOG(ERR) << "cannot find responsible server for shard '" << shardID << "'"; @@ -89,7 +89,7 @@ void ClusterCommResult::setDestination(std::string const& dest, shardID = ""; serverID = ""; endpoint = ""; - status = CL_COMM_ERROR; + status = CL_COMM_BACKEND_UNAVAILABLE; errorMessage = "did not understand destination'" + dest + "'"; if (logConnectionErrors) { LOG(ERR) << "did not understand destination '" << dest << "'"; @@ -102,7 +102,7 @@ void ClusterCommResult::setDestination(std::string const& dest, auto ci = ClusterInfo::instance(); endpoint = ci->getServerEndpoint(serverID); if (endpoint.empty()) { - status = CL_COMM_ERROR; + status = CL_COMM_BACKEND_UNAVAILABLE; errorMessage = "did not find endpoint of server '" + serverID + "'"; if (logConnectionErrors) { LOG(ERR) << "did not find endpoint of server '" << serverID @@ -253,8 +253,7 @@ OperationID ClusterComm::asyncRequest( op->result.setDestination(destination, logConnectionErrors()); if (op->result.status == CL_COMM_ERROR) { - // In the non-singleRequest mode we want to put it into the received - // queue right away for backward compatibility: + // We put it into the received queue right away for error reporting: ClusterCommResult const resCopy(op->result); LOG(DEBUG) << "In asyncRequest, putting failed request " << resCopy.operationID << " directly into received queue."; @@ -262,7 +261,17 @@ OperationID ClusterComm::asyncRequest( received.push_back(op.get()); op.release(); auto q = received.end(); - receivedByOpID[resCopy.operationID] = --q; + receivedByOpID[opId] = --q; + if (nullptr != callback) { + op.reset(*q); + if ( (*callback.get())(&(op->result)) ) { + auto i = receivedByOpID.find(opId); + receivedByOpID.erase(i); + received.erase(q); + } else { + op.release(); + } + } somethingReceived.broadcast(); return opId; } @@ -370,7 +379,7 @@ std::unique_ptr ClusterComm::syncRequest( res->setDestination(destination, logConnectionErrors()); - if (res->status == CL_COMM_ERROR) { + if (res->status == CL_COMM_BACKEND_UNAVAILABLE) { return res; } @@ -886,9 +895,10 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader, if ((*op->callback.get())(&op->result)) { // This is fully processed, so let's remove it from the queue: QueueIterator q = i->second; + std::unique_ptr o(op); receivedByOpID.erase(i); received.erase(q); - delete op; + return std::string(""); } } } else { @@ -909,9 +919,10 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader, if ((*op->callback)(&op->result)) { // This is fully processed, so let's remove it from the queue: QueueIterator q = i->second; + std::unique_ptr o(op); toSendByOpID.erase(i); toSend.erase(q); - delete op; + return std::string(""); } } } else { @@ -1140,8 +1151,7 @@ size_t ClusterComm::performRequests(std::vector& requests, LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: " << "got BACKEND_UNAVAILABLE or TIMEOUT from " << requests[index].destination << ":" - << requests[index].path << " with return code " - << (int) res.answer_code; + << requests[index].path; // In this case we will retry at the dueTime } else { // a "proper error" requests[index].result = res; @@ -1329,13 +1339,29 @@ void ClusterCommThread::run() { CONDITION_LOCKER(locker, cc->somethingReceived); ClusterComm::QueueIterator q; - for (q = cc->received.begin(); q != cc->received.end(); ++q) { + for (q = cc->received.begin(); q != cc->received.end(); ) { + bool deleted = false; op = *q; if (op->result.status == CL_COMM_SENT) { if (op->endTime < currentTime) { op->result.status = CL_COMM_TIMEOUT; + if (nullptr != op->callback.get()) { + if ( (*op->callback.get())(&op->result) ) { + // This is fully processed, so let's remove it from the queue: + auto i = cc->receivedByOpID.find(op->result.operationID); + TRI_ASSERT(i != cc->receivedByOpID.end()); + cc->receivedByOpID.erase(i); + std::unique_ptr o(op); + auto qq = q++; + cc->received.erase(qq); + deleted = true; + } + } } } + if (!deleted) { + ++q; + } } } diff --git a/arangod/Cluster/ClusterComm.h b/arangod/Cluster/ClusterComm.h index fb85885c5e..0c726407b3 100644 --- a/arangod/Cluster/ClusterComm.h +++ b/arangod/Cluster/ClusterComm.h @@ -70,7 +70,7 @@ enum ClusterCommOpStatus { CL_COMM_SENT = 3, // initial request sent, response available CL_COMM_TIMEOUT = 4, // no answer received until timeout CL_COMM_RECEIVED = 5, // answer received - CL_COMM_ERROR = 6, // original request could not be sent + CL_COMM_ERROR = 6, // original request could not be sent or HTTP error CL_COMM_DROPPED = 7, // operation was dropped, not known // this is only used to report an error // in the wait or enquire methods @@ -82,7 +82,87 @@ enum ClusterCommOpStatus { //////////////////////////////////////////////////////////////////////////////// /// @brief used to report the status, progress and possibly result of -/// an operation +/// an operation, this is used for the asyncRequest (with singleRequest +/// equal to true or to false), and for syncRequest. +/// +/// Here is a complete overview of how the request can happen and how this +/// is reflected in the ClusterCommResult. We first cover the asyncRequest +/// case and then describe the differences for syncRequest: +/// +/// First, the actual destination is determined. If the responsible server +/// for a shard is not found or the endpoint for a named server is not found, +/// or if the given endpoint is no known protocol (currently "tcp://" or +/// "ssl://", then `status` is set to CL_COMM_BACKEND_UNAVAILABLE, +/// `errorMessage` is set but `result` and `answer` are both set +/// to nullptr. The flag `sendWasComplete` remains false and the +/// `answer_code` remains GeneralResponse::ResponseCode::PROCESSING. +/// A potentially given ClusterCommCallback is called. +/// +/// If no error occurs so far, the status is set to CL_COMM_SUBMITTED. +/// Still, `result`, `answer` and `answer_code` are not yet set. +/// A call to ClusterComm::enquire can return a result with this status. +/// A call to ClusterComm::wait cannot return a result wuth this status. +/// The request is queued for sending. +/// +/// As soon as the sending thread discovers the submitted request, it +/// sets its status to CL_COMM_SENDING and tries to open a connection +/// or reuse an existing connection. If opening a connection fails +/// the status is set to CL_COMM_BACKEND_UNAVAILABLE. If the given timeout +/// is already reached, the status is set to CL_COMM_TIMEOUT. In both +/// error cases `result`, `answer` and `answer_code` are still unset. +/// +/// If the connection was successfully created the request is sent. +/// If the request ended with a timeout, `status` is set to +/// CL_COMM_TIMEOUT as above. If another communication error (broken +/// connection) happens, `status` is set to CL_COMM_BACKEND_UNAVAILABLE. +/// In both cases, `result` can be set or can still be a nullptr. +/// `answer` and `answer_code` are still unset. +/// +/// If the request is completed, but an HTTP status code >= 400 occurred, +/// the status is set to CL_COMM_ERROR, but `result` is set correctly +/// to indicate the error. If all is well, `status` is set to CL_COMM_SENT. +/// +/// In the `singleRequest==true` mode, the operation is finished at this +/// stage. The callback is called, and the result either left in the +/// receiving queue or dropped. A call to ClusterComm::enquire or +/// ClusterComm::wait can return a result in this state. Note that +/// `answer` and `answer_code` are still not set. The flag +/// `sendWasComplete` is correctly set, though. +/// +/// In the `singleRequest==false` mode, an asynchronous operation happens +/// at the server side and eventually, an HTTP request in the opposite +/// direction is issued. During that time, `status` remains CL_COMM_SENT. +/// A call to ClusterComm::enquire can return a result in this state. +/// A call to ClusterComm::wait does not. +/// +/// If the answer does not arrive in the specified timeout, `status` +/// is set to CL_COMM_TIMEOUT and a potential callback is called. If +/// From then on, ClusterComm::wait will return it (unless deleted +/// by the callback returning true). +/// +/// If an answer comes in in time, then `answer` and `answer_code` +/// are finally set, and `status` is set to CL_COMM_RECEIVED. The callback +/// is called, and the result either left in the received queue for +/// pickup by ClusterComm::wait or deleted. Note that if we get this +/// far, `status` is set to CL_COMM_RECEIVED, even if the status code +/// of the answer is >= 400. +/// +/// Summing up, we have the following outcomes: +/// `status` `result` set `answer` set wait() returns +/// CL_COMM_SUBMITTED no no no +/// CL_COMM_SENDING no no no +/// CL_COMM_SENT yes no yes if single +/// CL_COMM_BACKEND_UN... yes or no no yes +/// CL_COMM_TIMEOUT yes or no no yes +/// CL_COMM_ERROR yes no yes +/// CL_COMM_RECEIVED yes yes yes +/// CL_COMM_DROPPED no no yes +/// +/// The syncRequest behaves essentially in the same way, except that +/// no callback is ever called, the outcome cannot be CL_COMM_RECEIVED +/// or CL_COMM_DROPPED, and CL_COMM_SENT indicates a successful completion. +/// CL_COMM_ERROR means that the request was complete, but an HTTP error +/// occurred. //////////////////////////////////////////////////////////////////////////////// struct ClusterCommResult { diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index d3694367b6..2518506684 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1536,7 +1536,7 @@ int ClusterInfo::ensureIndexCoordinator( AgencyCommResult previous = ac.getValues(key); - if (!res.successful()) { + if (!previous.successful()) { return TRI_ERROR_CLUSTER_READING_PLAN_AGENCY; } diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 532d5d2bf6..7bbfa0644c 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -52,6 +52,8 @@ static int handleGeneralCommErrors(ClusterCommResult const* res) { return TRI_ERROR_CLUSTER_TIMEOUT; } else if (res->status == CL_COMM_ERROR) { // This could be a broken connection or an Http error: + // This case cannot actually happen, but it is not very expensive + // to check, just to be on the safe side: if (res->result == nullptr || !res->result->isComplete()) { // there is not result return TRI_ERROR_CLUSTER_CONNECTION_LOST; diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index 7eaa4f7d96..1b41f0c6ec 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -1671,7 +1671,7 @@ static void JS_AsyncRequest(v8::FunctionCallbackInfo const& args) { clientTransactionID, coordTransactionID, destination, reqType, path, body, headerFields, 0, timeout, singleRequest); ClusterCommResult res = cc->enquire(opId); - if (res.status == CL_COMM_ERROR) { + if (res.status == CL_COMM_BACKEND_UNAVAILABLE) { TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "couldn't queue async request"); } diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 0d359f6edb..05d0a646f2 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -799,15 +799,16 @@ void RestReplicationHandler::handleTrampolineCoordinator() { "timeout within cluster"); return; } + if (res->status == CL_COMM_BACKEND_UNAVAILABLE) { + // there is no result + generateError(GeneralResponse::ResponseCode::BAD, + TRI_ERROR_CLUSTER_CONNECTION_LOST, + "lost connection within cluster"); + return; + } if (res->status == CL_COMM_ERROR) { // This could be a broken connection or an Http error: - if (res->result == nullptr || !res->result->isComplete()) { - // there is no result - generateError(GeneralResponse::ResponseCode::BAD, - TRI_ERROR_CLUSTER_CONNECTION_LOST, - "lost connection within cluster"); - return; - } + TRI_ASSERT(nullptr != res->result && res->result->isComplete()); // In this case a proper HTTP error was reported by the DBserver, // we simply forward the result. // We intentionally fall through here. diff --git a/arangod/V8Server/v8-actions.cpp b/arangod/V8Server/v8-actions.cpp index 4d12a9e965..5ebe768b01 100644 --- a/arangod/V8Server/v8-actions.cpp +++ b/arangod/V8Server/v8-actions.cpp @@ -1187,7 +1187,8 @@ static bool clusterSendToAllServers( cc->drop("", coordTransactionID, 0, ""); return TRI_ERROR_CLUSTER_TIMEOUT; } - if (res.status == CL_COMM_ERROR || res.status == CL_COMM_DROPPED) { + if (res.status == CL_COMM_ERROR || res.status == CL_COMM_DROPPED || + res.status == CL_COMM_BACKEND_UNAVAILABLE) { cc->drop("", coordTransactionID, 0, ""); return TRI_ERROR_INTERNAL; }