mirror of https://gitee.com/bigwinds/arangodb
Cleanup of error handling for asyncRequest and syncRequest.
I have added a thorough description of events to the comments in ClusterComm.h. This should enable everybody to do proper error handling when using ClusterComm::asyncRequest and ClusterComm::syncRequest.
commit 20ef93d76b
parent a2d237f84d
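For illustration only (this note and the sketch below are not part of the commit): following the error-handling rules that the new comments in ClusterComm.h describe, a caller of ClusterComm::syncRequest could map the possible statuses to error codes roughly as follows. The helper name mapSyncRequestError is invented for this example; everything else uses names that appear in the diff below.

// Hypothetical helper, not from this commit: maps the status of a completed
// ClusterComm::syncRequest to a TRI error code, following the semantics
// documented in ClusterComm.h by this change.
static int mapSyncRequestError(ClusterCommResult const* res) {
  if (res->status == CL_COMM_BACKEND_UNAVAILABLE) {
    // destination unknown, connection could not be opened, or it broke
    return TRI_ERROR_CLUSTER_CONNECTION_LOST;
  }
  if (res->status == CL_COMM_TIMEOUT) {
    // no answer was received within the given timeout
    return TRI_ERROR_CLUSTER_TIMEOUT;
  }
  if (res->status == CL_COMM_ERROR) {
    // the request completed, but the server answered with an HTTP error
    // (status >= 400); res->result is set and complete in this case
    return TRI_ERROR_INTERNAL;
  }
  // CL_COMM_SENT indicates successful completion for syncRequest
  return TRI_ERROR_NO_ERROR;
}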
@@ -1091,18 +1091,19 @@ static bool throwExceptionAfterBadSyncRequest(ClusterCommResult* res,
     THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_TIMEOUT, errorMessage);
   }
 
+  if (res->status == CL_COMM_BACKEND_UNAVAILABLE) {
+    // there is no result
+    std::string errorMessage =
+        std::string("Empty result in communication with shard '") +
+        std::string(res->shardID) + std::string("' on cluster node '") +
+        std::string(res->serverID) + std::string("'");
+    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CONNECTION_LOST,
+                                   errorMessage);
+  }
+
   if (res->status == CL_COMM_ERROR) {
     std::string errorMessage;
-    // This could be a broken connection or an Http error:
-    if (res->result == nullptr || !res->result->isComplete()) {
-      // there is no result
-      errorMessage +=
-          std::string("Empty result in communication with shard '") +
-          std::string(res->shardID) + std::string("' on cluster node '") +
-          std::string(res->serverID) + std::string("'");
-      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CONNECTION_LOST,
-                                     errorMessage);
-    }
+    TRI_ASSERT(nullptr != res->result);
 
     StringBuffer const& responseBodyBuf(res->result->getBody());
@@ -65,7 +65,7 @@ void ClusterCommResult::setDestination(std::string const& dest,
       serverID = (*resp)[0];
     } else {
       serverID = "";
-      status = CL_COMM_ERROR;
+      status = CL_COMM_BACKEND_UNAVAILABLE;
       if (logConnectionErrors) {
         LOG(ERR) << "cannot find responsible server for shard '"
                  << shardID << "'";
@@ -89,7 +89,7 @@ void ClusterCommResult::setDestination(std::string const& dest,
     shardID = "";
     serverID = "";
     endpoint = "";
-    status = CL_COMM_ERROR;
+    status = CL_COMM_BACKEND_UNAVAILABLE;
     errorMessage = "did not understand destination'" + dest + "'";
     if (logConnectionErrors) {
       LOG(ERR) << "did not understand destination '" << dest << "'";
@@ -102,7 +102,7 @@ void ClusterCommResult::setDestination(std::string const& dest,
   auto ci = ClusterInfo::instance();
   endpoint = ci->getServerEndpoint(serverID);
   if (endpoint.empty()) {
-    status = CL_COMM_ERROR;
+    status = CL_COMM_BACKEND_UNAVAILABLE;
     errorMessage = "did not find endpoint of server '" + serverID + "'";
     if (logConnectionErrors) {
       LOG(ERR) << "did not find endpoint of server '" << serverID
@@ -253,8 +253,7 @@ OperationID ClusterComm::asyncRequest(
 
   op->result.setDestination(destination, logConnectionErrors());
   if (op->result.status == CL_COMM_ERROR) {
-    // In the non-singleRequest mode we want to put it into the received
-    // queue right away for backward compatibility:
+    // We put it into the received queue right away for error reporting:
     ClusterCommResult const resCopy(op->result);
     LOG(DEBUG) << "In asyncRequest, putting failed request "
                << resCopy.operationID << " directly into received queue.";
@@ -262,7 +261,17 @@ OperationID ClusterComm::asyncRequest(
     received.push_back(op.get());
     op.release();
     auto q = received.end();
-    receivedByOpID[resCopy.operationID] = --q;
+    receivedByOpID[opId] = --q;
+    if (nullptr != callback) {
+      op.reset(*q);
+      if ( (*callback.get())(&(op->result)) ) {
+        auto i = receivedByOpID.find(opId);
+        receivedByOpID.erase(i);
+        received.erase(q);
+      } else {
+        op.release();
+      }
+    }
     somethingReceived.broadcast();
     return opId;
   }
@@ -370,7 +379,7 @@ std::unique_ptr<ClusterCommResult> ClusterComm::syncRequest(
 
   res->setDestination(destination, logConnectionErrors());
 
-  if (res->status == CL_COMM_ERROR) {
+  if (res->status == CL_COMM_BACKEND_UNAVAILABLE) {
     return res;
   }
 
@@ -886,9 +895,10 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
       if ((*op->callback.get())(&op->result)) {
         // This is fully processed, so let's remove it from the queue:
         QueueIterator q = i->second;
+        std::unique_ptr<ClusterCommOperation> o(op);
         receivedByOpID.erase(i);
         received.erase(q);
-        delete op;
+        return std::string("");
       }
     }
   } else {
@@ -909,9 +919,10 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
       if ((*op->callback)(&op->result)) {
         // This is fully processed, so let's remove it from the queue:
         QueueIterator q = i->second;
+        std::unique_ptr<ClusterCommOperation> o(op);
         toSendByOpID.erase(i);
         toSend.erase(q);
-        delete op;
+        return std::string("");
       }
     }
   } else {
@@ -1140,8 +1151,7 @@ size_t ClusterComm::performRequests(std::vector<ClusterCommRequest>& requests,
         LOG_TOPIC(TRACE, logTopic) << "ClusterComm::performRequests: "
             << "got BACKEND_UNAVAILABLE or TIMEOUT from "
             << requests[index].destination << ":"
-            << requests[index].path << " with return code "
-            << (int) res.answer_code;
+            << requests[index].path;
         // In this case we will retry at the dueTime
       } else {   // a "proper error"
         requests[index].result = res;
@@ -1329,13 +1339,29 @@ void ClusterCommThread::run() {
       CONDITION_LOCKER(locker, cc->somethingReceived);
 
       ClusterComm::QueueIterator q;
-      for (q = cc->received.begin(); q != cc->received.end(); ++q) {
+      for (q = cc->received.begin(); q != cc->received.end(); ) {
+        bool deleted = false;
         op = *q;
         if (op->result.status == CL_COMM_SENT) {
           if (op->endTime < currentTime) {
             op->result.status = CL_COMM_TIMEOUT;
+            if (nullptr != op->callback.get()) {
+              if ( (*op->callback.get())(&op->result) ) {
+                // This is fully processed, so let's remove it from the queue:
+                auto i = cc->receivedByOpID.find(op->result.operationID);
+                TRI_ASSERT(i != cc->receivedByOpID.end());
+                cc->receivedByOpID.erase(i);
+                std::unique_ptr<ClusterCommOperation> o(op);
+                auto qq = q++;
+                cc->received.erase(qq);
+                deleted = true;
+              }
+            }
           }
         }
+        if (!deleted) {
+          ++q;
+        }
       }
     }
@@ -70,7 +70,7 @@ enum ClusterCommOpStatus {
   CL_COMM_SENT = 3,      // initial request sent, response available
   CL_COMM_TIMEOUT = 4,   // no answer received until timeout
   CL_COMM_RECEIVED = 5,  // answer received
-  CL_COMM_ERROR = 6,     // original request could not be sent
+  CL_COMM_ERROR = 6,     // original request could not be sent or HTTP error
   CL_COMM_DROPPED = 7,   // operation was dropped, not known
                          // this is only used to report an error
                          // in the wait or enquire methods
@@ -82,7 +82,87 @@ enum ClusterCommOpStatus {
 
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief used to report the status, progress and possibly result of
-/// an operation
+/// an operation, this is used for the asyncRequest (with singleRequest
+/// equal to true or to false), and for syncRequest.
+///
+/// Here is a complete overview of how the request can happen and how this
+/// is reflected in the ClusterCommResult. We first cover the asyncRequest
+/// case and then describe the differences for syncRequest:
+///
+/// First, the actual destination is determined. If the responsible server
+/// for a shard is not found or the endpoint for a named server is not found,
+/// or if the given endpoint is no known protocol (currently "tcp://" or
+/// "ssl://"), then `status` is set to CL_COMM_BACKEND_UNAVAILABLE,
+/// `errorMessage` is set but `result` and `answer` are both set
+/// to nullptr. The flag `sendWasComplete` remains false and the
+/// `answer_code` remains GeneralResponse::ResponseCode::PROCESSING.
+/// A potentially given ClusterCommCallback is called.
+///
+/// If no error occurs so far, the status is set to CL_COMM_SUBMITTED.
+/// Still, `result`, `answer` and `answer_code` are not yet set.
+/// A call to ClusterComm::enquire can return a result with this status.
+/// A call to ClusterComm::wait cannot return a result with this status.
+/// The request is queued for sending.
+///
+/// As soon as the sending thread discovers the submitted request, it
+/// sets its status to CL_COMM_SENDING and tries to open a connection
+/// or reuse an existing connection. If opening a connection fails
+/// the status is set to CL_COMM_BACKEND_UNAVAILABLE. If the given timeout
+/// is already reached, the status is set to CL_COMM_TIMEOUT. In both
+/// error cases `result`, `answer` and `answer_code` are still unset.
+///
+/// If the connection was successfully created the request is sent.
+/// If the request ended with a timeout, `status` is set to
+/// CL_COMM_TIMEOUT as above. If another communication error (broken
+/// connection) happens, `status` is set to CL_COMM_BACKEND_UNAVAILABLE.
+/// In both cases, `result` can be set or can still be a nullptr.
+/// `answer` and `answer_code` are still unset.
+///
+/// If the request is completed, but an HTTP status code >= 400 occurred,
+/// the status is set to CL_COMM_ERROR, but `result` is set correctly
+/// to indicate the error. If all is well, `status` is set to CL_COMM_SENT.
+///
+/// In the `singleRequest==true` mode, the operation is finished at this
+/// stage. The callback is called, and the result either left in the
+/// receiving queue or dropped. A call to ClusterComm::enquire or
+/// ClusterComm::wait can return a result in this state. Note that
+/// `answer` and `answer_code` are still not set. The flag
+/// `sendWasComplete` is correctly set, though.
+///
+/// In the `singleRequest==false` mode, an asynchronous operation happens
+/// at the server side and eventually, an HTTP request in the opposite
+/// direction is issued. During that time, `status` remains CL_COMM_SENT.
+/// A call to ClusterComm::enquire can return a result in this state.
+/// A call to ClusterComm::wait does not.
+///
+/// If the answer does not arrive in the specified timeout, `status`
+/// is set to CL_COMM_TIMEOUT and a potential callback is called.
+/// From then on, ClusterComm::wait will return it (unless deleted
+/// by the callback returning true).
+///
+/// If an answer comes in in time, then `answer` and `answer_code`
+/// are finally set, and `status` is set to CL_COMM_RECEIVED. The callback
+/// is called, and the result either left in the received queue for
+/// pickup by ClusterComm::wait or deleted. Note that if we get this
+/// far, `status` is set to CL_COMM_RECEIVED, even if the status code
+/// of the answer is >= 400.
+///
+/// Summing up, we have the following outcomes:
+/// `status`              `result` set    `answer` set    wait() returns
+/// CL_COMM_SUBMITTED     no              no              no
+/// CL_COMM_SENDING       no              no              no
+/// CL_COMM_SENT          yes             no              yes if single
+/// CL_COMM_BACKEND_UN... yes or no       no              yes
+/// CL_COMM_TIMEOUT       yes or no       no              yes
+/// CL_COMM_ERROR         yes             no              yes
+/// CL_COMM_RECEIVED      yes             yes             yes
+/// CL_COMM_DROPPED       no              no              yes
+///
+/// The syncRequest behaves essentially in the same way, except that
+/// no callback is ever called, the outcome cannot be CL_COMM_RECEIVED
+/// or CL_COMM_DROPPED, and CL_COMM_SENT indicates a successful completion.
+/// CL_COMM_ERROR means that the request was complete, but an HTTP error
+/// occurred.
 ////////////////////////////////////////////////////////////////////////////////
 
 struct ClusterCommResult {
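To make the outcome table above concrete, here is an illustrative sketch (not part of the commit) of how a caller of asyncRequest with singleRequest == false might interpret a ClusterCommResult returned by ClusterComm::wait. The function name handleWaitedResult is invented; the status semantics follow the documentation added above.

// Hypothetical caller-side handling of a result returned by ClusterComm::wait()
// for an operation started with asyncRequest() and singleRequest == false.
static void handleWaitedResult(ClusterCommResult const& res) {
  switch (res.status) {
    case CL_COMM_RECEIVED:
      // `answer` and `answer_code` are set now, even if the HTTP status of
      // the answer is >= 400; inspect res.answer_code and res.answer here.
      break;
    case CL_COMM_ERROR:
      // the initial request completed with an HTTP error; `result` is set.
      break;
    case CL_COMM_BACKEND_UNAVAILABLE:
    case CL_COMM_TIMEOUT:
      // destination unreachable or no answer in time; `result` may or may
      // not be set, `answer` is not.
      break;
    case CL_COMM_DROPPED:
      // the operation is not (or no longer) known; only reported by
      // wait or enquire.
      break;
    default:
      // CL_COMM_SUBMITTED and CL_COMM_SENDING are not returned by wait(),
      // and CL_COMM_SENT is only returned in the singleRequest == true mode.
      break;
  }
}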
@@ -1536,7 +1536,7 @@ int ClusterInfo::ensureIndexCoordinator(
 
   AgencyCommResult previous = ac.getValues(key);
 
-  if (!res.successful()) {
+  if (!previous.successful()) {
     return TRI_ERROR_CLUSTER_READING_PLAN_AGENCY;
   }
 
@@ -52,6 +52,8 @@ static int handleGeneralCommErrors(ClusterCommResult const* res) {
     return TRI_ERROR_CLUSTER_TIMEOUT;
   } else if (res->status == CL_COMM_ERROR) {
     // This could be a broken connection or an Http error:
+    // This case cannot actually happen, but it is not very expensive
+    // to check, just to be on the safe side:
     if (res->result == nullptr || !res->result->isComplete()) {
       // there is not result
       return TRI_ERROR_CLUSTER_CONNECTION_LOST;
@@ -1671,7 +1671,7 @@ static void JS_AsyncRequest(v8::FunctionCallbackInfo<v8::Value> const& args) {
       clientTransactionID, coordTransactionID, destination, reqType, path, body,
       headerFields, 0, timeout, singleRequest);
   ClusterCommResult res = cc->enquire(opId);
-  if (res.status == CL_COMM_ERROR) {
+  if (res.status == CL_COMM_BACKEND_UNAVAILABLE) {
     TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
                                    "couldn't queue async request");
   }
@@ -799,15 +799,16 @@ void RestReplicationHandler::handleTrampolineCoordinator() {
                   "timeout within cluster");
     return;
   }
+  if (res->status == CL_COMM_BACKEND_UNAVAILABLE) {
+    // there is no result
+    generateError(GeneralResponse::ResponseCode::BAD,
+                  TRI_ERROR_CLUSTER_CONNECTION_LOST,
+                  "lost connection within cluster");
+    return;
+  }
   if (res->status == CL_COMM_ERROR) {
     // This could be a broken connection or an Http error:
-    if (res->result == nullptr || !res->result->isComplete()) {
-      // there is no result
-      generateError(GeneralResponse::ResponseCode::BAD,
-                    TRI_ERROR_CLUSTER_CONNECTION_LOST,
-                    "lost connection within cluster");
-      return;
-    }
+    TRI_ASSERT(nullptr != res->result && res->result->isComplete());
     // In this case a proper HTTP error was reported by the DBserver,
     // we simply forward the result.
     // We intentionally fall through here.
@@ -1187,7 +1187,8 @@ static bool clusterSendToAllServers(
       cc->drop("", coordTransactionID, 0, "");
       return TRI_ERROR_CLUSTER_TIMEOUT;
     }
-    if (res.status == CL_COMM_ERROR || res.status == CL_COMM_DROPPED) {
+    if (res.status == CL_COMM_ERROR || res.status == CL_COMM_DROPPED ||
+        res.status == CL_COMM_BACKEND_UNAVAILABLE) {
      cc->drop("", coordTransactionID, 0, "");
      return TRI_ERROR_INTERNAL;
    }