mirror of https://gitee.com/bigwinds/arangodb
Feature/cluster comm logging (#8971)
This commit is contained in:
parent
d9ff6647d8
commit
057ba7130a
|
@ -38,10 +38,55 @@
|
||||||
#include "VocBase/ticks.h"
|
#include "VocBase/ticks.h"
|
||||||
|
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <iomanip>
|
||||||
|
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
using namespace arangodb::communicator;
|
using namespace arangodb::communicator;
|
||||||
|
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
std::stringstream createRequestInfo(NewRequest const& request) {
|
||||||
|
bool trace = Logger::CLUSTERCOMM.level() == LogLevel::TRACE;
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "id: " << std::setw(8) << std::setiosflags(std::ios::left)
|
||||||
|
<< request._ticketId << std::resetiosflags(std::ios::adjustfield)
|
||||||
|
<< " --> " << request._destination
|
||||||
|
<< " -- " << arangodb::GeneralRequest::translateMethod(request._request->requestType())
|
||||||
|
<< ": " << ( request._request->fullUrl().empty() ? "url unknown" : request._request->fullUrl())
|
||||||
|
;
|
||||||
|
if(trace){
|
||||||
|
try {
|
||||||
|
ss << " -- payload: '" << request._request->payload().toJson() << "'";
|
||||||
|
} catch (...) {
|
||||||
|
ss << " -- can not show payload";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ss;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::stringstream createResponseInfo(ClusterCommResult const* result) {
|
||||||
|
bool trace = Logger::CLUSTERCOMM.level() == LogLevel::TRACE;
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "id: " << std::setw(8) << std::setiosflags(std::ios::left)
|
||||||
|
<< result->operationID << std::resetiosflags(std::ios::adjustfield)
|
||||||
|
<< " <-- " << result->endpoint
|
||||||
|
<< " -- " << result->serverID << ":" << (result->shardID.empty() ? "unknown ShardID" : result->shardID)
|
||||||
|
;
|
||||||
|
if(trace){
|
||||||
|
try {
|
||||||
|
if(result->result){
|
||||||
|
ss << " -- payload: '" << result->result->getBody() << "'";
|
||||||
|
} else {
|
||||||
|
ss << " -- payload: no result";
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
ss << "can not show payload";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ss;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// @brief empty map with headers
|
/// @brief empty map with headers
|
||||||
std::unordered_map<std::string, std::string> const ClusterCommRequest::noHeaders;
|
std::unordered_map<std::string, std::string> const ClusterCommRequest::noHeaders;
|
||||||
|
|
||||||
|
@ -450,6 +495,7 @@ OperationID ClusterComm::asyncRequest(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result->fromError(errorCode, std::move(response));
|
result->fromError(errorCode, std::move(response));
|
||||||
|
LOG_TOPIC("2345c", DEBUG, Logger::CLUSTERCOMM) << createResponseInfo(result.get()).rdbuf();
|
||||||
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
||||||
logConnectionError(doLogConnectionErrors, result.get(), initTimeout, __LINE__);
|
logConnectionError(doLogConnectionErrors, result.get(), initTimeout, __LINE__);
|
||||||
}
|
}
|
||||||
|
@ -468,6 +514,7 @@ OperationID ClusterComm::asyncRequest(
|
||||||
}
|
}
|
||||||
TRI_ASSERT(response.get() != nullptr);
|
TRI_ASSERT(response.get() != nullptr);
|
||||||
result->fromResponse(std::move(response));
|
result->fromResponse(std::move(response));
|
||||||
|
LOG_TOPIC("23457", DEBUG, Logger::CLUSTERCOMM) << createResponseInfo(result.get()).rdbuf();
|
||||||
/*bool ret =*/((*callback.get())(result.get()));
|
/*bool ret =*/((*callback.get())(result.get()));
|
||||||
// TRI_ASSERT(ret == true);
|
// TRI_ASSERT(ret == true);
|
||||||
};
|
};
|
||||||
|
@ -478,6 +525,7 @@ OperationID ClusterComm::asyncRequest(
|
||||||
// having a shared_ptr So it will be gone after this callback
|
// having a shared_ptr So it will be gone after this callback
|
||||||
CONDITION_LOCKER(locker, somethingReceived);
|
CONDITION_LOCKER(locker, somethingReceived);
|
||||||
result->fromError(errorCode, std::move(response));
|
result->fromError(errorCode, std::move(response));
|
||||||
|
LOG_TOPIC("23458", DEBUG, Logger::CLUSTERCOMM) << createResponseInfo(result.get()).rdbuf();
|
||||||
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
|
||||||
logConnectionError(doLogConnectionErrors, result.get(), initTimeout, __LINE__);
|
logConnectionError(doLogConnectionErrors, result.get(), initTimeout, __LINE__);
|
||||||
}
|
}
|
||||||
|
@ -489,6 +537,7 @@ OperationID ClusterComm::asyncRequest(
|
||||||
TRI_ASSERT(response.get() != nullptr);
|
TRI_ASSERT(response.get() != nullptr);
|
||||||
CONDITION_LOCKER(locker, somethingReceived);
|
CONDITION_LOCKER(locker, somethingReceived);
|
||||||
result->fromResponse(std::move(response));
|
result->fromResponse(std::move(response));
|
||||||
|
LOG_TOPIC("23459", DEBUG, Logger::CLUSTERCOMM) << createResponseInfo(result.get()).rdbuf();
|
||||||
somethingReceived.broadcast();
|
somethingReceived.broadcast();
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -502,6 +551,7 @@ OperationID ClusterComm::asyncRequest(
|
||||||
std::move(callbacks),
|
std::move(callbacks),
|
||||||
opt);
|
opt);
|
||||||
|
|
||||||
|
LOG_TOPIC("2345a", DEBUG, Logger::CLUSTERCOMM) << createRequestInfo(*newRequest).rdbuf();
|
||||||
CONDITION_LOCKER(locker, somethingReceived);
|
CONDITION_LOCKER(locker, somethingReceived);
|
||||||
auto ticketId = communicatorPtr->addRequest(std::move(newRequest));
|
auto ticketId = communicatorPtr->addRequest(std::move(newRequest));
|
||||||
|
|
||||||
|
@ -581,6 +631,7 @@ std::unique_ptr<ClusterCommResult> ClusterComm::syncRequest(
|
||||||
callbacks,
|
callbacks,
|
||||||
opt);
|
opt);
|
||||||
|
|
||||||
|
LOG_TOPIC("34567", TRACE, Logger::CLUSTERCOMM) << createRequestInfo(*newRequest).rdbuf();
|
||||||
CONDITION_LOCKER(isen, cv);
|
CONDITION_LOCKER(isen, cv);
|
||||||
// can't move callbacks here
|
// can't move callbacks here
|
||||||
communicator()->addRequest(std::move(newRequest));
|
communicator()->addRequest(std::move(newRequest));
|
||||||
|
@ -588,6 +639,8 @@ std::unique_ptr<ClusterCommResult> ClusterComm::syncRequest(
|
||||||
while (!wasSignaled) {
|
while (!wasSignaled) {
|
||||||
cv.wait(100000);
|
cv.wait(100000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG_TOPIC("2345b", DEBUG, Logger::CLUSTERCOMM) << createResponseInfo(result.get()).rdbuf();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -111,6 +111,7 @@ LogTopic Logger::CACHE("cache", LogLevel::INFO);
|
||||||
LogTopic Logger::CLUSTER("cluster", LogLevel::INFO);
|
LogTopic Logger::CLUSTER("cluster", LogLevel::INFO);
|
||||||
LogTopic Logger::COLLECTOR("collector");
|
LogTopic Logger::COLLECTOR("collector");
|
||||||
LogTopic Logger::COMMUNICATION("communication", LogLevel::INFO);
|
LogTopic Logger::COMMUNICATION("communication", LogLevel::INFO);
|
||||||
|
LogTopic Logger::CLUSTERCOMM("clustercomm", LogLevel::INFO);
|
||||||
LogTopic Logger::COMPACTOR("compactor");
|
LogTopic Logger::COMPACTOR("compactor");
|
||||||
LogTopic Logger::CONFIG("config");
|
LogTopic Logger::CONFIG("config");
|
||||||
LogTopic Logger::DATAFILES("datafiles", LogLevel::INFO);
|
LogTopic Logger::DATAFILES("datafiles", LogLevel::INFO);
|
||||||
|
|
|
@ -133,6 +133,7 @@ class Logger {
|
||||||
static LogTopic CLUSTER;
|
static LogTopic CLUSTER;
|
||||||
static LogTopic COLLECTOR;
|
static LogTopic COLLECTOR;
|
||||||
static LogTopic COMMUNICATION;
|
static LogTopic COMMUNICATION;
|
||||||
|
static LogTopic CLUSTERCOMM;
|
||||||
static LogTopic COMPACTOR;
|
static LogTopic COMPACTOR;
|
||||||
static LogTopic CONFIG;
|
static LogTopic CONFIG;
|
||||||
static LogTopic DATAFILES;
|
static LogTopic DATAFILES;
|
||||||
|
|
Loading…
Reference in New Issue