1
0
Fork 0

Create log topics for AgencyComm and Heartbeat.

This commit is contained in:
Max Neunhoeffer 2016-05-09 13:19:23 +02:00
parent 9aa6df528c
commit 0b068fa3e4
4 changed files with 116 additions and 74 deletions

View File

@ -37,7 +37,6 @@
#include "Cluster/ServerState.h"
#include "Endpoint/Endpoint.h"
#include "Logger/Logger.h"
#include "Logger/Logger.h"
#include "Random/RandomGenerator.h"
#include "Rest/HttpRequest.h"
#include "Rest/HttpResponse.h"
@ -593,7 +592,8 @@ bool AgencyComm::tryConnect() {
// mop: not sure if a timeout makes sense here
while (true) {
LOG(INFO) << "Trying to find an active agency. Checking " << endpointsStr;
LOG_TOPIC(INFO, Logger::AGENCYCOMM)
<< "Trying to find an active agency. Checking " << endpointsStr;
std::list<AgencyEndpoint*>::iterator it = _globalEndpoints.begin();
while (it != _globalEndpoints.end()) {
@ -862,8 +862,8 @@ void AgencyComm::disconnect() {
bool AgencyComm::addEndpoint(std::string const& endpointSpecification,
bool toFront) {
LOG(TRACE) << "adding global agency-endpoint '" << endpointSpecification
<< "'";
LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "adding global agency-endpoint '" << endpointSpecification << "'";
{
WRITE_LOCKER(writeLocker, AgencyComm::_globalLock);
@ -1048,7 +1048,8 @@ bool AgencyComm::setPrefix(std::string const& prefix) {
}
}
LOG(TRACE) << "setting agency-prefix to '" << prefix << "'";
LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "setting agency-prefix to '" << prefix << "'";
return true;
}
@ -1284,8 +1285,8 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
}
}
} else if (node.isArray()) {
LOG(ERR) << node.toJson();
LOG(ERR) << "Oops...TODO array unexpected";
LOG_TOPIC(ERR, Logger::AGENCYCOMM) << node.toJson();
LOG_TOPIC(ERR, Logger::AGENCYCOMM) << "Oops...TODO array unexpected";
} else {
builder.add("value", node);
}
@ -1333,13 +1334,16 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
VPackStringSink sink(&result._body);
VPackDumper::dump(s, &sink, &dumperOptions);
LOG(TRACE) << "Created fake etcd node" << result._body;
LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "Created fake etcd node" << result._body;
result._statusCode = 200;
} catch(std::exception &e) {
LOG(ERR) << "Error transforming result. " << e.what();
LOG_TOPIC(ERR, Logger::AGENCYCOMM)
<< "Error transforming result. " << e.what();
result.clear();
} catch(...) {
LOG(ERR) << "Error transforming result. Out of memory";
LOG_TOPIC(ERR, Logger::AGENCYCOMM)
<< "Error transforming result. Out of memory";
result.clear();
}
@ -1390,10 +1394,12 @@ AgencyCommResult AgencyComm::getValues2(std::string const& key) {
result._statusCode = 200;
} catch(std::exception &e) {
LOG(ERR) << "Error transforming result. " << e.what();
LOG_TOPIC(ERR, Logger::AGENCYCOMM)
<< "Error transforming result. " << e.what();
result.clear();
} catch(...) {
LOG(ERR) << "Error transforming result. Out of memory";
LOG_TOPIC(ERR, Logger::AGENCYCOMM)
<< "Error transforming result. Out of memory";
result.clear();
}
@ -1576,7 +1582,8 @@ uint64_t AgencyComm::uniqid(uint64_t count, double timeout) {
{prefixStripped(), "Sync", "LatestID"}));
if (!(oldSlice.isSmallInt() || oldSlice.isUInt())) {
LOG(WARN) << "Sync/LatestID in agency is not an unsigned integer, fixing...";
LOG_TOPIC(WARN, Logger::AGENCYCOMM)
<< "Sync/LatestID in agency is not an unsigned integer, fixing...";
try {
VPackBuilder builder;
builder.add(VPackValue(0));
@ -1866,10 +1873,12 @@ AgencyCommResult AgencyComm::sendTransactionWithFailover(
result._statusCode = 200;
} catch(std::exception &e) {
LOG(ERR) << "Error transforming result. " << e.what();
LOG_TOPIC(ERR, Logger::AGENCYCOMM)
<< "Error transforming result. " << e.what();
result.clear();
} catch(...) {
LOG(ERR) << "Error transforming result. Out of memory";
LOG_TOPIC(ERR, Logger::AGENCYCOMM)
<< "Error transforming result. Out of memory";
result.clear();
}
return result;
@ -1964,7 +1973,8 @@ AgencyCommResult AgencyComm::sendWithFailover(
if (!AgencyComm::hasEndpoint(endpoint)) {
AgencyComm::addEndpoint(endpoint, true);
LOG(INFO) << "adding agency-endpoint '" << endpoint << "'";
LOG_TOPIC(INFO, Logger::AGENCYCOMM)
<< "adding agency-endpoint '" << endpoint << "'";
// re-check the new endpoint
if (AgencyComm::hasEndpoint(endpoint)) {
@ -1972,8 +1982,9 @@ AgencyCommResult AgencyComm::sendWithFailover(
continue;
}
LOG(ERR) << "found redirection to unknown endpoint '" << endpoint
<< "'. Will not follow!";
LOG_TOPIC(ERR, Logger::AGENCYCOMM)
<< "found redirection to unknown endpoint '" << endpoint
<< "'. Will not follow!";
// this is an error
break;
@ -2029,10 +2040,11 @@ AgencyCommResult AgencyComm::send(
result._connected = false;
result._statusCode = 0;
LOG(TRACE) << "sending " << arangodb::HttpRequest::translateMethod(method)
<< " request to agency at endpoint '"
<< connection->getEndpoint()->specification() << "', url '" << url
<< "': " << body;
LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "sending " << arangodb::HttpRequest::translateMethod(method)
<< " request to agency at endpoint '"
<< connection->getEndpoint()->specification() << "', url '" << url
<< "': " << body;
arangodb::httpclient::SimpleHttpClient client(connection, timeout, false);
@ -2052,7 +2064,7 @@ AgencyCommResult AgencyComm::send(
if (response == nullptr) {
connection->disconnect();
result._message = "could not send request to agency";
LOG(TRACE) << "sending request to agency failed";
LOG_TOPIC(TRACE, Logger::AGENCYCOMM) << "sending request to agency failed";
return result;
}
@ -2060,7 +2072,7 @@ AgencyCommResult AgencyComm::send(
if (!response->isComplete()) {
connection->disconnect();
result._message = "sending request to agency failed";
LOG(TRACE) << "sending request to agency failed";
LOG_TOPIC(TRACE, Logger::AGENCYCOMM) << "sending request to agency failed";
return result;
}
@ -2074,7 +2086,8 @@ AgencyCommResult AgencyComm::send(
bool found = false;
result._location = response->getHeaderField("location", found);
LOG(TRACE) << "redirecting to location: '" << result._location << "'";
LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "redirecting to location: '" << result._location << "'";
if (!found) {
// a 307 without a location header does not make any sense
@ -2090,9 +2103,10 @@ AgencyCommResult AgencyComm::send(
result._body = std::string(sb.c_str(), sb.length());
result._statusCode = response->getHttpReturnCode();
LOG(TRACE) << "request to agency returned status code " << result._statusCode
<< ", message: '" << result._message << "', body: '"
<< result._body << "'";
LOG_TOPIC(TRACE, Logger::AGENCYCOMM)
<< "request to agency returned status code " << result._statusCode
<< ", message: '" << result._message << "', body: '"
<< result._body << "'";
if (result.successful()) {
return result;

View File

@ -108,7 +108,8 @@ void HeartbeatThread::run() {
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::runDBServer() {
LOG(TRACE) << "starting heartbeat thread (DBServer version)";
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "starting heartbeat thread (DBServer version)";
// convert timeout to seconds
double const interval = (double)_interval / 1000.0 / 1000.0;
@ -116,7 +117,8 @@ void HeartbeatThread::runDBServer() {
std::function<bool(VPackSlice const& result)> updatePlan = [&](
VPackSlice const& result) {
if (!result.isNumber()) {
LOG(ERR) << "Plan Version is not a number! " << result.toJson();
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "Plan Version is not a number! " << result.toJson();
return false;
}
uint64_t version = result.getNumber<uint64_t>();
@ -126,7 +128,8 @@ void HeartbeatThread::runDBServer() {
MUTEX_LOCKER(mutexLocker, _statusLock);
if (version > _desiredVersions.plan) {
_desiredVersions.plan = version;
LOG(DEBUG) << "Desired Current Version is now " << _desiredVersions.plan;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
<< "Desired Current Version is now " << _desiredVersions.plan;
doSync = true;
}
}
@ -145,7 +148,8 @@ void HeartbeatThread::runDBServer() {
while (!registered) {
registered = _agencyCallbackRegistry->registerCallback(planAgencyCallback);
if (!registered) {
LOG(ERR) << "Couldn't register plan change in agency!";
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "Couldn't register plan change in agency!";
sleep(1);
}
}
@ -155,7 +159,7 @@ void HeartbeatThread::runDBServer() {
int currentCount = currentCountStart;
while (!isStopping()) {
LOG(DEBUG) << "sending heartbeat to agency";
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sending heartbeat to agency";
double const start = TRI_microtime();
@ -171,7 +175,8 @@ void HeartbeatThread::runDBServer() {
currentCount = currentCountStart;
// send an initial GET request to Sync/Commands/my-id
LOG(TRACE) << "Looking at Sync/Commands/" + _myId;
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "Looking at Sync/Commands/" + _myId;
AgencyCommResult result =
_agency.getValues2("Sync/Commands/" + _myId);
@ -184,17 +189,19 @@ void HeartbeatThread::runDBServer() {
break;
}
LOG(TRACE) << "Refetching Current/Version...";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Refetching Current/Version...";
AgencyCommResult res = _agency.getValues2("Current/Version");
if (!res.successful()) {
LOG(ERR) << "Could not read Current/Version from agency.";
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "Could not read Current/Version from agency.";
} else {
VPackSlice s
= res.slice()[0].get(std::vector<std::string>(
{_agency.prefixStripped(), std::string("Current"),
std::string("Version")}));
if (!s.isInteger()) {
LOG(ERR) << "Current/Version in agency is not an integer.";
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "Current/Version in agency is not an integer.";
} else {
uint64_t currentVersion = 0;
try {
@ -202,13 +209,15 @@ void HeartbeatThread::runDBServer() {
} catch (...) {
}
if (currentVersion == 0) {
LOG(ERR) << "Current/Version in agency is 0.";
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "Current/Version in agency is 0.";
} else {
{
MUTEX_LOCKER(mutexLocker, _statusLock);
if (currentVersion > _desiredVersions.current) {
_desiredVersions.current = currentVersion;
LOG(DEBUG) << "Found greater Current/Version in agency.";
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
<< "Found greater Current/Version in agency.";
}
}
syncDBServerStatusQuo();
@ -224,7 +233,7 @@ void HeartbeatThread::runDBServer() {
double remain = interval - (TRI_microtime() - start);
// mop: execute at least once
do {
LOG(TRACE) << "Entering update loop";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Entering update loop";
bool wasNotified;
{
@ -238,7 +247,7 @@ void HeartbeatThread::runDBServer() {
}
if (!wasNotified) {
LOG(TRACE) << "Lock reached timeout";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Lock reached timeout";
planAgencyCallback->refetchAndUpdate(true);
} else {
// mop: a plan change returned successfully...
@ -263,7 +272,8 @@ void HeartbeatThread::runDBServer() {
}
usleep(1000);
}
LOG(TRACE) << "stopped heartbeat thread (DBServer version)";
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "stopped heartbeat thread (DBServer version)";
}
////////////////////////////////////////////////////////////////////////////////
@ -271,7 +281,8 @@ void HeartbeatThread::runDBServer() {
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::runCoordinator() {
LOG(TRACE) << "starting heartbeat thread (coordinator version)";
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "starting heartbeat thread (coordinator version)";
uint64_t oldUserVersion = 0;
@ -286,7 +297,7 @@ void HeartbeatThread::runCoordinator() {
setReady();
while (!isStopping()) {
LOG(TRACE) << "sending heartbeat to agency";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "sending heartbeat to agency";
double const start = TRI_microtime();
// send our state to the agency.
@ -305,9 +316,11 @@ void HeartbeatThread::runCoordinator() {
AgencyCommResult result = _agency.sendTransactionWithFailover(trx);
if (!result.successful()) {
LOG(WARN) << "Heartbeat: Could not read from agency!";
LOG_TOPIC(WARN, Logger::HEARTBEAT)
<< "Heartbeat: Could not read from agency!";
} else {
LOG(TRACE) << "Looking at Sync/Commands/" + _myId;
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "Looking at Sync/Commands/" + _myId;
handleStateChange(result);
@ -355,8 +368,9 @@ void HeartbeatThread::runCoordinator() {
TRI_UseCoordinatorDatabaseServer(_server, i->c_str());
if (vocbase != nullptr) {
LOG(INFO) << "Reloading users for database " << vocbase->_name
<< ".";
LOG_TOPIC(INFO, Logger::HEARTBEAT)
<< "Reloading users for database " << vocbase->_name
<< ".";
if (!fetchUsers(vocbase)) {
// something is wrong... probably the database server
@ -409,7 +423,7 @@ void HeartbeatThread::runCoordinator() {
}
LOG(TRACE) << "stopped heartbeat thread";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "stopped heartbeat thread";
}
////////////////////////////////////////////////////////////////////////////////
@ -431,14 +445,16 @@ bool HeartbeatThread::init() {
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::removeDispatchedJob(ServerJobResult result) {
LOG(TRACE) << "Dispatched job returned!";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Dispatched job returned!";
{
MUTEX_LOCKER(mutexLocker, _statusLock);
if (result.success) {
LOG(DEBUG) << "Sync request successful. Now have Plan " << result.planVersion << ", Current " << result.currentVersion;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
<< "Sync request successful. Now have Plan " << result.planVersion
<< ", Current " << result.currentVersion;
_currentVersions = AgencyVersions(result);
} else {
LOG(DEBUG) << "Sync request failed!";
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "Sync request failed!";
// mop: we will retry immediately so wait at least a LITTLE bit
usleep(10000);
}
@ -458,7 +474,7 @@ static std::string const prefixPlanChangeCoordinator = "Plan/Databases";
bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
bool fetchingUsersFailed = false;
LOG(TRACE) << "found a plan update";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "found a plan update";
AgencyCommResult result;
{
@ -489,7 +505,8 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
}
auto nameSlice = options.value.get("name");
if (nameSlice.isNone()) {
LOG (ERR) << "Missing name in agency database plan";
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "Missing name in agency database plan";
continue;
}
@ -502,8 +519,9 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
try {
id = std::stoul(v.copyString());
} catch (std::exception const& e) {
LOG(ERR) << "Failed to convert id string to number";
LOG(ERR) << e.what();
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "Failed to convert id string to number";
LOG_TOPIC(ERR, Logger::HEARTBEAT) << e.what();
}
}
}
@ -579,7 +597,8 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
// turn on error logging now
if (!ClusterComm::instance()->enableConnectionErrorLogging(true)) {
LOG(INFO) << "created coordinator databases for the first time";
LOG_TOPIC(INFO, Logger::HEARTBEAT)
<< "created coordinator databases for the first time";
}
return true;
@ -603,15 +622,16 @@ bool HeartbeatThread::syncDBServerStatusQuo() {
}
if (_desiredVersions.plan > _currentVersions.plan) {
LOG(DEBUG) << "Plan version " << _currentVersions.plan
<< " is lower than desired version " << _desiredVersions.plan;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
<< "Plan version " << _currentVersions.plan
<< " is lower than desired version " << _desiredVersions.plan;
_isDispatchingChange = true;
becauseOfPlan = true;
}
if (_desiredVersions.current > _currentVersions.current) {
LOG(DEBUG) << "Current version " << _currentVersions.current
<< " is lower than desired version "
<< _desiredVersions.current;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
<< "Current version " << _currentVersions.current
<< " is lower than desired version " << _desiredVersions.current;
_isDispatchingChange = true;
becauseOfCurrent = true;
}
@ -628,22 +648,23 @@ bool HeartbeatThread::syncDBServerStatusQuo() {
ci->invalidateCurrent();
}
LOG(TRACE) << "Dispatching Sync";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Dispatching Sync";
// schedule a job for the change
std::unique_ptr<arangodb::rest::Job> job(new ServerJob(this));
auto dispatcher = DispatcherFeature::DISPATCHER;
if (dispatcher == nullptr) {
LOG(ERR) << "could not schedule dbserver sync - dispatcher gone.";
LOG_TOPIC(ERR, Logger::HEARTBEAT)
<< "could not schedule dbserver sync - dispatcher gone.";
return false;
}
if (dispatcher->addJob(job) == TRI_ERROR_NO_ERROR) {
LOG(TRACE) << "scheduled dbserver sync";
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "scheduled dbserver sync";
return true;
}
MUTEX_LOCKER(mutexLocker, _statusLock);
_isDispatchingChange = false;
LOG(ERR) << "could not schedule dbserver sync";
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "could not schedule dbserver sync";
}
return false;
}
@ -690,9 +711,10 @@ bool HeartbeatThread::sendState() {
if (++_numFails % _maxFailsBeforeWarning == 0) {
std::string const endpoints = AgencyComm::getEndpointsString();
LOG(WARN) << "heartbeat could not be sent to agency endpoints ("
<< endpoints << "): http code: " << result.httpCode()
<< ", body: " << result.body();
LOG_TOPIC(WARN, Logger::HEARTBEAT)
<< "heartbeat could not be sent to agency endpoints ("
<< endpoints << "): http code: " << result.httpCode()
<< ", body: " << result.body();
_numFails = 0;
}
@ -708,7 +730,8 @@ bool HeartbeatThread::fetchUsers(TRI_vocbase_t* vocbase) {
VPackBuilder builder;
builder.openArray();
LOG(TRACE) << "fetching users for database '" << vocbase->_name << "'";
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "fetching users for database '" << vocbase->_name << "'";
int res = usersOnCoordinator(std::string(vocbase->_name), builder, 10.0);
@ -740,12 +763,13 @@ bool HeartbeatThread::fetchUsers(TRI_vocbase_t* vocbase) {
}
if (result) {
LOG(TRACE) << "fetching users for database '" << vocbase->_name
<< "' successful";
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "fetching users for database '" << vocbase->_name << "' successful";
_refetchUsers.erase(vocbase);
} else {
LOG(TRACE) << "fetching users for database '" << vocbase->_name
<< "' failed with error: " << TRI_errno_string(res);
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "fetching users for database '" << vocbase->_name
<< "' failed with error: " << TRI_errno_string(res);
_refetchUsers.insert(vocbase);
}

View File

@ -37,10 +37,12 @@ Mutex LogTopic::_namesLock;
std::map<std::string, LogTopic*> LogTopic::_names;
LogTopic Logger::AGENCY("agency", LogLevel::INFO);
LogTopic Logger::AGENCYCOMM("agencycomm", LogLevel::INFO);
LogTopic Logger::COLLECTOR("collector");
LogTopic Logger::COMPACTOR("compactor");
LogTopic Logger::CONFIG("config");
LogTopic Logger::DATAFILES("datafiles", LogLevel::INFO);
LogTopic Logger::HEARTBEAT("heartbeat", LogLevel::INFO);
LogTopic Logger::MMAP("mmap");
LogTopic Logger::PERFORMANCE("performance", LogLevel::FATAL); // suppress
LogTopic Logger::QUERIES("queries", LogLevel::INFO);

View File

@ -128,10 +128,12 @@ class Logger {
public:
static LogTopic AGENCY;
static LogTopic AGENCYCOMM;
static LogTopic COLLECTOR;
static LogTopic COMPACTOR;
static LogTopic CONFIG;
static LogTopic DATAFILES;
static LogTopic HEARTBEAT;
static LogTopic MMAP;
static LogTopic PERFORMANCE;
static LogTopic QUERIES;