1
0
Fork 0

Dedicated thread for Phase 1&2 (#6342)

* First draft of dedicated thread for phase 1 and phase 2.

* Added comments and removed old code.
This commit is contained in:
Lars Maier 2018-09-06 12:14:53 +02:00 committed by Max Neunhöffer
parent 4bb9299f2d
commit 798375fbcd
3 changed files with 152 additions and 129 deletions

View File

@ -73,6 +73,115 @@ static std::chrono::system_clock::time_point deadThreadsPosted; // defaults to
static arangodb::Mutex deadThreadsMutex;
namespace arangodb {
////////////////////////////////////////////////////////////////////////////////
/// @brief dedicated thread that executes DBServerAgencySync::work() (phase 1
/// and phase 2) whenever it is notified. While a round is running, further
/// notifications are coalesced into a single follow-up round.
////////////////////////////////////////////////////////////////////////////////
class HeartbeatBackgroundJobThread : public Thread {
 public:
  //////////////////////////////////////////////////////////////////////////////
  /// @brief creates the thread object; the thread itself is not started here.
  /// @param heartbeatThread non-owning pointer handed to every
  ///        DBServerAgencySync job created by run()
  //////////////////////////////////////////////////////////////////////////////
  explicit HeartbeatBackgroundJobThread(HeartbeatThread* heartbeatThread)
      : Thread("Maintenance"),
        _heartbeatThread(heartbeatThread),
        _stop(false),
        _sleeping(false),
        // must be initialised explicitly: before C++20 a default-constructed
        // std::atomic<bool> holds an indeterminate value
        _anotherRun(false),
        _backgroundJobsLaunched(0) {}

  ~HeartbeatBackgroundJobThread() {
    // Request loop exit first so that a thread sleeping on _condition is
    // woken up; calling stop() more than once is harmless.
    // NOTE(review): presumably shutdown() waits for run() to return - confirm
    // against Thread::shutdown().
    stop();
    shutdown();
  }

  //////////////////////////////////////////////////////////////////////////////
  /// @brief asks the thread to stop, but does not wait.
  //////////////////////////////////////////////////////////////////////////////
  void stop() {
    std::unique_lock<std::mutex> guard(_mutex);
    _stop = true;
    _condition.notify_one();
  }

  //////////////////////////////////////////////////////////////////////////////
  /// @brief notifies the background thread: when the thread is sleeping, wakes
  /// it up. Otherwise sets a flag to start another round.
  //////////////////////////////////////////////////////////////////////////////
  void notify() {
    std::unique_lock<std::mutex> guard(_mutex);
    _anotherRun.store(true, std::memory_order_release);
    if (_sleeping.load(std::memory_order_acquire)) {
      _condition.notify_one();
    }
  }

 protected:
  //////////////////////////////////////////////////////////////////////////////
  /// @brief thread main loop: sleeps until notify() or stop() is called, then
  /// either runs one round of the sync job or terminates.
  //////////////////////////////////////////////////////////////////////////////
  void run() override {
    while (!_stop) {
      {
        std::unique_lock<std::mutex> guard(_mutex);

        if (!_anotherRun.load(std::memory_order_acquire)) {
          _sleeping.store(true, std::memory_order_release);
          // The predicate is evaluated before blocking and after every
          // wakeup, so spurious wakeups are ignored and a stop() or notify()
          // issued before we got here cannot be missed.
          _condition.wait(guard, [this] {
            return _stop.load(std::memory_order_acquire) ||
                   _anotherRun.load(std::memory_order_acquire);
          });
          _sleeping.store(false, std::memory_order_release);
        }

        // Re-check under the lock: a stop request wins over a pending round.
        if (_stop) {
          return;
        }

        _anotherRun.store(false, std::memory_order_release);
      }

      // Run one round of phase 1 and phase 2 outside of the lock, so that
      // notify() and stop() never have to wait for a running job.
      uint64_t jobNr = ++_backgroundJobsLaunched;
      LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback started " << jobNr;
      {
        DBServerAgencySync job(_heartbeatThread);
        job.work();
      }
      LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback ended " << jobNr;
    }
  }

 private:
  /// @brief non-owning pointer passed on to each DBServerAgencySync job
  HeartbeatThread* _heartbeatThread;

  /// @brief protects the wait/notify handshake on the flags below
  std::mutex _mutex;

  //////////////////////////////////////////////////////////////////////////////
  /// @brief used to wake up the background thread
  /// guarded via _mutex.
  //////////////////////////////////////////////////////////////////////////////
  std::condition_variable _condition;

  //////////////////////////////////////////////////////////////////////////////
  /// @brief Set by the HeartbeatThread when the BackgroundThread should stop
  /// guarded via _mutex.
  //////////////////////////////////////////////////////////////////////////////
  std::atomic<bool> _stop;

  //////////////////////////////////////////////////////////////////////////////
  /// @brief whether the background thread sleeps or not
  /// guarded via _mutex.
  //////////////////////////////////////////////////////////////////////////////
  std::atomic<bool> _sleeping;

  //////////////////////////////////////////////////////////////////////////////
  /// @brief when awake, the background thread will execute another round of
  /// phase 1 and phase 2, after resetting this to false
  /// guarded via _mutex.
  //////////////////////////////////////////////////////////////////////////////
  std::atomic<bool> _anotherRun;

  /// @brief number of rounds started so far; only touched inside run()
  uint64_t _backgroundJobsLaunched;
};
}
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a heartbeat thread
@ -97,93 +206,22 @@ HeartbeatThread::HeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
_desiredVersions(std::make_shared<AgencyVersions>(0, 0)),
_wasNotified(false),
_backgroundJobsPosted(0),
_backgroundJobsLaunched(0),
_backgroundJobScheduledOrRunning(false),
_launchAnotherBackgroundJob(false),
_lastSyncTime(0) {
_lastSyncTime(0),
_maintenanceThread(nullptr) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroys a heartbeat thread
////////////////////////////////////////////////////////////////////////////////
// The only action taken on destruction is the call to shutdown().
HeartbeatThread::~HeartbeatThread() { shutdown(); }
////////////////////////////////////////////////////////////////////////////////
/// @brief running of heartbeat background jobs (in JavaScript), we run
/// these by instantiating an object in class HeartbeatBackgroundJob,
/// which is a std::function<void()> and holds a shared_ptr to the
/// HeartbeatThread singleton itself. This instance is then posted to
/// the io_service for execution in the thread pool. Should the heartbeat
/// thread itself terminate during shutdown, then the HeartbeatThread
/// singleton itself is still kept alive by the shared_ptr in the instance
/// of HeartbeatBackgroundJob. The operator() method simply calls the
/// runBackgroundJob() method of the heartbeat thread. Should this have
/// to schedule another background job, then it can simply create a new
/// HeartbeatBackgroundJob instance, again using shared_from_this() to
/// create a new shared_ptr keeping the HeartbeatThread object alive.
////////////////////////////////////////////////////////////////////////////////
class HeartbeatBackgroundJob {
std::shared_ptr<HeartbeatThread> _heartbeatThread;
double _startTime;
std::string _schedulerInfo;
public:
explicit HeartbeatBackgroundJob(std::shared_ptr<HeartbeatThread> hbt,
double startTime)
: _heartbeatThread(hbt), _startTime(startTime),_schedulerInfo(SchedulerFeature::SCHEDULER->infoStatus()) {
}
void operator()() {
// first tell the scheduler that this thread is working:
JobGuard guard(SchedulerFeature::SCHEDULER);
guard.work();
double now = TRI_microtime();
if (now > _startTime + 5.0) {
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "ALARM: Scheduling background job "
"took " << now - _startTime
<< " seconds, scheduler info at schedule time: " << _schedulerInfo
<< ", scheduler info now: "
<< SchedulerFeature::SCHEDULER->infoStatus();
}
_heartbeatThread->runBackgroundJob();
}
};
////////////////////////////////////////////////////////////////////////////////
/// @brief method runBackgroundJob()
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::runBackgroundJob() {
uint64_t jobNr = ++_backgroundJobsLaunched;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback started " << jobNr;
{
DBServerAgencySync job(this);
job.work();
}
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback ended " << jobNr;
{
MUTEX_LOCKER(mutexLocker, *_statusLock);
TRI_ASSERT(_backgroundJobScheduledOrRunning);
if (_launchAnotherBackgroundJob) {
jobNr = ++_backgroundJobsPosted;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync tail " << jobNr;
_launchAnotherBackgroundJob = false;
// the JobGuard is in the operator() of HeartbeatBackgroundJob
_lastSyncTime = TRI_microtime();
SchedulerFeature::SCHEDULER->post(
HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false);
} else {
_backgroundJobScheduledOrRunning = false;
_launchAnotherBackgroundJob = false;
}
// Destructor: first asks the maintenance thread (if one exists) to stop, then
// shuts down the heartbeat thread itself.
HeartbeatThread::~HeartbeatThread() {
// _maintenanceThread is initialised to nullptr in the constructor and is
// assigned in runDBServer(), so it may legitimately be null here.
if (_maintenanceThread) {
// stop() only requests termination; it does not wait for the thread.
_maintenanceThread->stop();
}
shutdown();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief heartbeat main loop
/// the heartbeat thread constantly reports the current server status to the
@ -198,6 +236,7 @@ void HeartbeatThread::runBackgroundJob() {
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::run() {
ServerState::RoleEnum role = ServerState::instance()->getRole();
// mop: the heartbeat thread itself is now ready
@ -248,6 +287,12 @@ void HeartbeatThread::run() {
void HeartbeatThread::runDBServer() {
_maintenanceThread = std::make_unique<HeartbeatBackgroundJobThread>(this);
if (!_maintenanceThread->start()) {
// WHAT TO DO NOW?
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Failed to start dedicated thread for maintenance";
}
std::function<bool(VPackSlice const& result)> updatePlan =
[=](VPackSlice const& result) {
@ -612,7 +657,7 @@ void HeartbeatThread::runSingleServer() {
continue; // try again next time
}
}
TRI_voc_tick_t lastTick = 0; // we always want to set lastTick
auto sendTransient = [&]() {
VPackBuilder builder;
@ -631,7 +676,7 @@ void HeartbeatThread::runSingleServer() {
applier->stopAndJoin();
}
lastTick = EngineSelectorFeature::ENGINE->currentTick();
// put the leader in optional read-only mode
auto readOnlySlice = response.get(std::vector<std::string>(
{AgencyCommManager::path(), "Readonly"}));
@ -654,7 +699,7 @@ void HeartbeatThread::runSingleServer() {
ServerState::instance()->setFoxxmaster(leaderStr); // leader is foxxmater
ServerState::instance()->setReadOnly(true); // Disable writes with dirty-read header
std::string endpoint = ci->getServerEndpoint(leaderStr);
if (endpoint.empty()) {
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Failed to resolve leader endpoint";
@ -748,7 +793,7 @@ void HeartbeatThread::updateServerMode(VPackSlice const& readOnlySlice) {
if (readOnlySlice.isBoolean()) {
readOnly = readOnlySlice.getBool();
}
ServerState::instance()->setReadOnly(readOnly);
}
@ -1086,7 +1131,7 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
}
std::string const name = options.value.get("name").copyString();
TRI_ASSERT(!name.empty());
VPackSlice const idSlice = options.value.get("id");
if (!idSlice.isString()) {
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Missing id in agency database plan";
@ -1168,7 +1213,7 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {
MUTEX_LOCKER(mutexLocker, *_statusLock);
bool shouldUpdate = false;
if (_desiredVersions->plan > _currentVersions.plan) {
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
<< "Plan version " << _currentVersions.plan
@ -1181,18 +1226,18 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {
<< " is lower than desired version " << _desiredVersions->current;
shouldUpdate = true;
}
// 7.4 seconds is just less than half of the 15 seconds the agency uses to declare a server dead;
// perform a safety execution of the job in case other plan changes were somehow incomplete or undetected
double now = TRI_microtime();
if (now > _lastSyncTime + 7.4 || asyncPush) {
shouldUpdate = true;
}
if (!shouldUpdate) {
return;
}
// First invalidate the caches in ClusterInfo:
auto ci = ClusterInfo::instance();
if (_desiredVersions->plan > ci->getPlanVersion()) {
@ -1201,22 +1246,15 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {
if (_desiredVersions->current > ci->getCurrentVersion()) {
ci->invalidateCurrent();
}
if (_backgroundJobScheduledOrRunning) {
_launchAnotherBackgroundJob = true;
return;
}
// schedule a job for the change:
uint64_t jobNr = ++_backgroundJobsPosted;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync " << jobNr;
_backgroundJobScheduledOrRunning = true;
// the JobGuard is in the operator() of HeartbeatBackgroundJob
_lastSyncTime = TRI_microtime();
SchedulerFeature::SCHEDULER->post(
HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false);
TRI_ASSERT(_maintenanceThread != nullptr);
_maintenanceThread->notify();
}
////////////////////////////////////////////////////////////////////////////////
@ -1289,7 +1327,7 @@ void HeartbeatThread::logThreadDeaths(bool force) {
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "HeartbeatThread ok.";
std::string buffer;
buffer.reserve(40);
for (auto const& it : deadThreads) {
buffer = date::format("%FT%TZ", date::floor<std::chrono::milliseconds>(it.first));

View File

@ -50,6 +50,7 @@ struct AgencyVersions {
};
class AgencyCallbackRegistry;
class HeartbeatBackgroundJobThread;
class HeartbeatThread : public CriticalThread,
public std::enable_shared_from_this<HeartbeatThread> {
@ -77,7 +78,7 @@ class HeartbeatThread : public CriticalThread,
void setReady() { _ready.store(true); }
void runBackgroundJob();
//void runBackgroundJob();
void dispatchedJobResult(DBServerAgencySyncResult);
@ -269,31 +270,14 @@ public:
std::atomic<uint64_t> _backgroundJobsPosted;
//////////////////////////////////////////////////////////////////////////////
/// @brief number of background jobs that have been launched by the scheduler
//////////////////////////////////////////////////////////////////////////////
std::atomic<uint64_t> _backgroundJobsLaunched;
//////////////////////////////////////////////////////////////////////////////
/// @brief flag indicates whether or not a background job is either
/// scheduled with boost::asio or is already running, this and the
/// next one about having to start another background job when the
/// current one is finished are protected by the statusLock.
//////////////////////////////////////////////////////////////////////////////
bool _backgroundJobScheduledOrRunning;
//////////////////////////////////////////////////////////////////////////////
/// @brief flag indicates whether or not a new background job needs
/// to be started when the current one has terminated. This and the
/// previous one are protected by the statusLock.
//////////////////////////////////////////////////////////////////////////////
bool _launchAnotherBackgroundJob;
// when was the javascript sync routine last run?
// when was the sync routine last run?
double _lastSyncTime;
//////////////////////////////////////////////////////////////////////////////
/// @brief handle of the dedicated thread to execute the phase 1 and phase 2
/// code. Only created on dbservers.
//////////////////////////////////////////////////////////////////////////////
std::unique_ptr<HeartbeatBackgroundJobThread> _maintenanceThread;
};
}

View File

@ -152,8 +152,6 @@ bool UpdateCollection::first() {
if (!_result.ok()) {
LOG_TOPIC(ERR, Logger::MAINTENANCE) << "failed to update properties"
" of collection " << shard << ": " << _result.errorMessage();
_feature.storeShardError(database, collection, shard,
_description.get(SERVER_ID), _result);
}
});
@ -163,14 +161,17 @@ bool UpdateCollection::first() {
<< "in database " + database;
LOG_TOPIC(ERR, Logger::MAINTENANCE) << error.str();
_result = actionError(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, error.str());
return false;
}
} catch (std::exception const& e) {
std::stringstream error;
error << "action " << _description << " failed with exception " << e.what();
LOG_TOPIC(WARN, Logger::MAINTENANCE) << "UpdateCollection: " << error.str();
_result.reset(TRI_ERROR_INTERNAL, error.str());
return false;
}
if (_result.fail()) {
_feature.storeShardError(database, collection, shard,
_description.get(SERVER_ID), _result);
}
notify();