
Merge branch 'devel' of github.com:arangodb/arangodb into devel

Wilfried Goesgens 2017-03-16 14:57:12 +01:00
commit 9d3740d098
20 changed files with 530 additions and 538 deletions

View File

@@ -5,10 +5,12 @@
@RESTDESCRIPTION
Returns an array of objects, which each have
the attribute `endpoint`, whose value is a string with the endpoint
description. There is an entry for each coordinator in the cluster.
This method only works on coordinators in cluster mode.
Returns an object with an attribute `endpoints`, which contains an
array of objects, each of which has the attribute `endpoint`, whose
value is a string with the endpoint description. There is an entry for
each coordinator in the cluster. This method only works on coordinators
in cluster mode. In case of an error the `error` attribute is set to
`true`.
@RESTRETURNCODES
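
For reference, a response of the new shape could be assembled with VelocyPack, which this commit already uses elsewhere. A minimal sketch, assuming a single coordinator and a made-up endpoint string:

#include <velocypack/Builder.h>
#include <velocypack/Value.h>
#include <iostream>

// Sketch only: builds {"error": false, "endpoints": [{"endpoint": "..."}]}
// as documented above; the endpoint value is a placeholder.
int main() {
  arangodb::velocypack::Builder b;
  b.openObject();
  b.add("error", arangodb::velocypack::Value(false));
  b.add("endpoints",
        arangodb::velocypack::Value(arangodb::velocypack::ValueType::Array));
  b.openObject();
  b.add("endpoint", arangodb::velocypack::Value("tcp://127.0.0.1:8529"));
  b.close();  // endpoint object
  b.close();  // endpoints array
  b.close();  // top-level object
  std::cout << b.toJson() << std::endl;
  return 0;
}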

View File

@@ -40,6 +40,7 @@ class RestAgencyHandler : public RestBaseHandler {
public:
char const* name() const override final { return "RestAgencyHandler"; }
bool isDirect() const override;
bool needsOwnThread() const { return true; }
RestStatus execute() override;
private:

View File

@@ -41,6 +41,7 @@ class RestAgencyPrivHandler : public arangodb::RestBaseHandler {
public:
char const* name() const override final { return "RestAgencyPrivHandler"; }
bool isDirect() const override;
bool needsOwnThread() const { return true; }
RestStatus execute() override;
private:

View File

@@ -28,6 +28,7 @@
#include "Basics/HybridLogicalClock.h"
#include "Basics/MutexLocker.h"
#include "Basics/StaticStrings.h"
#include "Cluster/ServerState.h"
#include "GeneralServer/AsyncJobManager.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/GeneralServerFeature.h"
@@ -179,7 +180,6 @@ void GeneralCommTask::processResponse(GeneralResponse* response) {
if (response == nullptr) {
LOG_TOPIC(WARN, Logger::COMMUNICATION)
<< "processResponse received a nullptr, closing connection";
closeStream();
} else {
addResponse(response, nullptr);
@@ -194,7 +194,7 @@ RequestStatistics* GeneralCommTask::acquireStatistics(uint64_t id) {
RequestStatistics* GeneralCommTask::statistics(uint64_t id) {
MUTEX_LOCKER(locker, _statisticsMutex);
auto iter = _statisticsMap.find(id);
if (iter == _statisticsMap.end()) {
@@ -229,38 +229,46 @@ void GeneralCommTask::transferStatisticsTo(uint64_t id, RestHandler* handler) {
// -----------------------------------------------------------------------------
bool GeneralCommTask::handleRequest(std::shared_ptr<RestHandler> handler) {
bool isDirect = false;
bool isPrio = false;
if (handler->isDirect()) {
isDirect = true;
} else if (_loop._scheduler->hasQueueCapacity()) {
isDirect = true;
} else if (ServerState::instance()->isDBServer()) {
isPrio = true;
} else if (handler->needsOwnThread()) {
isPrio = true;
} else if (handler->queue() == JobQueue::AQL_QUEUE) {
isPrio = true;
}
if (isDirect) {
handleRequestDirectly(std::move(handler));
return true;
}
if (_loop._scheduler->isIdle()) {
handleRequestDirectly(std::move(handler));
return true;
}
auto self = shared_from_this();
bool startThread = handler->needsOwnThread();
if (startThread) {
handleRequestDirectly(std::move(handler));
if (isPrio) {
SchedulerFeature::SCHEDULER->post(
[self, this, handler]() { handleRequestDirectly(std::move(handler)); });
return true;
}
// ok, we need to queue the request
LOG_TOPIC(TRACE, Logger::THREADS) << "too much work, queuing handler: "
<< _loop._scheduler->infoStatus();
size_t queue = handler->queue();
uint64_t messageId = handler->messageId();
auto self = shared_from_this();
std::unique_ptr<Job> job(
new Job(_server, std::move(handler),
[self, this](std::shared_ptr<RestHandler> h) {
handleRequestDirectly(h);
}));
bool ok =
SchedulerFeature::SCHEDULER->jobQueue()->queue(queue, std::move(job));
bool ok = SchedulerFeature::SCHEDULER->queue(std::move(job));
if (!ok) {
handleSimpleError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_QUEUE_FULL,
@@ -274,8 +282,8 @@ void GeneralCommTask::handleRequestDirectly(
std::shared_ptr<RestHandler> handler) {
auto self = shared_from_this();
handler->initEngine(_loop, [self, this](RestHandler* h) {
RequestStatistics* stat = h->stealStatistics();
addResponse(h->response(), stat);
RequestStatistics* stat = h->stealStatistics();
addResponse(h->response(), stat);
});
HandlerWorkStack monitor(handler);
@@ -309,14 +317,11 @@ bool GeneralCommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,
}
// queue this job
size_t queue = handler->queue();
auto self = shared_from_this();
auto job =
std::make_unique<Job>(_server, std::move(handler),
[self, this](std::shared_ptr<RestHandler> h) {
h->asyncRunEngine();
});
auto job = std::make_unique<Job>(
_server, std::move(handler),
[self, this](std::shared_ptr<RestHandler> h) { h->asyncRunEngine(); });
return SchedulerFeature::SCHEDULER->jobQueue()->queue(queue, std::move(job));
return SchedulerFeature::SCHEDULER->queue(std::move(job));
}

View File

@@ -91,6 +91,8 @@ class GeneralCommTask : public SocketTask {
virtual arangodb::Endpoint::TransportType transportType() = 0;
void setStatistics(uint64_t, RequestStatistics*);
protected:
virtual std::unique_ptr<GeneralResponse> createResponse(
rest::ResponseCode, uint64_t messageId) = 0;
@@ -109,7 +111,6 @@ class GeneralCommTask : public SocketTask {
std::string const& errorMessage,
uint64_t messageId) = 0;
void setStatistics(uint64_t, RequestStatistics*);
RequestStatistics* acquireStatistics(uint64_t);
RequestStatistics* statistics(uint64_t);
RequestStatistics* stealStatistics(uint64_t);

View File

@@ -2266,7 +2266,8 @@ int MMFilesCollection::beginReadTimed(bool useDeadlockDetector,
// std::cout << "BeginReadTimed: " << _name << std::endl;
int iterations = 0;
bool wasBlocked = false;
double end = 0.0;
uint64_t waitTime = 0; // zero indicates that the times are uninitialized
double startTime = 0.0;
while (true) {
TRY_READ_LOCKER(locker, _idxLock);
@@ -2323,20 +2324,18 @@ int MMFilesCollection::beginReadTimed(bool useDeadlockDetector,
}
}
if (end == 0.0) {
double now = TRI_microtime();
if (waitTime == 0) { // initialize times
// set end time for lock waiting
if (timeout <= 0.0) {
timeout = 15.0 * 60.0;
}
end = TRI_microtime() + timeout;
TRI_ASSERT(end > 0.0);
startTime = now;
waitTime = 1;
}
std::this_thread::yield();
TRI_ASSERT(end > 0.0);
if (TRI_microtime() > end) {
if (now > startTime + timeout) {
if (useDeadlockDetector) {
_logicalCollection->vocbase()->_deadlockDetector.unsetReaderBlocked(_logicalCollection);
}
@@ -2344,6 +2343,15 @@ int MMFilesCollection::beginReadTimed(bool useDeadlockDetector,
<< "'";
return TRI_ERROR_LOCK_TIMEOUT;
}
if (now - startTime < 0.001) {
std::this_thread::yield();
} else {
usleep(waitTime);
if (waitTime < 500000) {
waitTime *= 2;
}
}
}
}
@@ -2365,7 +2373,8 @@ int MMFilesCollection::beginWriteTimed(bool useDeadlockDetector,
// std::cout << "BeginWriteTimed: " << document->_info._name << std::endl;
int iterations = 0;
bool wasBlocked = false;
double end = 0.0;
uint64_t waitTime = 0; // zero indicates that the times are uninitialized
double startTime = 0.0;
while (true) {
TRY_WRITE_LOCKER(locker, _idxLock);
@@ -2421,22 +2430,18 @@ int MMFilesCollection::beginWriteTimed(bool useDeadlockDetector,
}
}
std::this_thread::yield();
double now = TRI_microtime();
if (end == 0.0) {
if (waitTime == 0) { // initialize times
// set end time for lock waiting
if (timeout <= 0.0) {
timeout = 15.0 * 60.0;
}
end = TRI_microtime() + timeout;
TRI_ASSERT(end > 0.0);
startTime = now;
waitTime = 1;
}
std::this_thread::yield();
TRI_ASSERT(end > 0.0);
if (TRI_microtime() > end) {
if (now > startTime + timeout) {
if (useDeadlockDetector) {
_logicalCollection->vocbase()->_deadlockDetector.unsetWriterBlocked(
_logicalCollection);
@@ -2445,6 +2450,16 @@ int MMFilesCollection::beginWriteTimed(bool useDeadlockDetector,
<< "'";
return TRI_ERROR_LOCK_TIMEOUT;
}
if (now - startTime < 0.001) {
std::this_thread::yield();
} else {
usleep(waitTime);
if (waitTime < 500000) {
waitTime *= 2;
}
}
}
}
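
Both lock loops above replace the absolute `end` deadline with a recorded start time plus an exponentially growing sleep. A standalone sketch of that backoff pattern, with illustrative names rather than the collection code itself:

#include <chrono>
#include <cstdint>
#include <thread>

// Busy-yield for the first millisecond, then sleep with a doubling
// interval capped at 500ms, giving up once the timeout is exceeded.
template <typename TryLock>
bool lockWithBackoff(TryLock tryLock, double timeoutSecs) {
  auto start = std::chrono::steady_clock::now();
  uint64_t waitTimeUs = 1;  // microseconds, doubled on every sleep
  while (!tryLock()) {
    double elapsed = std::chrono::duration<double>(
        std::chrono::steady_clock::now() - start).count();
    if (elapsed > timeoutSecs) {
      return false;  // comparable to returning TRI_ERROR_LOCK_TIMEOUT above
    }
    if (elapsed < 0.001) {
      std::this_thread::yield();  // spin briefly while the wait is young
    } else {
      std::this_thread::sleep_for(std::chrono::microseconds(waitTimeUs));
      if (waitTimeUs < 500000) {
        waitTimeUs *= 2;  // exponential backoff
      }
    }
  }
  return true;
}

Starting at 1 microsecond and doubling, the sleep reaches the 500ms cap after roughly 19 iterations, so short contention stays cheap while long waits stop burning CPU.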

View File

@@ -143,7 +143,7 @@ bool Conductor::_startGlobalStep() {
});
if (res != TRI_ERROR_NO_ERROR) {
_state = ExecutionState::IN_ERROR;
LOG_TOPIC(INFO, Logger::PREGEL)
LOG_TOPIC(ERR, Logger::PREGEL)
<< "Seems there is at least one worker out of order";
// the recovery mechanisms should take care of this
return false;
@@ -198,11 +198,11 @@ bool Conductor::_startGlobalStep() {
res = _sendToAllDBServers(Utils::startGSSPath, b); // call me maybe
if (res != TRI_ERROR_NO_ERROR) {
_state = ExecutionState::IN_ERROR;
LOG_TOPIC(INFO, Logger::PREGEL) << "Conductor could not start GSS "
LOG_TOPIC(ERR, Logger::PREGEL) << "Conductor could not start GSS "
<< _globalSuperstep;
// the recovery mechanisms should take care of this
} else {
LOG_TOPIC(INFO, Logger::PREGEL) << "Conductor started new gss "
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Conductor started new gss "
<< _globalSuperstep;
}
return res == TRI_ERROR_NO_ERROR;

View File

@@ -79,7 +79,7 @@ void PregelFeature::start() {
}
const size_t threadNum = PregelFeature::availableParallelism();
LOG_TOPIC(INFO, Logger::PREGEL) << "Pregel uses " << threadNum << " threads";
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Pregel uses " << threadNum << " threads";
_threadPool.reset(new ThreadPool(threadNum, "Pregel"));
if (ServerState::instance()->isCoordinator()) {

View File

@@ -115,7 +115,7 @@ GSSContext::~GSSContext() {}*/
template <typename V, typename E, typename M>
Worker<V, E, M>::~Worker() {
LOG_TOPIC(INFO, Logger::PREGEL) << "Called ~Worker()";
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Called ~Worker()";
_state = WorkerState::DONE;
usleep(50000); // 50ms wait for threads to die
delete _readCache;
@@ -238,7 +238,7 @@ VPackBuilder Worker<V, E, M>::prepareGlobalStep(VPackSlice const& data) {
_workerAggregators->serializeValues(response);
response.close();
LOG_TOPIC(INFO, Logger::PREGEL) << "Responded: " << response.toJson();
//LOG_TOPIC(INFO, Logger::PREGEL) << "Responded: " << response.toJson();
return response;
}
@@ -297,7 +297,7 @@ void Worker<V, E, M>::startGlobalStep(VPackSlice const& data) {
_workerContext->preGlobalSuperstep(gss);
}
LOG_TOPIC(INFO, Logger::PREGEL) << "Worker starts new gss: " << gss;
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Worker starts new gss: " << gss;
_startProcessing(); // sets _state = COMPUTING;
}
@@ -344,7 +344,7 @@ void Worker<V, E, M>::_startProcessing() {
i++;
} while (start != total);
// TRI_ASSERT(_runningThreads == i);
LOG_TOPIC(INFO, Logger::PREGEL) << "Using " << i << " Threads";
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Using " << i << " Threads";
}
template <typename V, typename E, typename M>
@@ -399,7 +399,7 @@ bool Worker<V, E, M>::_processVertices(
}
}
if (_state != WorkerState::COMPUTING) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Execution aborted prematurely.";
LOG_TOPIC(WARN, Logger::PREGEL) << "Execution aborted prematurely.";
break;
}
}
@@ -413,11 +413,11 @@ bool Worker<V, E, M>::_processVertices(
_nextGSSSendMessageCount += outCache->sendCountNextGSS();
}
double t = TRI_microtime();
//double t = TRI_microtime();
// merge thread local messages, _writeCache does locking
_writeCache->mergeCache(_config, inCache);
// TODO ask how to implement message sending without waiting for a response
t = TRI_microtime() - t;
//t = TRI_microtime() - t;
MessageStats stats;
stats.sendCount = outCache->sendCount();
@@ -428,10 +428,10 @@ bool Worker<V, E, M>::_processVertices(
bool lastThread = false;
{ // only one thread at a time
MUTEX_LOCKER(guard, _threadMutex);
if (t > 0.005) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Total " << stats.superstepRuntimeSecs
/*if (t > 0.005) {
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Total " << stats.superstepRuntimeSecs
<< " s merge took " << t << " s";
}
}*/
// merge the thread local stats and aggregators
_workerAggregators->aggregateValues(workerAggregator);
@@ -632,7 +632,7 @@ void Worker<V, E, M>::startRecovery(VPackSlice const& data) {
MUTEX_LOCKER(guard, _commandMutex);
VPackSlice method = data.get(Utils::recoveryMethodKey);
if (method.compareString(Utils::compensate) != 0) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Unsupported operation";
LOG_TOPIC(ERR, Logger::PREGEL) << "Unsupported operation";
return;
}
// else if (method.compareString(Utils::rollback) == 0)
@@ -668,7 +668,7 @@ void Worker<V, E, M>::compensateStep(VPackSlice const& data) {
ThreadPool* pool = PregelFeature::instance()->threadPool();
pool->enqueue([this] {
if (_state != WorkerState::RECOVERING) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Compensation aborted prematurely.";
LOG_TOPIC(WARN, Logger::PREGEL) << "Compensation aborted prematurely.";
return;
}
@@ -684,7 +684,7 @@ void Worker<V, E, M>::compensateStep(VPackSlice const& data) {
vCompensate->compensate(i > _preRecoveryTotal);
i++;
if (_state != WorkerState::RECOVERING) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Execution aborted prematurely.";
LOG_TOPIC(WARN, Logger::PREGEL) << "Execution aborted prematurely.";
break;
}
}
@@ -705,7 +705,7 @@ template <typename V, typename E, typename M>
void Worker<V, E, M>::finalizeRecovery(VPackSlice const& data) {
MUTEX_LOCKER(guard, _commandMutex);
if (_state != WorkerState::RECOVERING) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Compensation aborted prematurely.";
LOG_TOPIC(WARN, Logger::PREGEL) << "Compensation aborted prematurely.";
return;
}

View File

@@ -87,6 +87,7 @@ void WorkerConfig::updateConfig(VPackSlice params) {
shards.push_back(shard);
_localVertexShardIDs.push_back(shard);
_localPregelShardIDs.insert(_pregelShardIDs[shard]);
_localPShardIDs_hash.insert(_pregelShardIDs[shard]);
}
_vertexCollectionShards.emplace(pair.key.copyString(), shards);
}

View File

@@ -112,7 +112,7 @@ class WorkerConfig {
// index in globalShardIDs
inline bool isLocalVertexShard(PregelShard shardIndex) const {
// TODO cache this? prob small
return _localPregelShardIDs.find(shardIndex) != _localPregelShardIDs.end();
return _localPShardIDs_hash.find(shardIndex) != _localPShardIDs_hash.end();
}
// convert an arangodb document id to a pregel id
@@ -144,6 +144,7 @@ class WorkerConfig {
/// cache these ids as much as possible, since we access them often
std::unordered_map<std::string, PregelShard> _pregelShardIDs;
std::set<PregelShard> _localPregelShardIDs;
std::unordered_set<PregelShard> _localPShardIDs_hash;
};
}
}
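
Presumably the motivation for the `_localPShardIDs_hash` swap above: `std::set::find` walks a tree in O(log n), while `std::unordered_set` averages O(1) per lookup, and the header itself notes that these ids are accessed often. A hedged micro-sketch of the difference (`PregelShard` is assumed to be a small integer type here):

#include <cstdint>
#include <set>
#include <unordered_set>

using PregelShard = uint16_t;  // assumption for illustration

// isLocalVertexShard-style membership tests.
bool inOrdered(std::set<PregelShard> const& s, PregelShard x) {
  return s.find(x) != s.end();  // tree walk, O(log n)
}

bool inHashed(std::unordered_set<PregelShard> const& s, PregelShard x) {
  return s.find(x) != s.end();  // hash probe, O(1) average
}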

View File

@@ -44,8 +44,6 @@ class JobGuard : public SameThreadAsserter {
~JobGuard() { release(); }
public:
bool isIdle() { return _scheduler->isIdle(); }
void work() {
TRI_ASSERT(!_isWorkingFlag);

View File

@@ -30,11 +30,13 @@
using namespace arangodb;
namespace {
class JobQueueThread final : public Thread {
namespace arangodb {
class JobQueueThread final
: public Thread,
public std::enable_shared_from_this<JobQueueThread> {
public:
JobQueueThread(JobQueue* server, boost::asio::io_service* ioService)
: Thread("JobQueueThread"), _jobQueue(server), _ioService(ioService) {}
JobQueueThread(JobQueue* server, rest::Scheduler* scheduler)
: Thread("JobQueueThread"), _jobQueue(server), _scheduler(scheduler) {}
~JobQueueThread() { shutdown(); }
@@ -46,51 +48,40 @@ class JobQueueThread final : public Thread {
public:
void run() {
int idleTries = 0;
auto jobQueue = _jobQueue;
auto self = shared_from_this();
// iterate until we are shutting down
while (!isStopping()) {
++idleTries;
for (size_t i = 0; i < JobQueue::SYSTEM_QUEUE_SIZE; ++i) {
LOG_TOPIC(TRACE, Logger::THREADS) << "size of queue #" << i << ": "
<< _jobQueue->queueSize(i);
LOG_TOPIC(TRACE, Logger::THREADS) << "size of job queue: "
<< _jobQueue->queueSize();
while (_jobQueue->tryActive()) {
Job* job = nullptr;
while (_scheduler->shouldQueueMore()) {
Job* jobPtr = nullptr;
if (!_jobQueue->pop(i, job)) {
_jobQueue->releaseActive();
break;
if (!_jobQueue->pop(jobPtr)) {
break;
}
LOG_TOPIC(TRACE, Logger::THREADS) << "starting next queued job";
idleTries = 0;
std::shared_ptr<Job> job(jobPtr);
_scheduler->post([this, self, job]() {
try {
job->_callback(std::move(job->_handler));
} catch (std::exception& e) {
LOG_TOPIC(WARN, Logger::THREADS) << "Exception caught in a dangerous place! " << e.what();
}
LOG_TOPIC(TRACE, Logger::THREADS)
<< "starting next queued job, number currently active "
<< _jobQueue->active();
idleTries = 0;
_ioService->post([jobQueue, job]() {
JobGuard guard(SchedulerFeature::SCHEDULER);
guard.work();
std::unique_ptr<Job> releaseGuard(job);
try {
job->_callback(std::move(job->_handler));
} catch (...) {
}
jobQueue->releaseActive();
jobQueue->wakeup();
});
}
this->_jobQueue->wakeup();
});
}
// we need to check again if more work has arrived after we have
// acquired the lock. The lockfree queue and _nrWaiting are accessed
// using "memory_order_seq_cst"; this guarantees that we do not
// miss a signal.
if (idleTries >= 2) {
LOG_TOPIC(TRACE, Logger::THREADS) << "queue manager going to sleep";
_jobQueue->waitForWork();
@@ -98,17 +89,16 @@ class JobQueueThread final : public Thread {
}
// clear all non-processed jobs
for (size_t i = 0; i < JobQueue::SYSTEM_QUEUE_SIZE; ++i) {
Job* job = nullptr;
while (_jobQueue->pop(i, job)) {
delete job;
}
Job* job = nullptr;
while (_jobQueue->pop(job)) {
delete job;
}
}
private:
JobQueue* _jobQueue;
boost::asio::io_service* _ioService;
rest::Scheduler* _scheduler;
};
}
@@ -116,46 +106,18 @@ class JobQueueThread final : public Thread {
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
JobQueue::JobQueue(size_t queueSize, boost::asio::io_service* ioService)
: _queueAql(queueSize),
_queueRequeue(queueSize),
_queueStandard(queueSize),
_queueUser(queueSize),
_queues{&_queueRequeue, &_queueAql, &_queueStandard, &_queueUser},
_active(0),
_ioService(ioService),
_queueThread(new JobQueueThread(this, _ioService)) {
for (size_t i = 0; i < SYSTEM_QUEUE_SIZE; ++i) {
_queuesSize[i].store(0);
}
}
JobQueue::JobQueue(size_t queueSize, rest::Scheduler* scheduler)
: _queue(queueSize),
_queueSize(0),
_queueThread(new JobQueueThread(this, scheduler)) {}
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
void JobQueue::start() {
_queueThread->start();
}
void JobQueue::start() { _queueThread->start(); }
void JobQueue::beginShutdown() {
_queueThread->beginShutdown();
}
bool JobQueue::tryActive() {
static size_t const MAX_ACTIVE = 10;
if (_active > MAX_ACTIVE) {
return false;
}
++_active;
return true;
}
void JobQueue::releaseActive() {
--_active;
}
void JobQueue::beginShutdown() { _queueThread->beginShutdown(); }
void JobQueue::wakeup() {
CONDITION_LOCKER(guard, _queueCondition);

View File

@@ -27,42 +27,41 @@
#include <boost/lockfree/queue.hpp>
#include "Basics/asio-helper.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
#include "Basics/asio-helper.h"
#include "Scheduler/Job.h"
namespace arangodb {
namespace rest {
class Scheduler;
}
class JobQueueThread;
class JobQueue {
public:
// ordered by priority (highest priority first)
static size_t const REQUEUED_QUEUE = 0;
static size_t const AQL_QUEUE = 1;
static size_t const STANDARD_QUEUE = 2;
static size_t const USER_QUEUE = 3;
static size_t const SYSTEM_QUEUE_SIZE = 4;
public:
JobQueue(size_t queueSize, boost::asio::io_service* ioService);
JobQueue(size_t queueSize, rest::Scheduler*);
public:
void start();
void beginShutdown();
int64_t queueSize(size_t i) const { return _queuesSize[i]; }
bool queue(size_t i, std::unique_ptr<Job> job) {
if (i >= SYSTEM_QUEUE_SIZE) {
return false;
}
int64_t queueSize() const { return _queueSize; }
bool queue(std::unique_ptr<Job> job) {
try {
if (!_queues[i]->push(job.get())) {
if (!_queue.push(job.get())) {
throw "failed to add to queue";
}
job.release();
++_queuesSize[i];
++_queueSize;
} catch (...) {
wakeup();
return false;
@@ -72,15 +71,11 @@ class JobQueue {
return true;
}
bool pop(size_t i, Job*& job) {
if (i >= SYSTEM_QUEUE_SIZE) {
return false;
}
bool ok = _queues[i]->pop(job) && job != nullptr;
bool pop(Job*& job) {
bool ok = _queue.pop(job) && job != nullptr;
if (ok) {
--_queuesSize[i];
--_queueSize;
}
return ok;
@@ -89,23 +84,13 @@ class JobQueue {
void wakeup();
void waitForWork();
size_t active() const { return _active.load(); }
bool tryActive();
void releaseActive();
private:
boost::lockfree::queue<Job*> _queueAql;
boost::lockfree::queue<Job*> _queueRequeue;
boost::lockfree::queue<Job*> _queueStandard;
boost::lockfree::queue<Job*> _queueUser;
boost::lockfree::queue<Job*>* _queues[SYSTEM_QUEUE_SIZE];
std::atomic<int64_t> _queuesSize[SYSTEM_QUEUE_SIZE];
std::atomic<size_t> _active;
boost::lockfree::queue<Job*> _queue;
std::atomic<int64_t> _queueSize;
basics::ConditionVariable _queueCondition;
boost::asio::io_service* _ioService;
std::unique_ptr<Thread> _queueThread;
std::shared_ptr<JobQueueThread> _queueThread;
};
}
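
The collapsed queue keeps the earlier ownership handshake: `queue()` pushes the raw pointer and releases the `unique_ptr` only once the push succeeds, and `pop()` hands the raw pointer back to the caller, who deletes it (as the drain loop above does). A minimal sketch of that pattern, using a stand-in class rather than `JobQueue` itself:

#include <boost/lockfree/queue.hpp>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>

struct Job { /* payload elided */ };

// Ownership handshake: a failed push cannot leak the job, and a
// successful one cannot double-free it.
class MiniQueue {
 public:
  explicit MiniQueue(size_t capacity) : _queue(capacity), _size(0) {}

  bool queue(std::unique_ptr<Job> job) {
    if (!_queue.push(job.get())) {
      return false;  // caller still owns the job
    }
    job.release();   // ownership transferred to the queue
    ++_size;
    return true;
  }

  bool pop(Job*& job) {
    bool ok = _queue.pop(job) && job != nullptr;
    if (ok) {
      --_size;       // caller now owns the raw pointer
    }
    return ok;
  }

 private:
  boost::lockfree::queue<Job*> _queue;
  std::atomic<int64_t> _size;
};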

View File

@@ -92,8 +92,8 @@ class SchedulerThread : public Thread {
public:
void run() {
_scheduler->incRunning();
LOG_TOPIC(DEBUG, Logger::THREADS) << "running (" << _scheduler->infoStatus()
<< ")";
LOG_TOPIC(DEBUG, Logger::THREADS) << "started thread ("
<< _scheduler->infoStatus() << ")";
auto start = std::chrono::steady_clock::now();
@@ -102,6 +102,8 @@ class SchedulerThread : public Thread {
static double MIN_SECONDS = 30;
size_t counter = 0;
bool doDecrement = true;
while (!_scheduler->isStopping()) {
_service->run_one();
@@ -114,12 +116,13 @@
if (diff.count() > MIN_SECONDS) {
start = std::chrono::steady_clock::now();
if (_scheduler->stopThread()) {
if (_scheduler->shouldStopThread()) {
auto n = _scheduler->decRunning();
if (n <= 2) {
if (n <= _scheduler->minimum()) {
_scheduler->incRunning();
} else {
doDecrement = false;
break;
}
}
@@ -127,6 +130,10 @@
}
}
if (doDecrement) {
_scheduler->decRunning();
}
LOG_TOPIC(DEBUG, Logger::THREADS) << "stopped ("
<< _scheduler->infoStatus() << ")";
} catch (...) {
@@ -153,28 +160,24 @@
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
Scheduler::Scheduler(size_t nrThreads, size_t maxQueueSize)
: _nrThreads(nrThreads),
Scheduler::Scheduler(uint64_t nrMinimum, uint64_t nrDesired, uint64_t nrMaximum,
uint64_t maxQueueSize)
: _stopping(false),
_maxQueueSize(maxQueueSize),
_stopping(false),
_nrMinimum(nrMinimum),
_nrDesired(nrDesired),
_nrMaximum(nrMaximum),
_nrWorking(0),
_nrQueued(0),
_nrBlocked(0),
_nrRunning(0),
_nrMinimal(0),
_nrMaximal(0),
_nrRealMaximum(0),
_lastThreadWarning(0) {
_nrRunning(0) {
// setup signal handlers
initializeSignalHandlers();
}
Scheduler::~Scheduler() {
if (_threadManager != nullptr) {
try {
_threadManager->cancel();
} catch (...) {
// must not throw in the dtor
}
_threadManager->cancel();
}
try {
@@ -191,10 +194,14 @@ Scheduler::~Scheduler() {
// -----------------------------------------------------------------------------
void Scheduler::post(std::function<void()> callback) {
++_nrQueued;
_ioService.get()->post([this, callback]() {
JobGuard guard(this);
guard.work();
--_nrQueued;
callback();
});
}
@@ -203,20 +210,11 @@ bool Scheduler::start(ConditionVariable* cv) {
// start the I/O
startIoService();
// initialize thread handling
if (_nrMaximal <= 0) {
_nrMaximal = _nrThreads;
}
TRI_ASSERT(0 < _nrMinimum);
TRI_ASSERT(_nrMinimum <= _nrDesired);
TRI_ASSERT(_nrDesired <= _nrMaximum);
if (_nrRealMaximum <= 0) {
_nrRealMaximum = 4 * _nrMaximal;
}
if (_nrRealMaximum <= 64) {
_nrRealMaximum = 64;
}
for (size_t i = 0; i < 2; ++i) {
for (size_t i = 0; i < (size_t)_nrMinimum; ++i) {
startNewThread();
}
@@ -224,7 +222,7 @@ bool Scheduler::start(ConditionVariable* cv) {
startRebalancer();
// initialize the queue handling
_jobQueue.reset(new JobQueue(_maxQueueSize, _ioService.get()));
_jobQueue.reset(new JobQueue(_maxQueueSize, this));
_jobQueue->start();
// done
@@ -241,7 +239,7 @@ void Scheduler::startIoService() {
}
void Scheduler::startRebalancer() {
std::chrono::milliseconds interval(500);
std::chrono::milliseconds interval(100);
_threadManager.reset(new boost::asio::steady_timer(*_managerService));
_threadHandler = [this, interval](const boost::system::error_code& error) {
@@ -257,8 +255,6 @@
_threadManager->expires_from_now(interval);
_threadManager->async_wait(_threadHandler);
_lastThreadWarning.store(TRI_microtime());
}
void Scheduler::startManagerThread() {
@@ -279,22 +275,54 @@ void Scheduler::startNewThread() {
thread->start();
}
bool Scheduler::stopThread() {
if (_nrRunning <= _nrMinimal) {
bool Scheduler::shouldStopThread() {
if (_nrRunning <= _nrWorking + _nrQueued + _nrMinimum) {
return false;
}
if (_nrRunning >= 3) {
int64_t low = ((_nrRunning <= 4) ? 0 : (_nrRunning * 1 / 4)) - _nrBlocked;
if (_nrWorking <= low) {
return true;
}
if (_nrMinimum + _nrBlocked < _nrRunning) {
return true;
}
return false;
}
bool Scheduler::shouldQueueMore() {
if (_nrWorking + _nrQueued + _nrMinimum < _nrMaximum) {
return true;
}
return false;
}
bool Scheduler::hasQueueCapacity() {
if (_nrWorking + _nrQueued + _nrMinimum >= _nrMaximum) {
return false;
}
auto jobQueue = _jobQueue.get();
auto queueSize = (jobQueue == nullptr) ? 0 : jobQueue->queueSize();
return queueSize == 0;
}
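
As a worked example of this accounting, assuming the defaults established in SchedulerFeature below (minimum 2 threads, maximum at least 64): with 10 jobs working and 5 queued, 10 + 5 + 2 = 17 < 64, so `shouldQueueMore()` lets the dispatcher keep draining jobs, while `hasQueueCapacity()` additionally demands an empty job queue before a new request may skip queuing altogether.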
bool Scheduler::queue(std::unique_ptr<Job> job) {
return _jobQueue->queue(std::move(job));
}
std::string Scheduler::infoStatus() {
auto jobQueue = _jobQueue.get();
auto queueSize = (jobQueue == nullptr) ? 0 : jobQueue->queueSize();
return "working: " + std::to_string(_nrWorking) + ", queued: " +
std::to_string(_nrQueued) + ", blocked: " +
std::to_string(_nrBlocked) + ", running: " +
std::to_string(_nrRunning) + ", outstanding: " +
std::to_string(queueSize) + ", min/des/max: " +
std::to_string(_nrMinimum) + "/" + std::to_string(_nrDesired) + "/" +
std::to_string(_nrMaximum);
}
void Scheduler::threadDone(Thread* thread) {
MUTEX_LOCKER(guard, _threadsLock);
@@ -326,48 +354,19 @@ void Scheduler::deleteOldThreads() {
}
void Scheduler::rebalanceThreads() {
static double const MIN_WARN_INTERVAL = 10;
static double const MIN_ERR_INTERVAL = 300;
static uint64_t count = 0;
int64_t high = (_nrRunning <= 4) ? 1 : (_nrRunning * 11 / 16);
++count;
LOG_TOPIC(DEBUG, Logger::THREADS) << "rebalancing threads, high: " << high
<< ", " << infoStatus();
if (_nrWorking >= high) {
if (_nrRunning < _nrMaximal + _nrBlocked &&
_nrRunning < _nrRealMaximum) { // added by Max 22.12.2016
// otherwise we exceed the total maximum
startNewThread();
return;
}
if ((count % 5) == 0) {
LOG_TOPIC(DEBUG, Logger::THREADS) << "rebalancing threads: " << infoStatus();
} else {
LOG_TOPIC(TRACE, Logger::THREADS) << "rebalancing threads: " << infoStatus();
}
if (_nrWorking >= _nrMaximal + _nrBlocked || _nrRunning < _nrMinimal) {
double ltw = _lastThreadWarning.load();
double now = TRI_microtime();
if (_nrRunning >= _nrRealMaximum) {
if (ltw - now > MIN_ERR_INTERVAL) {
LOG_TOPIC(ERR, Logger::THREADS) << "too many threads (" << infoStatus()
<< ")";
_lastThreadWarning.store(now);
}
} else {
if (_nrRunning >= _nrRealMaximum * 3 / 4) {
if (ltw - now > MIN_WARN_INTERVAL) {
LOG_TOPIC(WARN, Logger::THREADS)
<< "number of threads is reaching a critical limit ("
<< infoStatus() << ")";
_lastThreadWarning.store(now);
}
}
LOG_TOPIC(DEBUG, Logger::THREADS) << "overloading threads ("
<< infoStatus() << ")";
startNewThread();
}
while (_nrRunning < _nrWorking + _nrQueued + _nrMinimum) {
startNewThread();
usleep(5000);
}
}

View File

@@ -35,6 +35,7 @@
#include "Basics/socket-utils.h"
#include "Logger/Logger.h"
#include "Scheduler/EventLoop.h"
#include "Scheduler/Job.h"
namespace arangodb {
class JobQueue;
@@ -56,7 +57,8 @@ class Scheduler {
friend class arangodb::JobGuard;
public:
Scheduler(size_t nrThreads, size_t maxQueueSize);
Scheduler(uint64_t nrMinimum, uint64_t nrDesired, uint64_t nrMaximum,
uint64_t maxQueueSize);
virtual ~Scheduler();
public:
@@ -84,33 +86,20 @@ class Scheduler {
static void initializeSignalHandlers();
public:
JobQueue* jobQueue() const { return _jobQueue.get(); }
bool shouldStopThread();
bool shouldQueueMore();
bool hasQueueCapacity();
bool isIdle() {
if (_nrWorking < _nrRunning && _nrWorking < _nrMaximal) {
return true;
}
bool queue(std::unique_ptr<Job> job);
return false;
}
void setMinimal(int64_t minimal) { _nrMinimal = minimal; }
void setMaximal(int64_t maximal) { _nrMaximal = maximal; }
void setRealMaximum(int64_t maximum) { _nrRealMaximum = maximum; }
uint64_t minimum() const { return _nrMinimum; }
uint64_t incRunning() { return ++_nrRunning; }
uint64_t decRunning() { return --_nrRunning; }
std::string infoStatus() {
return "working: " + std::to_string(_nrWorking) + ", blocked: " +
std::to_string(_nrBlocked) + ", running: " +
std::to_string(_nrRunning) + ", maximal: " +
std::to_string(_nrMaximal) + ", real maximum: " +
std::to_string(_nrRealMaximum);
}
std::string infoStatus();
void startNewThread();
bool stopThread();
void threadDone(Thread*);
void deleteOldThreads();
@@ -127,19 +116,32 @@ class Scheduler {
void rebalanceThreads();
private:
size_t _nrThreads;
size_t _maxQueueSize;
std::atomic<bool> _stopping;
std::atomic<int64_t> _nrWorking;
std::atomic<int64_t> _nrBlocked;
std::atomic<int64_t> _nrRunning;
std::atomic<int64_t> _nrMinimal;
std::atomic<int64_t> _nrMaximal;
std::atomic<int64_t> _nrRealMaximum;
// maximal number of outstanding user requests
int64_t const _maxQueueSize;
std::atomic<double> _lastThreadWarning;
// minimum number of running SchedulerThreads
int64_t const _nrMinimum;
// desired number of running SchedulerThreads
int64_t const _nrDesired;
// maximum number of running SchedulerThreads
int64_t const _nrMaximum;
// number of jobs currently being worked on
// use signed values just in case we have an underflow
std::atomic<int64_t> _nrWorking;
// number of jobs that are currently queued, but not yet worked on
std::atomic<int64_t> _nrQueued;
// number of jobs that entered a potentially blocking situation
std::atomic<int64_t> _nrBlocked;
// number of SchedulerThread that are running
std::atomic<int64_t> _nrRunning;
std::unique_ptr<JobQueue> _jobQueue;

View File

@@ -67,11 +67,13 @@ void SchedulerFeature::collectOptions(
options->addOption("--server.threads", "number of threads",
new UInt64Parameter(&_nrServerThreads));
options->addHiddenOption("--server.minimal-threads", "minimal number of threads",
new Int64Parameter(&_nrMinimalThreads));
options->addHiddenOption("--server.minimal-threads",
"minimal number of threads",
new Int64Parameter(&_nrMinimalThreads));
options->addHiddenOption("--server.maximal-threads", "maximal number of threads",
new Int64Parameter(&_nrMaximalThreads));
options->addHiddenOption("--server.maximal-threads",
"maximal number of threads",
new Int64Parameter(&_nrMaximalThreads));
options->addOption("--server.maximal-queue-size",
"maximum queue length for asynchronous operations",
@@ -93,7 +95,23 @@ void SchedulerFeature::validateOptions(
FATAL_ERROR_EXIT();
}
if (_nrMinimalThreads != 0 && _nrMaximalThreads != 0 && _nrMinimalThreads > _nrMaximalThreads) {
if (_nrMinimalThreads < 2) {
_nrMinimalThreads = 2;
}
if (_nrServerThreads <= 0) {
_nrServerThreads = 1;
}
if (_nrMaximalThreads <= 0) {
_nrMaximalThreads = 4 * _nrServerThreads;
}
if (_nrMaximalThreads < 64) {
_nrMaximalThreads = 64;
}
if (_nrMinimalThreads > _nrMaximalThreads) {
_nrMaximalThreads = _nrMinimalThreads;
}
}
@@ -153,7 +171,6 @@ void SchedulerFeature::stop() {
LOG_TOPIC(TRACE, Logger::STARTUP) << "waiting for scheduler to stop";
usleep(100000);
}
}
void SchedulerFeature::unprepare() {
@@ -234,13 +251,11 @@ bool CtrlHandler(DWORD eventType) {
void SchedulerFeature::buildScheduler() {
_scheduler =
std::make_unique<Scheduler>(static_cast<size_t>(_nrServerThreads),
static_cast<size_t>(_queueSize));
std::make_unique<Scheduler>(static_cast<uint64_t>(_nrMinimalThreads),
static_cast<uint64_t>(_nrServerThreads),
static_cast<uint64_t>(_nrMaximalThreads),
static_cast<uint64_t>(_queueSize));
_scheduler->setMinimal(_nrMinimalThreads);
_scheduler->setRealMaximum(_nrMaximalThreads);
TRI_ASSERT(SCHEDULER == nullptr);
SCHEDULER = _scheduler.get();
}
@@ -256,7 +271,8 @@ void SchedulerFeature::buildControlCHandler() {
#else
auto ioService = _scheduler->managerService();
_exitSignals = std::make_shared<boost::asio::signal_set>(*ioService, SIGINT, SIGTERM, SIGQUIT);
_exitSignals = std::make_shared<boost::asio::signal_set>(*ioService, SIGINT,
SIGTERM, SIGQUIT);
_signalHandler = [this](const boost::system::error_code& error, int number) {
if (error) {
@@ -285,7 +301,8 @@ void SchedulerFeature::buildHangupHandler() {
#ifndef WIN32
auto ioService = _scheduler->managerService();
_hangupSignals = std::make_shared<boost::asio::signal_set>(*ioService, SIGHUP);
_hangupSignals =
std::make_shared<boost::asio::signal_set>(*ioService, SIGHUP);
_hangupHandler = [this](const boost::system::error_code& error, int number) {
if (error) {

View File

@@ -55,7 +55,7 @@ class SchedulerFeature final : public application_features::ApplicationFeature {
uint64_t _nrServerThreads = 0;
int64_t _nrMinimalThreads = 0;
int64_t _nrMaximalThreads = 0;
uint64_t _queueSize = 128;
uint64_t _queueSize = 512;
public:
size_t concurrency() const {

File diff suppressed because one or more lines are too long

View File

@@ -2077,7 +2077,9 @@ function endpoints() {
try {
let coords = global.ArangoClusterInfo.getCoordinators();
let endpoints = coords.map(c => global.ArangoClusterInfo.getServerEndpoint(c));
return endpoints.map(function(e) { return {"endpoint": e}; });
return { "endpoints": endpoints.map(function(e) {
return {"endpoint": e};
}) };
} catch (err) {
return { error: true, exception: err };
}