mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel
This commit is contained in:
commit
577c2aee11
|
@ -172,112 +172,117 @@ void HeartbeatThread::runDBServer() {
|
|||
int currentCount = currentCountStart;
|
||||
|
||||
while (!isStopping()) {
|
||||
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sending heartbeat to agency";
|
||||
try {
|
||||
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sending heartbeat to agency";
|
||||
|
||||
double const start = TRI_microtime();
|
||||
double const start = TRI_microtime();
|
||||
// send our state to the agency.
|
||||
// we don't care if this fails
|
||||
sendState();
|
||||
|
||||
// send our state to the agency.
|
||||
// we don't care if this fails
|
||||
sendState();
|
||||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
if (--currentCount == 0) {
|
||||
currentCount = currentCountStart;
|
||||
|
||||
if (--currentCount == 0) {
|
||||
currentCount = currentCountStart;
|
||||
|
||||
// send an initial GET request to Sync/Commands/my-id
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Looking at Sync/Commands/" + _myId;
|
||||
|
||||
AgencyReadTransaction trx(std::vector<std::string>(
|
||||
{_agency.prefixPath() + "Shutdown",
|
||||
_agency.prefixPath() + "Current/Version",
|
||||
_agency.prefixPath() + "Sync/Commands/" + _myId
|
||||
}));
|
||||
|
||||
AgencyCommResult result = _agency.sendTransactionWithFailover(trx);
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(WARN, Logger::HEARTBEAT)
|
||||
<< "Heartbeat: Could not read from agency!";
|
||||
} else {
|
||||
VPackSlice shutdownSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Shutdown"})
|
||||
);
|
||||
|
||||
if (shutdownSlice.isBool() && shutdownSlice.getBool()) {
|
||||
ApplicationServer::server->beginShutdown();
|
||||
break;
|
||||
}
|
||||
// send an initial GET request to Sync/Commands/my-id
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Looking at Sync/Commands/" + _myId;
|
||||
handleStateChange(result);
|
||||
|
||||
VPackSlice s = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), std::string("Current"),
|
||||
std::string("Version")}));
|
||||
if (!s.isInteger()) {
|
||||
LOG_TOPIC(ERR, Logger::HEARTBEAT)
|
||||
<< "Current/Version in agency is not an integer.";
|
||||
|
||||
AgencyReadTransaction trx(std::vector<std::string>(
|
||||
{_agency.prefixPath() + "Shutdown",
|
||||
_agency.prefixPath() + "Current/Version",
|
||||
_agency.prefixPath() + "Sync/Commands/" + _myId
|
||||
}));
|
||||
|
||||
AgencyCommResult result = _agency.sendTransactionWithFailover(trx);
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(WARN, Logger::HEARTBEAT)
|
||||
<< "Heartbeat: Could not read from agency!";
|
||||
} else {
|
||||
uint64_t currentVersion = 0;
|
||||
try {
|
||||
currentVersion = s.getUInt();
|
||||
} catch (...) {
|
||||
VPackSlice shutdownSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Shutdown"})
|
||||
);
|
||||
|
||||
if (shutdownSlice.isBool() && shutdownSlice.getBool()) {
|
||||
ApplicationServer::server->beginShutdown();
|
||||
break;
|
||||
}
|
||||
if (currentVersion == 0) {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Looking at Sync/Commands/" + _myId;
|
||||
handleStateChange(result);
|
||||
|
||||
VPackSlice s = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), std::string("Current"),
|
||||
std::string("Version")}));
|
||||
if (!s.isInteger()) {
|
||||
LOG_TOPIC(ERR, Logger::HEARTBEAT)
|
||||
<< "Current/Version in agency is 0.";
|
||||
<< "Current/Version in agency is not an integer.";
|
||||
} else {
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
if (currentVersion > _desiredVersions.current) {
|
||||
_desiredVersions.current = currentVersion;
|
||||
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
|
||||
<< "Found greater Current/Version in agency.";
|
||||
}
|
||||
uint64_t currentVersion = 0;
|
||||
try {
|
||||
currentVersion = s.getUInt();
|
||||
} catch (...) {
|
||||
}
|
||||
if (currentVersion == 0) {
|
||||
LOG_TOPIC(ERR, Logger::HEARTBEAT)
|
||||
<< "Current/Version in agency is 0.";
|
||||
} else {
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
if (currentVersion > _desiredVersions.current) {
|
||||
_desiredVersions.current = currentVersion;
|
||||
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
|
||||
<< "Found greater Current/Version in agency.";
|
||||
}
|
||||
}
|
||||
syncDBServerStatusQuo();
|
||||
}
|
||||
syncDBServerStatusQuo();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
|
||||
double remain = interval - (TRI_microtime() - start);
|
||||
// mop: execute at least once
|
||||
do {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Entering update loop";
|
||||
double remain = interval - (TRI_microtime() - start);
|
||||
// mop: execute at least once
|
||||
do {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Entering update loop";
|
||||
|
||||
bool wasNotified;
|
||||
{
|
||||
CONDITION_LOCKER(locker, _condition);
|
||||
wasNotified = _wasNotified;
|
||||
if (!wasNotified) {
|
||||
if (remain > 0.0) {
|
||||
locker.wait(static_cast<uint64_t>(remain * 1000000.0));
|
||||
wasNotified = _wasNotified;
|
||||
}
|
||||
}
|
||||
_wasNotified = false;
|
||||
}
|
||||
|
||||
bool wasNotified;
|
||||
{
|
||||
CONDITION_LOCKER(locker, _condition);
|
||||
wasNotified = _wasNotified;
|
||||
if (!wasNotified) {
|
||||
if (remain > 0.0) {
|
||||
locker.wait(static_cast<uint64_t>(remain * 1000000.0));
|
||||
wasNotified = _wasNotified;
|
||||
}
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Lock reached timeout";
|
||||
planAgencyCallback->refetchAndUpdate(true);
|
||||
} else {
|
||||
// mop: a plan change returned successfully...
|
||||
// recheck and redispatch in case our desired versions increased
|
||||
// in the meantime
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "wasNotified==true";
|
||||
syncDBServerStatusQuo();
|
||||
}
|
||||
_wasNotified = false;
|
||||
}
|
||||
|
||||
if (!wasNotified) {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Lock reached timeout";
|
||||
planAgencyCallback->refetchAndUpdate(true);
|
||||
} else {
|
||||
// mop: a plan change returned successfully...
|
||||
// recheck and redispatch in case our desired versions increased
|
||||
// in the meantime
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "wasNotified==true";
|
||||
syncDBServerStatusQuo();
|
||||
}
|
||||
remain = interval - (TRI_microtime() - start);
|
||||
} while (remain > 0);
|
||||
remain = interval - (TRI_microtime() - start);
|
||||
} while (remain > 0);
|
||||
} catch (std::exception const& e) {
|
||||
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Got an exception in DBServer heartbeat: " << e.what();
|
||||
} catch (...) {
|
||||
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Got an unknown exception in DBServer heartbeat";
|
||||
}
|
||||
}
|
||||
|
||||
_agencyCallbackRegistry->unregisterCallback(planAgencyCallback);
|
||||
|
@ -318,138 +323,144 @@ void HeartbeatThread::runCoordinator() {
|
|||
setReady();
|
||||
|
||||
while (!isStopping()) {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "sending heartbeat to agency";
|
||||
try {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "sending heartbeat to agency";
|
||||
|
||||
double const start = TRI_microtime();
|
||||
// send our state to the agency.
|
||||
// we don't care if this fails
|
||||
sendState();
|
||||
double const start = TRI_microtime();
|
||||
// send our state to the agency.
|
||||
// we don't care if this fails
|
||||
sendState();
|
||||
|
||||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
|
||||
AgencyReadTransaction trx(std::vector<std::string>(
|
||||
{_agency.prefixPath() + "Shutdown",
|
||||
_agency.prefixPath() + "Plan/Version",
|
||||
_agency.prefixPath() + "Current/Version",
|
||||
_agency.prefixPath() + "Current/Foxxmaster",
|
||||
_agency.prefixPath() + "Current/FoxxmasterQueueupdate",
|
||||
_agency.prefixPath() + "Sync/Commands/" + _myId,
|
||||
_agency.prefixPath() + "Sync/UserVersion"}));
|
||||
AgencyCommResult result = _agency.sendTransactionWithFailover(trx);
|
||||
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(WARN, Logger::HEARTBEAT)
|
||||
<< "Heartbeat: Could not read from agency!";
|
||||
} else {
|
||||
VPackSlice shutdownSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Shutdown"})
|
||||
);
|
||||
|
||||
if (shutdownSlice.isBool() && shutdownSlice.getBool()) {
|
||||
ApplicationServer::server->beginShutdown();
|
||||
if (isStopping()) {
|
||||
break;
|
||||
}
|
||||
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Looking at Sync/Commands/" + _myId;
|
||||
AgencyReadTransaction trx(std::vector<std::string>(
|
||||
{_agency.prefixPath() + "Shutdown",
|
||||
_agency.prefixPath() + "Plan/Version",
|
||||
_agency.prefixPath() + "Current/Version",
|
||||
_agency.prefixPath() + "Current/Foxxmaster",
|
||||
_agency.prefixPath() + "Current/FoxxmasterQueueupdate",
|
||||
_agency.prefixPath() + "Sync/Commands/" + _myId,
|
||||
_agency.prefixPath() + "Sync/UserVersion"}));
|
||||
AgencyCommResult result = _agency.sendTransactionWithFailover(trx);
|
||||
|
||||
handleStateChange(result);
|
||||
|
||||
// mop: order is actually important here...FoxxmasterQueueupdate will be set only when somebody
|
||||
// registers some new queue stuff (for example on a different coordinator than this one)...
|
||||
// However when we are just about to become the new foxxmaster we must immediately refresh our queues
|
||||
// this is done in ServerState...if queueupdate is set after foxxmaster the change will be reset again
|
||||
VPackSlice foxxmasterQueueupdateSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Current", "FoxxmasterQueueupdate"})
|
||||
);
|
||||
|
||||
if (foxxmasterQueueupdateSlice.isBool()) {
|
||||
ServerState::instance()->setFoxxmasterQueueupdate(foxxmasterQueueupdateSlice.getBool());
|
||||
}
|
||||
if (!result.successful()) {
|
||||
LOG_TOPIC(WARN, Logger::HEARTBEAT)
|
||||
<< "Heartbeat: Could not read from agency!";
|
||||
} else {
|
||||
VPackSlice shutdownSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Shutdown"})
|
||||
);
|
||||
|
||||
VPackSlice foxxmasterSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Current", "Foxxmaster"})
|
||||
);
|
||||
|
||||
if (foxxmasterSlice.isString()) {
|
||||
ServerState::instance()->setFoxxmaster(foxxmasterSlice.copyString());
|
||||
}
|
||||
|
||||
VPackSlice versionSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Plan", "Version"}));
|
||||
|
||||
if (versionSlice.isInteger()) {
|
||||
// there is a plan version
|
||||
|
||||
uint64_t planVersion = 0;
|
||||
try {
|
||||
planVersion = versionSlice.getUInt();
|
||||
} catch (...) {
|
||||
if (shutdownSlice.isBool() && shutdownSlice.getBool()) {
|
||||
ApplicationServer::server->beginShutdown();
|
||||
break;
|
||||
}
|
||||
|
||||
if (planVersion > lastPlanVersionNoticed) {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Found planVersion " << planVersion << " which is newer than "
|
||||
<< lastPlanVersionNoticed;
|
||||
if (handlePlanChangeCoordinator(planVersion)) {
|
||||
lastPlanVersionNoticed = planVersion;
|
||||
} else {
|
||||
LOG_TOPIC(WARN, Logger::HEARTBEAT)
|
||||
<< "handlePlanChangeCoordinator was unsuccessful";
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Looking at Sync/Commands/" + _myId;
|
||||
|
||||
handleStateChange(result);
|
||||
|
||||
// mop: order is actually important here...FoxxmasterQueueupdate will be set only when somebody
|
||||
// registers some new queue stuff (for example on a different coordinator than this one)...
|
||||
// However when we are just about to become the new foxxmaster we must immediately refresh our queues
|
||||
// this is done in ServerState...if queueupdate is set after foxxmaster the change will be reset again
|
||||
VPackSlice foxxmasterQueueupdateSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Current", "FoxxmasterQueueupdate"})
|
||||
);
|
||||
|
||||
if (foxxmasterQueueupdateSlice.isBool()) {
|
||||
ServerState::instance()->setFoxxmasterQueueupdate(foxxmasterQueueupdateSlice.getBool());
|
||||
}
|
||||
|
||||
VPackSlice foxxmasterSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Current", "Foxxmaster"})
|
||||
);
|
||||
|
||||
if (foxxmasterSlice.isString()) {
|
||||
ServerState::instance()->setFoxxmaster(foxxmasterSlice.copyString());
|
||||
}
|
||||
|
||||
VPackSlice versionSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Plan", "Version"}));
|
||||
|
||||
if (versionSlice.isInteger()) {
|
||||
// there is a plan version
|
||||
|
||||
uint64_t planVersion = 0;
|
||||
try {
|
||||
planVersion = versionSlice.getUInt();
|
||||
} catch (...) {
|
||||
}
|
||||
|
||||
if (planVersion > lastPlanVersionNoticed) {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Found planVersion " << planVersion << " which is newer than "
|
||||
<< lastPlanVersionNoticed;
|
||||
if (handlePlanChangeCoordinator(planVersion)) {
|
||||
lastPlanVersionNoticed = planVersion;
|
||||
} else {
|
||||
LOG_TOPIC(WARN, Logger::HEARTBEAT)
|
||||
<< "handlePlanChangeCoordinator was unsuccessful";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VPackSlice slice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Sync", "UserVersion"}));
|
||||
|
||||
if (slice.isInteger()) {
|
||||
// there is a UserVersion
|
||||
uint64_t userVersion = 0;
|
||||
try {
|
||||
userVersion = slice.getUInt();
|
||||
} catch (...) {
|
||||
}
|
||||
|
||||
if (userVersion > 0 && userVersion != oldUserVersion) {
|
||||
oldUserVersion = userVersion;
|
||||
GeneralServerFeature::AUTH_INFO.outdate();
|
||||
}
|
||||
}
|
||||
|
||||
versionSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Current", "Version"}));
|
||||
if (versionSlice.isInteger()) {
|
||||
uint64_t currentVersion = 0;
|
||||
try {
|
||||
currentVersion = versionSlice.getUInt();
|
||||
} catch (...) {
|
||||
}
|
||||
if (currentVersion > lastCurrentVersionNoticed) {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Found currentVersion " << currentVersion
|
||||
<< " which is newer than " << lastCurrentVersionNoticed;
|
||||
lastCurrentVersionNoticed = currentVersion;
|
||||
|
||||
ClusterInfo::instance()->invalidateCurrent();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VPackSlice slice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Sync", "UserVersion"}));
|
||||
double remain = interval - (TRI_microtime() - start);
|
||||
|
||||
if (slice.isInteger()) {
|
||||
// there is a UserVersion
|
||||
uint64_t userVersion = 0;
|
||||
try {
|
||||
userVersion = slice.getUInt();
|
||||
} catch (...) {
|
||||
}
|
||||
|
||||
if (userVersion > 0 && userVersion != oldUserVersion) {
|
||||
oldUserVersion = userVersion;
|
||||
GeneralServerFeature::AUTH_INFO.outdate();
|
||||
// sleep for a while if appropriate, on some systems usleep does not
|
||||
// like arguments greater than 1000000
|
||||
while (remain > 0.0) {
|
||||
if (remain >= 0.5) {
|
||||
usleep(500000);
|
||||
remain -= 0.5;
|
||||
} else {
|
||||
usleep((unsigned long)(remain * 1000.0 * 1000.0));
|
||||
remain = 0.0;
|
||||
}
|
||||
}
|
||||
|
||||
versionSlice = result.slice()[0].get(
|
||||
std::vector<std::string>({_agency.prefix(), "Current", "Version"}));
|
||||
if (versionSlice.isInteger()) {
|
||||
uint64_t currentVersion = 0;
|
||||
try {
|
||||
currentVersion = versionSlice.getUInt();
|
||||
} catch (...) {
|
||||
}
|
||||
if (currentVersion > lastCurrentVersionNoticed) {
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "Found currentVersion " << currentVersion
|
||||
<< " which is newer than " << lastCurrentVersionNoticed;
|
||||
lastCurrentVersionNoticed = currentVersion;
|
||||
|
||||
ClusterInfo::instance()->invalidateCurrent();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
double remain = interval - (TRI_microtime() - start);
|
||||
|
||||
// sleep for a while if appropriate, on some systems usleep does not
|
||||
// like arguments greater than 1000000
|
||||
while (remain > 0.0) {
|
||||
if (remain >= 0.5) {
|
||||
usleep(500000);
|
||||
remain -= 0.5;
|
||||
} else {
|
||||
usleep((unsigned long)(remain * 1000.0 * 1000.0));
|
||||
remain = 0.0;
|
||||
}
|
||||
} catch (std::exception const& e) {
|
||||
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Got an exception in coordinator heartbeat: " << e.what();
|
||||
} catch (...) {
|
||||
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Got an unknown exception in coordinator heartbeat";
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue