mirror of https://gitee.com/bigwinds/arangodb
Feature/mv thread death logging (#5111)
* Initial low level interface for thread crash reporting (and management). * Add a member version of isClusterRole() * isolate heartbeat thread creation to new StartHeartbeatThread(). create heartbeat thread even if not a cluster or if an agent. * update runDBServer() and runCoordinator() to shutdown more quickly by polling isStopping() at additional locations. * copying updates from different branch / PR * basic thread crash logging. Not yet tied into Agency arangod or have any specific threads posting crashes * make Supervision thread a CriticalThread * sandwich CriticalThread between Thread and other classes to create long term, repeating thread crash reporting. * restore code lost upon branch update relating to new startHeartbeatThread() function * add CriticalThread.cpp to build * add new runAgentServer() function to loop for Agents. Make Heartbeat thread derive from CriticalThread. * remove debug line
This commit is contained in:
parent
a5b903e82c
commit
a84f7805ad
|
@ -149,7 +149,7 @@ struct HealthRecord {
|
|||
std::string Supervision::_agencyPrefix = "/arango";
|
||||
|
||||
Supervision::Supervision()
|
||||
: arangodb::Thread("Supervision"),
|
||||
: arangodb::CriticalThread("Supervision"),
|
||||
_agent(nullptr),
|
||||
_snapshot("Supervision"),
|
||||
_transient("Transient"),
|
||||
|
@ -380,7 +380,7 @@ void handleOnStatus(
|
|||
// Build transaction for removing unattended servers from health monitoring
|
||||
query_t arangodb::consensus::removeTransactionBuilder(
|
||||
std::vector<std::string> const& todelete) {
|
||||
|
||||
|
||||
query_t del = std::make_shared<Builder>();
|
||||
{ VPackArrayBuilder trxs(del.get());
|
||||
{ VPackArrayBuilder trx(del.get());
|
||||
|
@ -392,7 +392,7 @@ query_t arangodb::consensus::removeTransactionBuilder(
|
|||
{ VPackObjectBuilder oper(del.get());
|
||||
del->add("op", VPackValue("delete")); }}}}}
|
||||
return del;
|
||||
|
||||
|
||||
}
|
||||
|
||||
// Check all DB servers, guarded above doChecks
|
||||
|
@ -423,7 +423,7 @@ std::vector<check_t> Supervision::check(std::string const& type) {
|
|||
if (!todelete.empty()) {
|
||||
_agent->write(removeTransactionBuilder(todelete));
|
||||
}
|
||||
|
||||
|
||||
// Do actual monitoring
|
||||
for (auto const& machine : machinesPlanned) {
|
||||
std::string lastHeartbeatStatus, lastHeartbeatAcked, lastHeartbeatTime,
|
||||
|
@ -595,7 +595,7 @@ void Supervision::reportStatus(std::string const& status) {
|
|||
bool persist = false;
|
||||
query_t report;
|
||||
|
||||
{ // Do I have to report to agency under
|
||||
{ // Do I have to report to agency under
|
||||
_lock.assertLockedByCurrentThread();
|
||||
if (_snapshot.hasAsString("/Supervision/State/Mode").first != status) {
|
||||
// This includes the case that the mode is not set, since status
|
||||
|
@ -603,7 +603,7 @@ void Supervision::reportStatus(std::string const& status) {
|
|||
persist = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
report = std::make_shared<VPackBuilder>();
|
||||
{ VPackArrayBuilder trx(report.get());
|
||||
{ VPackObjectBuilder br(report.get());
|
||||
|
@ -617,7 +617,7 @@ void Supervision::reportStatus(std::string const& status) {
|
|||
if (status != "Maintenance") {
|
||||
transient(_agent, *report);
|
||||
}
|
||||
|
||||
|
||||
if (persist) {
|
||||
write_ret_t res = singleWriteTransaction(_agent, *report);
|
||||
}
|
||||
|
@ -667,7 +667,7 @@ void Supervision::run() {
|
|||
TRI_ASSERT(_agent != nullptr);
|
||||
|
||||
while (!this->isStopping()) {
|
||||
|
||||
|
||||
{
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
|
||||
|
@ -682,8 +682,7 @@ void Supervision::run() {
|
|||
// Supervision needs to wait until the agent has finished leadership
|
||||
// preparation or else the local agency snapshot might be behind its
|
||||
// last state.
|
||||
if (
|
||||
_agent->leading() && _agent->getPrepareLeadership() == 0) {
|
||||
if (_agent->leading() && _agent->getPrepareLeadership() == 0) {
|
||||
|
||||
if (_jobId == 0 || _jobId == _jobIdMax) {
|
||||
getUniqueIds(); // cannot fail but only hang
|
||||
|
@ -712,18 +711,18 @@ void Supervision::run() {
|
|||
}
|
||||
|
||||
handleJobs();
|
||||
|
||||
|
||||
} else {
|
||||
|
||||
reportStatus("Maintenance");
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
_cv.wait(static_cast<uint64_t>(1000000 * _frequency));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (shutdown) {
|
||||
ApplicationServer::server->beginShutdown();
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
#include "AgencyCommon.h"
|
||||
#include "Basics/ConditionVariable.h"
|
||||
#include "Basics/Mutex.h"
|
||||
#include "Basics/Thread.h"
|
||||
#include "Cluster/CriticalThread.h"
|
||||
|
||||
#include <chrono>
|
||||
|
||||
|
@ -44,7 +44,7 @@ struct check_t {
|
|||
check_t(std::string const& n, bool g) : good(g), name(n) {}
|
||||
};
|
||||
|
||||
class Supervision : public arangodb::Thread {
|
||||
class Supervision : public arangodb::CriticalThread {
|
||||
public:
|
||||
typedef std::chrono::system_clock::time_point TimePoint;
|
||||
typedef std::string ServerID;
|
||||
|
@ -127,10 +127,10 @@ class Supervision : public arangodb::Thread {
|
|||
/// @brief Upgrade agency with FailedServers an object from array
|
||||
void upgradeZero(VPackBuilder&);
|
||||
|
||||
/// @brief Upgrade agency to supervision overhaul jobs
|
||||
/// @brief Upgrade agency to supervision overhaul jobs
|
||||
void upgradeOne(VPackBuilder&);
|
||||
|
||||
/// @brief Upgrade agency to supervision overhaul jobs
|
||||
/// @brief Upgrade agency to supervision overhaul jobs
|
||||
void upgradeHealthRecords(VPackBuilder&);
|
||||
|
||||
/// @brief Check for inconsistencies in replication factor vs dbs entries
|
||||
|
@ -175,9 +175,9 @@ class Supervision : public arangodb::Thread {
|
|||
bool handleJobs();
|
||||
void handleShutdown();
|
||||
|
||||
/// @brief Migrate chains of distributeShardsLike to depth 1
|
||||
/// @brief Migrate chains of distributeShardsLike to depth 1
|
||||
void fixPrototypeChain(VPackBuilder&);
|
||||
|
||||
|
||||
Mutex _lock; // guards snapshot, _jobId, jobIdMax, _selfShutdown
|
||||
Agent* _agent; /**< @brief My agent */
|
||||
Node _snapshot;
|
||||
|
@ -203,7 +203,7 @@ class Supervision : public arangodb::Thread {
|
|||
bool _selfShutdown;
|
||||
|
||||
std::atomic<bool> _upgraded;
|
||||
|
||||
|
||||
std::string serverHealth(std::string const&);
|
||||
|
||||
static std::string _agencyPrefix; // initialized in AgencyFeature
|
||||
|
|
|
@ -282,6 +282,7 @@ SET(ARANGOD_SOURCES
|
|||
Cluster/ClusterMethods.cpp
|
||||
Cluster/ClusterTraverser.cpp
|
||||
Cluster/CollectionLockState.cpp
|
||||
Cluster/CriticalThread.cpp
|
||||
Cluster/FollowerInfo.cpp
|
||||
Cluster/DBServerAgencySync.cpp
|
||||
Cluster/HeartbeatThread.cpp
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
|
@ -105,7 +105,11 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
options->addObsoleteOption("--cluster.arangod-path",
|
||||
"path to the arangod for the cluster",
|
||||
true);
|
||||
|
||||
|
||||
options->addOption("--cluster.require-persisted-id",
|
||||
"if set to true, then the instance will only start if a UUID file is found in the database on startup. Setting this option will make sure the instance is started using an already existing database directory and not a new one. For the first start, the UUID file must either be created manually or the option must be set to false for the initial startup",
|
||||
new BooleanParameter(&_requirePersistedId));
|
||||
|
||||
options->addOption("--cluster.require-persisted-id",
|
||||
"if set to true, then the instance will only start if a UUID file is found in the database on startup. Setting this option will make sure the instance is started using an already existing database directory and not a new one. For the first start, the UUID file must either be created manually or the option must be set to false for the initial startup",
|
||||
new BooleanParameter(&_requirePersistedId));
|
||||
|
@ -133,7 +137,7 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
options->addHiddenOption("--cluster.create-waits-for-sync-replication",
|
||||
"active coordinator will wait for all replicas to create collection",
|
||||
new BooleanParameter(&_createWaitsForSyncReplication));
|
||||
|
||||
|
||||
options->addHiddenOption("--cluster.index-create-timeout",
|
||||
"amount of time (in seconds) the coordinator will wait for an index to be created before giving up",
|
||||
new DoubleParameter(&_indexCreationTimeout));
|
||||
|
@ -224,9 +228,16 @@ void ClusterFeature::reportRole(arangodb::ServerState::RoleEnum role) {
|
|||
|
||||
void ClusterFeature::prepare() {
|
||||
auto v8Dealer = ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
||||
|
||||
if (_enableCluster &&
|
||||
_requirePersistedId &&
|
||||
|
||||
if (_enableCluster &&
|
||||
_requirePersistedId &&
|
||||
!ServerState::instance()->hasPersistedId()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "required persisted UUID file '" << ServerState::instance()->getUuidFilename() << "' not found. Please make sure this instance is started using an already existing database directory";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
if (_enableCluster &&
|
||||
_requirePersistedId &&
|
||||
!ServerState::instance()->hasPersistedId()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "required persisted UUID file '" << ServerState::instance()->getUuidFilename() << "' not found. Please make sure this instance is started using an already existing database directory";
|
||||
FATAL_ERROR_EXIT();
|
||||
|
@ -378,6 +389,7 @@ void ClusterFeature::prepare() {
|
|||
void ClusterFeature::start() {
|
||||
// return if cluster is disabled
|
||||
if (!_enableCluster) {
|
||||
startHeartbeatThread(nullptr, 5000, 5, std::string());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -426,20 +438,7 @@ void ClusterFeature::start() {
|
|||
<< "default value '" << _heartbeatInterval << " ms'";
|
||||
}
|
||||
|
||||
// start heartbeat thread
|
||||
_heartbeatThread = std::make_shared<HeartbeatThread>(
|
||||
_agencyCallbackRegistry.get(), std::chrono::microseconds(_heartbeatInterval * 1000), 5);
|
||||
|
||||
if (!_heartbeatThread->init() || !_heartbeatThread->start()) {
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "heartbeat could not connect to agency endpoints ("
|
||||
<< endpoints << ")";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
while (!_heartbeatThread->isReady()) {
|
||||
// wait until heartbeat is ready
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(10000));
|
||||
}
|
||||
startHeartbeatThread(_agencyCallbackRegistry.get(), _heartbeatInterval, 5, endpoints);
|
||||
|
||||
while (true) {
|
||||
VPackBuilder builder;
|
||||
|
@ -476,23 +475,20 @@ void ClusterFeature::beginShutdown() {
|
|||
|
||||
void ClusterFeature::stop() {
|
||||
|
||||
if (_enableCluster) {
|
||||
if (_heartbeatThread != nullptr) {
|
||||
_heartbeatThread->beginShutdown();
|
||||
}
|
||||
if (_heartbeatThread != nullptr) {
|
||||
_heartbeatThread->beginShutdown();
|
||||
}
|
||||
|
||||
if (_heartbeatThread != nullptr) {
|
||||
int counter = 0;
|
||||
while (_heartbeatThread->isRunning()) {
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(100000));
|
||||
// emit warning after 5 seconds
|
||||
if (++counter == 10 * 5) {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::CLUSTER) << "waiting for heartbeat thread to finish";
|
||||
}
|
||||
if (_heartbeatThread != nullptr) {
|
||||
int counter = 0;
|
||||
while (_heartbeatThread->isRunning()) {
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(100000));
|
||||
// emit warning after 5 seconds
|
||||
if (++counter == 10 * 5) {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::CLUSTER) << "waiting for heartbeat thread to finish";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -566,3 +562,25 @@ void ClusterFeature::unprepare() {
|
|||
void ClusterFeature::setUnregisterOnShutdown(bool unregisterOnShutdown) {
|
||||
_unregisterOnShutdown = unregisterOnShutdown;
|
||||
}
|
||||
|
||||
/// @brief common routine to start heartbeat with or without cluster active
|
||||
void ClusterFeature::startHeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
|
||||
uint64_t interval_ms,
|
||||
uint64_t maxFailsBeforeWarning,
|
||||
const std::string & endpoints) {
|
||||
|
||||
_heartbeatThread = std::make_shared<HeartbeatThread>(
|
||||
agencyCallbackRegistry, std::chrono::microseconds(interval_ms * 1000), maxFailsBeforeWarning);
|
||||
|
||||
if (!_heartbeatThread->init() || !_heartbeatThread->start()) {
|
||||
// failure only occures in cluster mode.
|
||||
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "heartbeat could not connect to agency endpoints ("
|
||||
<< endpoints << ")";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
while (!_heartbeatThread->isReady()) {
|
||||
// wait until heartbeat is ready
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(10000));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,12 @@ class ClusterFeature : public application_features::ApplicationFeature {
|
|||
return _agencyPrefix;
|
||||
}
|
||||
|
||||
protected:
|
||||
void startHeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
|
||||
uint64_t interval_ms,
|
||||
uint64_t maxFailsBeforeWarning,
|
||||
const std::string & endpoints);
|
||||
|
||||
private:
|
||||
std::vector<std::string> _agencyEndpoints;
|
||||
std::string _agencyPrefix;
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// disclaimer
|
||||
///
|
||||
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Matthew Von-Maszewski
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "CriticalThread.h"
|
||||
#include "HeartbeatThread.h"
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief static object to record thread crashes. it is static so that
|
||||
/// it can contain information about threads that crash before HeartbeatThread
|
||||
/// starts (i.e. HeartbeatThread starts late). this list is intentionally
|
||||
/// NEVER PURGED so that it can be reposted to logs regularly
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void CriticalThread::crashNotification(std::exception const & ex)
|
||||
{
|
||||
HeartbeatThread::recordThreadDeath(name());
|
||||
return;
|
||||
};
|
|
@ -0,0 +1,55 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Matthew Von-Maszewski
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_CLUSTER_CRITICAL_THREAD_H
|
||||
#define ARANGOD_CLUSTER_CRITICAL_THREAD_H 1
|
||||
|
||||
#include "Basics/Thread.h"
|
||||
#include "Logger/Logger.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class CriticalThread : public Thread {
|
||||
|
||||
public:
|
||||
// copy constructor and assignment duplicate base class
|
||||
CriticalThread(CriticalThread const&) = delete;
|
||||
CriticalThread& operator=(CriticalThread const&) = delete;
|
||||
|
||||
CriticalThread(std::string const& name, bool deleteOnExit = false)
|
||||
: Thread(name, deleteOnExit) {
|
||||
};
|
||||
|
||||
virtual ~CriticalThread() {};
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Notification of when thread crashes via uncaught throw ... log it
|
||||
/// for hourly repeat log
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual void crashNotification(std::exception const & ex) override;
|
||||
|
||||
};//class CriticalThread
|
||||
|
||||
}
|
||||
|
||||
#endif // ARANGOD_CLUSTER_CRITICAL_THREAD_H
|
|
@ -1,7 +1,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
/// disclaimer
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
|
@ -58,6 +58,20 @@ using namespace arangodb::rest;
|
|||
|
||||
std::atomic<bool> HeartbeatThread::HasRunOnce(false);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief static object to record thread crashes. it is static so that
|
||||
/// it can contain information about threads that crash before HeartbeatThread
|
||||
/// starts (i.e. HeartbeatThread starts late). this list is intentionally
|
||||
/// NEVER PURGED so that it can be reposted to logs regularly
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static std::multimap<std::chrono::system_clock::time_point /* when */, const std::string /* threadName */> deadThreads;
|
||||
|
||||
static std::chrono::system_clock::time_point deadThreadsPosted; // defaults to epoch
|
||||
|
||||
static arangodb::Mutex deadThreadsMutex;
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief constructs a heartbeat thread
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -65,7 +79,7 @@ std::atomic<bool> HeartbeatThread::HasRunOnce(false);
|
|||
HeartbeatThread::HeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
|
||||
std::chrono::microseconds interval,
|
||||
uint64_t maxFailsBeforeWarning)
|
||||
: Thread("Heartbeat"),
|
||||
: CriticalThread("Heartbeat"),
|
||||
_agencyCallbackRegistry(agencyCallbackRegistry),
|
||||
_statusLock(std::make_shared<Mutex>()),
|
||||
_agency(),
|
||||
|
@ -201,6 +215,8 @@ void HeartbeatThread::run() {
|
|||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "starting heartbeat thread (" << role << ")";
|
||||
|
||||
logThreadDeaths();
|
||||
|
||||
if (ServerState::instance()->isCoordinator(role)) {
|
||||
runCoordinator();
|
||||
} else if (ServerState::instance()->isDBServer(role)) {
|
||||
|
@ -208,13 +224,13 @@ void HeartbeatThread::run() {
|
|||
} else if (ServerState::instance()->isSingleServer(role)) {
|
||||
runSingleServer();
|
||||
} else if (ServerState::instance()->isAgent(role)) {
|
||||
sleep(3);
|
||||
// runAgentServer();
|
||||
runAgentServer();
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "invalid role setup found when starting HeartbeatThread";
|
||||
TRI_ASSERT(false);
|
||||
}
|
||||
|
||||
logThreadDeaths(true); // force log
|
||||
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
|
||||
<< "stopped heartbeat thread (" << role << ")";
|
||||
}
|
||||
|
@ -274,6 +290,7 @@ void HeartbeatThread::runDBServer() {
|
|||
|
||||
|
||||
while (!isStopping()) {
|
||||
logThreadDeaths();
|
||||
|
||||
try {
|
||||
auto const start = std::chrono::steady_clock::now();
|
||||
|
@ -440,6 +457,8 @@ void HeartbeatThread::runSingleServer() {
|
|||
uint64_t lastSentVersion = 0;
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
while (!isStopping()) {
|
||||
logThreadDeaths();
|
||||
|
||||
{
|
||||
CONDITION_LOCKER(locker, _condition);
|
||||
auto remain = _interval - (std::chrono::steady_clock::now() - start);
|
||||
|
@ -704,6 +723,7 @@ void HeartbeatThread::runCoordinator() {
|
|||
|
||||
while (!isStopping()) {
|
||||
try {
|
||||
logThreadDeaths();
|
||||
auto const start = std::chrono::steady_clock::now();
|
||||
// send our state to the agency.
|
||||
// we don't care if this fails
|
||||
|
@ -919,6 +939,28 @@ void HeartbeatThread::runCoordinator() {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief heartbeat main loop, agent version
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void HeartbeatThread::runAgentServer() {
|
||||
|
||||
// simple loop to post dead threads every hour, no other tasks today
|
||||
while (!isStopping()) {
|
||||
logThreadDeaths();
|
||||
|
||||
{
|
||||
CONDITION_LOCKER(locker, _condition);
|
||||
if (!isStopping()) {
|
||||
locker.wait(std::chrono::hours(1));
|
||||
}
|
||||
}
|
||||
} // while
|
||||
|
||||
return;
|
||||
|
||||
} // HeartbeatThread::runAgentServer
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initializes the heartbeat
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1214,3 +1256,51 @@ void HeartbeatThread::updateAgentPool(VPackSlice const& agentPool) {
|
|||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "Cannot find an agency persisted in RAFT 8|";
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief record the death of a thread, adding std::chrono::system_clock::now().
|
||||
/// This is a static function because HeartbeatThread might not have
|
||||
/// started yet
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void HeartbeatThread::recordThreadDeath(const std::string & threadName) {
|
||||
MUTEX_LOCKER(mutexLocker, deadThreadsMutex);
|
||||
|
||||
deadThreads.insert(std::pair<std::chrono::system_clock::time_point, const std::string>
|
||||
(std::chrono::system_clock::now(), threadName));
|
||||
|
||||
return;
|
||||
|
||||
} // HeartbeatThread::recordThreadDeath
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief post list of deadThreads to current log. Called regularly, but only
|
||||
/// posts to log roughly every 60 minutes
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void HeartbeatThread::logThreadDeaths(bool force) {
|
||||
|
||||
bool doLogging(force);
|
||||
|
||||
if (std::chrono::hours(1) < (std::chrono::system_clock::now() - deadThreadsPosted)) {
|
||||
doLogging = true;
|
||||
} // if
|
||||
|
||||
if (doLogging) {
|
||||
deadThreadsPosted = std::chrono::system_clock::now();
|
||||
|
||||
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "HeartbeatThread ok.";
|
||||
|
||||
for (auto const it : deadThreads) {
|
||||
char buffer[40];
|
||||
struct tm gmt;
|
||||
time_t tt = std::chrono::system_clock::to_time_t(it.first);
|
||||
gmtime_r(&tt, &gmt);
|
||||
strftime(buffer, sizeof(buffer), "%FT%TZ", &gmt);
|
||||
|
||||
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Prior crash of thread " << it.second
|
||||
<< " occurred at " << buffer;
|
||||
} // for
|
||||
} // if
|
||||
|
||||
} // HeartbeatThread::logThreadDeaths
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
|
@ -30,6 +30,7 @@
|
|||
#include "Basics/ConditionVariable.h"
|
||||
#include "Basics/Mutex.h"
|
||||
#include "Basics/asio-helper.h"
|
||||
#include "Cluster/CriticalThread.h"
|
||||
#include "Cluster/DBServerAgencySync.h"
|
||||
#include "Logger/Logger.h"
|
||||
|
||||
|
@ -51,7 +52,7 @@ struct AgencyVersions {
|
|||
|
||||
class AgencyCallbackRegistry;
|
||||
|
||||
class HeartbeatThread : public Thread,
|
||||
class HeartbeatThread : public CriticalThread,
|
||||
public std::enable_shared_from_this<HeartbeatThread> {
|
||||
public:
|
||||
HeartbeatThread(AgencyCallbackRegistry*, std::chrono::microseconds,
|
||||
|
@ -94,6 +95,20 @@ class HeartbeatThread : public Thread,
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
virtual void beginShutdown() override;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief add thread name to ongoing list of threads that have crashed
|
||||
/// unexpectedly
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static void recordThreadDeath(const std::string & threadName);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief post list of deadThreads to current log. Called regularly, but only
|
||||
/// posts to log roughly every 60 minutes
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static void logThreadDeaths(bool force=false);
|
||||
|
||||
protected:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief heartbeat main loop
|
||||
|
@ -120,6 +135,12 @@ class HeartbeatThread : public Thread,
|
|||
|
||||
void runSingleServer();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief heartbeat main loop, agent version
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void runAgentServer();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief handles a plan change, coordinator case
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -82,7 +82,7 @@ void Thread::startThread(void* arg) {
|
|||
TRI_ASSERT(ptr != nullptr);
|
||||
|
||||
ptr->_threadNumber = LOCAL_THREAD_NUMBER;
|
||||
|
||||
|
||||
LOCAL_THREAD_NAME = ptr->name().c_str();
|
||||
|
||||
if (0 <= ptr->_affinity) {
|
||||
|
@ -96,6 +96,7 @@ void Thread::startThread(void* arg) {
|
|||
} catch (std::exception const& ex) {
|
||||
LOG_TOPIC(WARN, Logger::THREADS)
|
||||
<< "caught exception in thread '" << ptr->_name << "': " << ex.what();
|
||||
ptr->crashNotification(ex);
|
||||
if (pushed) {
|
||||
WorkMonitor::popThread(ptr);
|
||||
}
|
||||
|
@ -135,7 +136,7 @@ TRI_pid_t Thread::currentProcessId() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
uint64_t Thread::currentThreadNumber() { return LOCAL_THREAD_NUMBER; }
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief returns the name of the current thread, if set
|
||||
/// note that this function may return a nullptr
|
||||
|
@ -222,7 +223,7 @@ Thread::~Thread() {
|
|||
<< ". shutting down hard";
|
||||
FATAL_ERROR_ABORT();
|
||||
}
|
||||
|
||||
|
||||
LOCAL_THREAD_NAME = nullptr;
|
||||
}
|
||||
|
||||
|
|
|
@ -91,12 +91,12 @@ class Thread {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static uint64_t currentThreadNumber();
|
||||
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief returns the name of the current thread, if set
|
||||
/// note that this function may return a nullptr
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
static char const* currentThreadName();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -246,6 +246,12 @@ class Thread {
|
|||
|
||||
virtual void addStatus(arangodb::velocypack::Builder* b);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief optional notification call when thread gets unplanned exception
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
virtual void crashNotification(std::exception const & ex) {return;};
|
||||
|
||||
protected:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief the thread program
|
||||
|
|
Loading…
Reference in New Issue