
[devel] Direct Exec Scheduler (#9004)

This commit is contained in:
Lars Maier 2019-05-20 11:38:57 +02:00 committed by Jan
parent 56696e3ea6
commit 4fc2790863
15 changed files with 236 additions and 115 deletions

View File

@ -1108,7 +1108,7 @@ void ClusterComm::disable() {
} }
void ClusterComm::scheduleMe(std::function<void()> task) { void ClusterComm::scheduleMe(std::function<void()> task) {
arangodb::SchedulerFeature::SCHEDULER->queue(RequestLane::CLUSTER_INTERNAL, task); arangodb::SchedulerFeature::SCHEDULER->queue(RequestLane::CLUSTER_INTERNAL, std::move(task));
} }
ClusterCommThread::ClusterCommThread() : Thread("ClusterComm"), _cc(nullptr) { ClusterCommThread::ClusterCommThread() : Thread("ClusterComm"), _cc(nullptr) {
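
The only change in this hunk is that the task is moved into the scheduler queue instead of being copied. A minimal, self-contained sketch (not ArangoDB code) of why taking a std::function by value and forwarding it with std::move matters:

#include <functional>
#include <utility>
#include <vector>

struct TinyQueue {
  std::vector<std::function<void()>> jobs;
  void queue(std::function<void()> task) {
    // move, don't copy: whatever state the functor captured is transferred once
    jobs.push_back(std::move(task));
  }
};

void scheduleMe(TinyQueue& q, std::function<void()> task) {
  // without std::move, 'task' (and all of its captures) would be copied again here
  q.queue(std::move(task));
}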

View File

@ -71,13 +71,11 @@ RestStatus RestAgencyCallbacksHandler::execute() {
LOG_TOPIC("76a8a", DEBUG, Logger::CLUSTER) LOG_TOPIC("76a8a", DEBUG, Logger::CLUSTER)
<< "Agency callback has been triggered. refetching!"; << "Agency callback has been triggered. refetching!";
// SchedulerFeature::SCHEDULER->queue(RequestPriority::MED, [cb] {
try { try {
cb->refetchAndUpdate(true, false); cb->refetchAndUpdate(true, false);
} catch (arangodb::basics::Exception const& e) { } catch (arangodb::basics::Exception const& e) {
LOG_TOPIC("c3910", WARN, Logger::AGENCYCOMM) << "Error executing callback: " << e.message(); LOG_TOPIC("c3910", WARN, Logger::AGENCYCOMM) << "Error executing callback: " << e.message();
} }
//});
resetResponse(arangodb::rest::ResponseCode::ACCEPTED); resetResponse(arangodb::rest::ResponseCode::ACCEPTED);
} }

View File

@ -73,10 +73,10 @@ inline bool startsWith(std::string const& path, char const* other) {
// --SECTION-- constructors and destructors // --SECTION-- constructors and destructors
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
GeneralCommTask::GeneralCommTask(GeneralServer& server, GeneralCommTask::GeneralCommTask(GeneralServer& server,
GeneralServer::IoContext& context, GeneralServer::IoContext& context,
char const* name, char const* name,
std::unique_ptr<Socket> socket, std::unique_ptr<Socket> socket,
ConnectionInfo&& info, ConnectionInfo&& info,
double keepAliveTimeout, bool skipSocketInit) double keepAliveTimeout, bool skipSocketInit)
: SocketTask(server, context, name, std::move(socket), std::move(info), : SocketTask(server, context, name, std::move(socket), std::move(info),
@ -303,7 +303,7 @@ void GeneralCommTask::executeRequest(std::unique_ptr<GeneralRequest>&& request,
<< "could not find corresponding request/response"; << "could not find corresponding request/response";
} }
rest::ContentType respType = request->contentTypeResponse(); rest::ContentType const respType = request->contentTypeResponse();
// create a handler, this takes ownership of request and response // create a handler, this takes ownership of request and response
std::shared_ptr<RestHandler> handler( std::shared_ptr<RestHandler> handler(
GeneralServerFeature::HANDLER_FACTORY->createHandler(std::move(request), GeneralServerFeature::HANDLER_FACTORY->createHandler(std::move(request),
@ -341,6 +341,11 @@ void GeneralCommTask::executeRequest(std::unique_ptr<GeneralRequest>&& request,
ok = handleRequestAsync(std::move(handler)); ok = handleRequestAsync(std::move(handler));
} }
TRI_IF_FAILURE("queueFull") {
ok = false;
jobId = 0;
}
if (ok) { if (ok) {
std::unique_ptr<GeneralResponse> response = std::unique_ptr<GeneralResponse> response =
createResponse(rest::ResponseCode::ACCEPTED, messageId); createResponse(rest::ResponseCode::ACCEPTED, messageId);
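
The new TRI_IF_FAILURE("queueFull") block is a test hook: with that failure point armed (see the JavaScript test added at the end of this commit), the request is treated as if the scheduler queue were full. A simplified stand-in for how such a failure-point macro can behave - the real macro is only active in maintainer mode, and this sketch is an illustration, not ArangoDB's implementation:

#include <cstdint>
#include <set>
#include <string>

static std::set<std::string> activeFailurePoints;  // toggled via a debug REST API in tests

#define TRI_IF_FAILURE(name) if (activeFailurePoints.count(name) != 0)

bool simulateEnqueue(bool queuedOk, uint64_t& jobId) {
  bool ok = queuedOk;
  TRI_IF_FAILURE("queueFull") {  // test hook: pretend the queue rejected the job
    ok = false;
    jobId = 0;
  }
  return ok;  // caller answers 503 / TRI_ERROR_QUEUE_FULL when this is false
}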
@ -353,7 +358,7 @@ void GeneralCommTask::executeRequest(std::unique_ptr<GeneralRequest>&& request,
addResponse(*response, nullptr); addResponse(*response, nullptr);
} else { } else {
addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE, addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE,
request->contentTypeResponse(), messageId, TRI_ERROR_QUEUE_FULL); respType, messageId, TRI_ERROR_QUEUE_FULL);
} }
} else { } else {
// synchronous request // synchronous request
@ -447,20 +452,19 @@ void GeneralCommTask::addErrorResponse(rest::ResponseCode code, rest::ContentTyp
// thread. Depending on the number of running threads requests may be queued // thread. Depending on the number of running threads requests may be queued
// and scheduled later when the number of used threads decreases // and scheduled later when the number of used threads decreases
bool GeneralCommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) { bool GeneralCommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) {
auto const lane = handler->getRequestLane();
auto self = shared_from_this();
if (application_features::ApplicationServer::isStopping()) { if (application_features::ApplicationServer::isStopping()) {
return false; return false;
} }
bool ok = SchedulerFeature::SCHEDULER->queue(lane, [self, this, handler]() { auto const lane = handler->getRequestLane();
handleRequestDirectly(basics::ConditionalLocking::DoLock, std::move(handler));
bool ok = SchedulerFeature::SCHEDULER->queue(lane, [self = shared_from_this(), this, handler]() {
handleRequestDirectly(basics::ConditionalLocking::DoLock, handler);
}); });
if (!ok) { if (!ok) {
uint64_t messageId = handler->messageId();
addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE, addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE,
handler->request()->contentTypeResponse(), messageId, handler->request()->contentTypeResponse(), handler->messageId(),
TRI_ERROR_QUEUE_FULL); TRI_ERROR_QUEUE_FULL);
} }
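
The capture list [self = shared_from_this(), this, handler] replaces the separate "auto self = shared_from_this();" line: the shared_ptr is created directly in a C++14 init-capture, so the object is guaranteed to outlive the queued lambda. A generic sketch of the pattern with illustrative names, not the ArangoDB classes:

#include <functional>
#include <memory>
#include <vector>

using Queue = std::vector<std::function<void()>>;

struct Task : std::enable_shared_from_this<Task> {
  void queueWork(Queue& queue) {
    // 'self' keeps *this alive until the lambda has run; 'this' is then safe to use
    queue.push_back([self = shared_from_this(), this]() { doWork(); });
  }
  void doWork() {}
};

// usage: auto t = std::make_shared<Task>(); t->queueWork(q); t.reset();  // lambda stays valid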
@ -471,11 +475,11 @@ bool GeneralCommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) {
void GeneralCommTask::handleRequestDirectly(bool doLock, std::shared_ptr<RestHandler> handler) { void GeneralCommTask::handleRequestDirectly(bool doLock, std::shared_ptr<RestHandler> handler) {
TRI_ASSERT(doLock || _peer->runningInThisThread()); TRI_ASSERT(doLock || _peer->runningInThisThread());
auto self = shared_from_this();
if (application_features::ApplicationServer::isStopping()) { if (application_features::ApplicationServer::isStopping()) {
return; return;
} }
handler->runHandler([self = std::move(self), this](rest::RestHandler* handler) {
handler->runHandler([self = shared_from_this(), this](rest::RestHandler* handler) {
RequestStatistics* stat = handler->stealStatistics(); RequestStatistics* stat = handler->stealStatistics();
auto h = handler->shared_from_this(); auto h = handler->shared_from_this();
// Pass the response the io context // Pass the response the io context
@ -486,24 +490,25 @@ void GeneralCommTask::handleRequestDirectly(bool doLock, std::shared_ptr<RestHan
// handle a request which came in with the x-arango-async header // handle a request which came in with the x-arango-async header
bool GeneralCommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler, bool GeneralCommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,
uint64_t* jobId) { uint64_t* jobId) {
auto self = shared_from_this();
if (application_features::ApplicationServer::isStopping()) { if (application_features::ApplicationServer::isStopping()) {
return false; return false;
} }
auto const lane = handler->getRequestLane();
if (jobId != nullptr) { if (jobId != nullptr) {
GeneralServerFeature::JOB_MANAGER->initAsyncJob(handler); GeneralServerFeature::JOB_MANAGER->initAsyncJob(handler);
*jobId = handler->handlerId(); *jobId = handler->handlerId();
// callback will persist the response with the AsyncJobManager // callback will persist the response with the AsyncJobManager
return SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self = std::move(self), handler] { return SchedulerFeature::SCHEDULER->queue(lane, [self = shared_from_this(), handler = std::move(handler)] {
handler->runHandler([](RestHandler* h) { handler->runHandler([](RestHandler* h) {
GeneralServerFeature::JOB_MANAGER->finishAsyncJob(h); GeneralServerFeature::JOB_MANAGER->finishAsyncJob(h);
}); });
}); });
} else { } else {
// here the response will just be ignored // here the response will just be ignored
return SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self = std::move(self), handler] { return SchedulerFeature::SCHEDULER->queue(lane, [self = shared_from_this(), handler = std::move(handler)] {
handler->runHandler([](RestHandler*) {}); handler->runHandler([](RestHandler*) {});
}); });
} }

View File

@ -111,6 +111,10 @@ void GeneralServer::stopListening() {
void GeneralServer::stopWorking() { void GeneralServer::stopWorking() {
_listenTasks.clear(); _listenTasks.clear();
for (auto& context : _contexts) {
context.stop();
}
while (true) { while (true) {
{ {
MUTEX_LOCKER(lock, _tasksLock); MUTEX_LOCKER(lock, _tasksLock);
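
The added loop stops every I/O context before the server starts waiting for the remaining tasks. A sketch of the presumed effect, written against standalone Asio directly (GeneralServer::IoContext wraps more state than this):

#include <asio.hpp>
#include <memory>
#include <vector>

void stopContexts(std::vector<std::unique_ptr<asio::io_context>>& contexts) {
  for (auto& ctx : contexts) {
    ctx->stop();  // makes run() in the owning thread return; no further handlers execute
  }
}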

View File

@ -1,4 +1,4 @@
//////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////
/// DISCLAIMER /// DISCLAIMER
/// ///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2016 ArangoDB GmbH, Cologne, Germany

View File

@ -87,18 +87,12 @@ SocketTask::~SocketTask() {
_connectionStatistics = nullptr; _connectionStatistics = nullptr;
} }
asio_ns::error_code err; cancelKeepAlive();
if (_keepAliveTimerActive.load(std::memory_order_relaxed)) {
_keepAliveTimer->cancel(err);
}
if (err) {
LOG_TOPIC("985c1", ERR, Logger::COMMUNICATION) << "unable to cancel _keepAliveTimer";
}
// _peer could be nullptr if it was moved out of a HttpCommTask, during // _peer could be nullptr if it was moved out of a HttpCommTask, during
// upgrade to a VstCommTask. // upgrade to a VstCommTask.
if (_peer) { if (_peer) {
asio_ns::error_code err;
_peer->close(err); _peer->close(err);
} }
@ -131,9 +125,7 @@ bool SocketTask::start() {
<< _connectionInfo.serverAddress << ":" << _connectionInfo.serverPort << " <-> " << _connectionInfo.serverAddress << ":" << _connectionInfo.serverPort << " <-> "
<< _connectionInfo.clientAddress << ":" << _connectionInfo.clientPort; << _connectionInfo.clientAddress << ":" << _connectionInfo.clientPort;
auto self = shared_from_this(); _peer->post([self = shared_from_this(), this]() { asyncReadSome(); });
_peer->post([self, this]() { asyncReadSome(); });
return true; return true;
} }
@ -188,13 +180,13 @@ bool SocketTask::completedWriteBuffer() {
// caller must not hold the _lock // caller must not hold the _lock
void SocketTask::closeStream() { void SocketTask::closeStream() {
if (_abandoned.load(std::memory_order_acquire)) { if (_abandoned.load(std::memory_order_acquire)) {
_server.unregisterTask(this->id());
return; return;
} }
// strand::dispatch may execute this immediately if this // strand::dispatch may execute this immediately if this
// is called on a thread inside the same strand // is called on a thread inside the same strand
auto self = shared_from_this(); _peer->post([self = shared_from_this(), this] { closeStreamNoLock(); });
_peer->post([self, this] { closeStreamNoLock(); });
} }
// caller must hold the _lock // caller must hold the _lock
@ -214,8 +206,7 @@ void SocketTask::closeStreamNoLock() {
_closedSend.store(true, std::memory_order_release); _closedSend.store(true, std::memory_order_release);
_closedReceive.store(true, std::memory_order_release); _closedReceive.store(true, std::memory_order_release);
_closeRequested.store(false, std::memory_order_release); _closeRequested.store(false, std::memory_order_release);
_keepAliveTimer->cancel(); cancelKeepAlive();
_keepAliveTimerActive.store(false, std::memory_order_relaxed);
_server.unregisterTask(this->id()); _server.unregisterTask(this->id());
} }
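
Both the destructor and closeStreamNoLock() now call a cancelKeepAlive() helper instead of cancelling the timer by hand. Its body is not part of this diff; a plausible self-contained equivalent (standalone Asio, illustrative names) could look like this:

#include <asio.hpp>
#include <atomic>
#include <iostream>
#include <memory>

struct KeepAlive {
  std::atomic<bool> active{false};
  std::unique_ptr<asio::steady_timer> timer;

  void cancel() {
    // only touch the timer if it was actually armed
    if (active.exchange(false, std::memory_order_relaxed) && timer != nullptr) {
      asio::error_code err;
      timer->cancel(err);
      if (err) {
        std::cerr << "unable to cancel keep-alive timer: " << err.message() << "\n";
      }
    }
  }
};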
@ -243,8 +234,7 @@ void SocketTask::resetKeepAlive() {
} }
_keepAliveTimerActive.store(true, std::memory_order_relaxed); _keepAliveTimerActive.store(true, std::memory_order_relaxed);
auto self = shared_from_this(); _keepAliveTimer->async_wait([self = shared_from_this(), this](const asio_ns::error_code& error) {
_keepAliveTimer->async_wait([self, this](const asio_ns::error_code& error) {
if (!error) { // error will be true if timer was canceled if (!error) { // error will be true if timer was canceled
LOG_TOPIC("5c1e0", ERR, Logger::COMMUNICATION) LOG_TOPIC("5c1e0", ERR, Logger::COMMUNICATION)
<< "keep alive timout - closing stream!"; << "keep alive timout - closing stream!";
@ -288,8 +278,6 @@ bool SocketTask::trySyncRead() {
TRI_ASSERT(_peer->runningInThisThread()); TRI_ASSERT(_peer->runningInThisThread());
asio_ns::error_code err; asio_ns::error_code err;
TRI_ASSERT(_peer != nullptr);
if (0 == _peer->available(err)) { if (0 == _peer->available(err)) {
return false; return false;
} }
@ -338,7 +326,6 @@ bool SocketTask::processAll() {
Result res; Result res;
bool rv = true; bool rv = true;
while (rv) { while (rv) {
Result result{TRI_ERROR_NO_ERROR};
try { try {
rv = processRead(startTime); rv = processRead(startTime);
} catch (arangodb::basics::Exception const& e) { } catch (arangodb::basics::Exception const& e) {
@ -358,11 +345,11 @@ bool SocketTask::processAll() {
if (res.fail()) { if (res.fail()) {
LOG_TOPIC("a3c44", ERR, Logger::COMMUNICATION) << res.errorMessage(); LOG_TOPIC("a3c44", ERR, Logger::COMMUNICATION) << res.errorMessage();
_closeRequested.store(true, std::memory_order_release); _closeRequested.store(true, std::memory_order_release);
break; return false;
} }
if (_closeRequested.load(std::memory_order_acquire)) { if (_closeRequested.load(std::memory_order_acquire)) {
break; return false;
} }
} }
@ -421,15 +408,13 @@ void SocketTask::asyncReadSome() {
return; return;
} }
auto self = shared_from_this();
// WARNING: the _readBuffer MUST NOT be changed until the callback // WARNING: the _readBuffer MUST NOT be changed until the callback
// has been called! Otherwise ASIO will get confused and write to // has been called! Otherwise ASIO will get confused and write to
// the wrong position. // the wrong position.
TRI_ASSERT(_peer != nullptr); TRI_ASSERT(_peer != nullptr);
_peer->asyncRead(asio_ns::buffer(_readBuffer.end(), READ_BLOCK_SIZE), _peer->asyncRead(asio_ns::buffer(_readBuffer.end(), READ_BLOCK_SIZE),
[self, this](const asio_ns::error_code& ec, std::size_t transferred) { [self = shared_from_this(), this](const asio_ns::error_code& ec, std::size_t transferred) {
if (_abandoned.load(std::memory_order_acquire)) { if (_abandoned.load(std::memory_order_acquire)) {
return; return;
} else if (ec) { } else if (ec) {
@ -519,16 +504,14 @@ void SocketTask::asyncWriteSome() {
// so the code could have blocked at this point or not all data // so the code could have blocked at this point or not all data
// was written in one go, begin writing at offset (written) // was written in one go, begin writing at offset (written)
auto self = shared_from_this();
_peer->asyncWrite(asio_ns::buffer(_writeBuffer._buffer->begin() + written, total - written), _peer->asyncWrite(asio_ns::buffer(_writeBuffer._buffer->begin() + written, total - written),
[self, this](const asio_ns::error_code& ec, std::size_t transferred) { [self = shared_from_this(), this](const asio_ns::error_code& ec, std::size_t transferred) {
if (_abandoned.load(std::memory_order_acquire)) { if (_abandoned.load(std::memory_order_acquire)) {
return; return;
} }
if (ec) { if (ec) {
LOG_TOPIC("8ed36", DEBUG, Logger::COMMUNICATION) LOG_TOPIC("8ed36", DEBUG, Logger::COMMUNICATION)
<< "write on failed with: " << ec.message(); << "write failed with: " << ec.message();
closeStream(); closeStream();
return; return;
} }
@ -544,11 +527,16 @@ void SocketTask::asyncWriteSome() {
} }
StringBuffer* SocketTask::leaseStringBuffer(size_t length) { StringBuffer* SocketTask::leaseStringBuffer(size_t length) {
std::unique_ptr<StringBuffer> buffer;
MUTEX_LOCKER(guard, _bufferLock); MUTEX_LOCKER(guard, _bufferLock);
StringBuffer* buffer = nullptr;
if (!_stringBuffers.empty()) { if (!_stringBuffers.empty()) {
buffer = _stringBuffers.back(); buffer.reset(_stringBuffers.back());
_stringBuffers.pop_back();
// we can release the lock here already
guard.unlock();
TRI_ASSERT(buffer != nullptr); TRI_ASSERT(buffer != nullptr);
TRI_ASSERT(buffer->length() == 0); TRI_ASSERT(buffer->length() == 0);
@ -558,19 +546,17 @@ StringBuffer* SocketTask::leaseStringBuffer(size_t length) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
} }
} }
_stringBuffers.pop_back();
} else { } else {
buffer = new StringBuffer(length, false); buffer.reset(new StringBuffer(length, false));
} }
TRI_ASSERT(buffer != nullptr); TRI_ASSERT(buffer != nullptr);
// still check for safety reasons // still check for safety reasons
if (buffer->capacity() >= length) { if (buffer->capacity() >= length) {
return buffer; return buffer.release();
} }
delete buffer;
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
} }
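
leaseStringBuffer() now holds the pooled buffer in a std::unique_ptr, releases the pool lock before any allocation work, and only release()s ownership on the success path, so an exception can no longer leak the buffer. A self-contained sketch of the same lease pattern, using plain standard types instead of StringBuffer:

#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

struct Buffer {
  explicit Buffer(std::size_t n) { data.reserve(n); }
  std::size_t capacity() const { return data.capacity(); }
  std::vector<char> data;
};

class BufferPool {
 public:
  Buffer* lease(std::size_t length) {
    std::unique_ptr<Buffer> buffer;
    {
      std::lock_guard<std::mutex> guard(_lock);
      if (!_pool.empty()) {
        buffer = std::move(_pool.back());
        _pool.pop_back();
      }
    }  // lock released before any (re)allocation happens
    if (buffer == nullptr) {
      buffer = std::make_unique<Buffer>(length);
    } else if (buffer->capacity() < length) {
      buffer->data.reserve(length);  // may throw; the unique_ptr still frees the buffer
    }
    return buffer.release();  // success: hand ownership to the caller
  }

 private:
  std::mutex _lock;
  std::vector<std::unique_ptr<Buffer>> _pool;
};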
@ -580,21 +566,20 @@ void SocketTask::returnStringBuffer(StringBuffer* buffer) {
if (_stringBuffers.size() > 4 || buffer->capacity() >= 4 * 1024 * 1024) { if (_stringBuffers.size() > 4 || buffer->capacity() >= 4 * 1024 * 1024) {
// don't keep too many buffers around and don't hog too much memory // don't keep too many buffers around and don't hog too much memory
delete buffer; guard.unlock();
return;
}
try {
buffer->reset();
_stringBuffers.emplace_back(buffer);
} catch (...) {
delete buffer; delete buffer;
} else {
try {
buffer->reset();
_stringBuffers.emplace_back(buffer);
} catch (...) {
delete buffer;
}
} }
} }
void SocketTask::triggerProcessAll() { void SocketTask::triggerProcessAll() {
// try to process remaining request data // try to process remaining request data
auto self = shared_from_this(); _peer->post([self = shared_from_this(), this] { processAll(); });
_peer->post([self, this] { processAll(); });
} }

View File

@ -127,8 +127,6 @@ void RestBatchHandler::processSubHandlerResult(RestHandler const& handler) {
} }
bool RestBatchHandler::executeNextHandler() { bool RestBatchHandler::executeNextHandler() {
auto self(shared_from_this());
// get authorization header. we will inject this into the subparts // get authorization header. we will inject this into the subparts
std::string const& authorization = _request->header(StaticStrings::Authorization); std::string const& authorization = _request->header(StaticStrings::Authorization);
@ -218,9 +216,13 @@ bool RestBatchHandler::executeNextHandler() {
} }
} }
// now scheduler the real handler // assume a bad lane, so the request is definitely executed via the queues
auto const lane = RequestLane::CLIENT_V8;
// now schedule the real handler
bool ok = bool ok =
SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [this, self, handler]() { SchedulerFeature::SCHEDULER->queue(lane, [this, self = shared_from_this(), handler]() {
// start to work for this handler // start to work for this handler
// ignore any errors here, will be handled later by inspecting the response // ignore any errors here, will be handled later by inspecting the response
try { try {
@ -229,7 +231,7 @@ bool RestBatchHandler::executeNextHandler() {
processSubHandlerResult(*handler); processSubHandlerResult(*handler);
}); });
} catch (...) { } catch (...) {
processSubHandlerResult(*handler.get()); processSubHandlerResult(*handler);
} }
}); });

View File

@ -113,20 +113,25 @@ void Scheduler::runCronThread() {
while (!_cronQueue.empty()) { while (!_cronQueue.empty()) {
// top is a reference to a tuple containing the timepoint and a shared_ptr to the work item // top is a reference to a tuple containing the timepoint and a shared_ptr to the work item
auto const& top = _cronQueue.top(); auto top = _cronQueue.top();
if (top.first < now) { if (top.first < now) {
// It is time to scheduler this task, try to get the lock and obtain a shared_ptr
// If this fails a default WorkItem is constructed which has disabled == true
auto item = top.second.lock();
if (item) {
try {
item->run();
} catch (std::exception const& ex) {
LOG_TOPIC("6d997", WARN, Logger::THREADS) << "caught exception in runCronThread: " << ex.what();
}
}
_cronQueue.pop(); _cronQueue.pop();
guard.unlock();
// It is time to schedule this task, try to get the lock and obtain a shared_ptr
// If this fails a default WorkItem is constructed which has disabled == true
try {
auto item = top.second.lock();
if (item) {
item->run();
}
} catch (std::exception const& ex) {
LOG_TOPIC("6d997", WARN, Logger::THREADS) << "caught exception in runCronThread: " << ex.what();
}
// always lock again, as we are going into the wait_for below
guard.lock();
} else { } else {
auto then = (top.first - now); auto then = (top.first - now);
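
The reworked loop copies the top entry, pops it, and drops the scheduler mutex before running the work item, so a long-running or re-queueing callback can no longer stall (or deadlock) the cron thread. A minimal model of that unlock-run-relock pattern, assuming a min-heap keyed by time_point with weak_ptr payloads as in the surrounding code and a guard that is locked on entry:

#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;
using CronItem = std::pair<Clock::time_point, std::weak_ptr<std::function<void()>>>;

// reverse comparison: std::priority_queue is a max-heap, so this puts the
// earliest deadline on top
struct Later {
  bool operator()(CronItem const& l, CronItem const& r) const { return r.first < l.first; }
};

using CronQueue = std::priority_queue<CronItem, std::vector<CronItem>, Later>;

void drainDue(CronQueue& queue, std::unique_lock<std::mutex>& guard) {
  auto now = Clock::now();
  while (!queue.empty() && queue.top().first < now) {
    auto top = queue.top();  // copy: a reference would dangle after pop()
    queue.pop();
    guard.unlock();          // never run user code while holding the queue lock
    if (auto task = top.second.lock()) {
      (*task)();             // the work item is still alive, run it
    }
    guard.lock();            // re-acquire before looking at the queue again
  }
}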

View File

@ -64,11 +64,11 @@ class Scheduler {
virtual WorkHandle queueDelay(RequestLane lane, clock::duration delay, virtual WorkHandle queueDelay(RequestLane lane, clock::duration delay,
std::function<void(bool canceled)> handler); std::function<void(bool canceled)> handler);
class WorkItem { class WorkItem final {
public: public:
virtual ~WorkItem() { ~WorkItem() {
try { try {
cancel(); cancel();
} catch (...) { } catch (...) {
// destructor... no exceptions allowed here // destructor... no exceptions allowed here
} }
@ -104,7 +104,7 @@ class Scheduler {
} }
} }
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
bool isDisabled() { return _disable.load(); } bool isDisabled() const { return _disable.load(); }
friend class Scheduler; friend class Scheduler;
#endif #endif
@ -137,7 +137,7 @@ class Scheduler {
typedef std::pair<clock::time_point, std::weak_ptr<WorkItem>> CronWorkItem; typedef std::pair<clock::time_point, std::weak_ptr<WorkItem>> CronWorkItem;
struct CronWorkItemCompare { struct CronWorkItemCompare {
bool operator()(CronWorkItem const& left, CronWorkItem const& right) { bool operator()(CronWorkItem const& left, CronWorkItem const& right) const {
// Reverse order, because std::priority_queue is a max heap. // Reverse order, because std::priority_queue is a max heap.
return right.first < left.first; return right.first < left.first;
} }
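
WorkItem is now final with a non-virtual destructor, and the destructor still cancels the work. The surrounding design - the caller owns a WorkHandle (a shared_ptr), while the cron queue keeps only a weak_ptr - can be sketched like this; the names are illustrative, not the exact ArangoDB API:

#include <atomic>
#include <functional>
#include <memory>
#include <utility>

class DelayedWork final {
 public:
  explicit DelayedWork(std::function<void(bool canceled)> fn) : _fn(std::move(fn)) {}

  ~DelayedWork() {
    try {
      cancel();  // dropping the handle cancels the job
    } catch (...) {
      // destructors must not throw
    }
  }

  void cancel() { fire(/*canceled*/ true); }
  void run() { fire(/*canceled*/ false); }

 private:
  void fire(bool canceled) {
    if (!_disabled.exchange(true) && _fn) {
      _fn(canceled);  // runs at most once, either as "run" or as "cancel"
    }
  }

  std::atomic<bool> _disabled{false};
  std::function<void(bool canceled)> _fn;
};

// A scheduler hands the caller a std::shared_ptr<DelayedWork> (the WorkHandle) and keeps
// only a std::weak_ptr in its cron queue: once the caller drops the handle, the queue's
// weak_ptr no longer locks and the job is skipped.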

View File

@ -41,12 +41,28 @@ using namespace arangodb;
using namespace arangodb::basics; using namespace arangodb::basics;
namespace { namespace {
static uint64_t getTickCount_ns() { uint64_t getTickCount_ns() {
auto now = std::chrono::high_resolution_clock::now(); auto now = std::chrono::high_resolution_clock::now();
return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()) return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch())
.count(); .count();
} }
bool isDirectDeadlockLane(RequestLane lane) {
// Some lanes have tasks that deadlock because they hold a mutex while calling queue()
// which must also be locked to execute the handler.
// Such tasks cannot be executed directly.
//return true;
return lane == RequestLane::TASK_V8
|| lane == RequestLane::CLIENT_V8
|| lane == RequestLane::CLUSTER_V8
|| lane == RequestLane::INTERNAL_LOW
|| lane == RequestLane::SERVER_REPLICATION
|| lane == RequestLane::CLUSTER_ADMIN
|| lane == RequestLane::CLUSTER_INTERNAL
|| lane == RequestLane::AGENCY_CLUSTER
|| lane == RequestLane::CLIENT_AQL;
}
} // namespace } // namespace
namespace arangodb { namespace arangodb {
@ -87,6 +103,7 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread
_jobsSubmitted(0), _jobsSubmitted(0),
_jobsDequeued(0), _jobsDequeued(0),
_jobsDone(0), _jobsDone(0),
_jobsDirectExec(0),
_wakeupQueueLength(5), _wakeupQueueLength(5),
_wakeupTime_ns(1000), _wakeupTime_ns(1000),
_definitiveWakeupTime_ns(100000), _definitiveWakeupTime_ns(100000),
@ -100,14 +117,25 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread
SupervisedScheduler::~SupervisedScheduler() {} SupervisedScheduler::~SupervisedScheduler() {}
bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler) { bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler) {
size_t queueNo = (size_t)PriorityRequestLane(lane); if (!isDirectDeadlockLane(lane) && (_jobsSubmitted - _jobsDone) < 2) {
_jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
_jobsDequeued.fetch_add(1, std::memory_order_relaxed);
_jobsDirectExec.fetch_add(1, std::memory_order_release);
try {
handler();
_jobsDone.fetch_add(1, std::memory_order_release);
return true;
} catch (...) {
_jobsDone.fetch_add(1, std::memory_order_release);
throw;
}
}
size_t queueNo = static_cast<size_t>(PriorityRequestLane(lane));
TRI_ASSERT(queueNo <= 2); TRI_ASSERT(queueNo <= 2);
TRI_ASSERT(isStopping() == false); TRI_ASSERT(isStopping() == false);
static thread_local uint64_t lastSubmitTime_ns;
bool doNotify = false;
WorkItem* work = new WorkItem(std::move(handler)); WorkItem* work = new WorkItem(std::move(handler));
if (!_queue[queueNo].push(work)) { if (!_queue[queueNo].push(work)) {
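
This is the heart of the commit: if the lane is not deadlock-prone and the scheduler is essentially idle (fewer than two jobs in flight), the handler runs inline on the calling thread instead of being queued. A condensed restatement of that fast path, with free-standing parameters in place of the member variables from this diff:

#include <atomic>
#include <cstdint>
#include <functional>

bool tryDirectExec(bool deadlockProneLane,
                   std::atomic<uint64_t>& jobsSubmitted,
                   std::atomic<uint64_t>& jobsDequeued,
                   std::atomic<uint64_t>& jobsDone,
                   std::atomic<uint64_t>& jobsDirectExec,
                   std::function<void()> const& handler) {
  if (!deadlockProneLane && (jobsSubmitted - jobsDone) < 2) {
    jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
    jobsDequeued.fetch_add(1, std::memory_order_relaxed);
    jobsDirectExec.fetch_add(1, std::memory_order_release);
    try {
      handler();                                         // direct execution on this thread
      jobsDone.fetch_add(1, std::memory_order_release);
      return true;
    } catch (...) {
      jobsDone.fetch_add(1, std::memory_order_release);  // keep the counters balanced
      throw;
    }
  }
  return false;  // caller falls back to the regular lock-free queue
}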
@ -115,21 +143,21 @@ bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler)
return false; return false;
} }
static thread_local uint64_t lastSubmitTime_ns;
// use memory order release to make sure, pushed item is visible // use memory order release to make sure, pushed item is visible
uint64_t jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_release); uint64_t jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_release);
uint64_t approxQueueLength = _jobsDone - jobsSubmitted; uint64_t approxQueueLength = jobsSubmitted - _jobsDone;
uint64_t now_ns = getTickCount_ns(); uint64_t now_ns = getTickCount_ns();
uint64_t sleepyTime_ns = now_ns - lastSubmitTime_ns; uint64_t sleepyTime_ns = now_ns - lastSubmitTime_ns;
lastSubmitTime_ns = now_ns; lastSubmitTime_ns = now_ns;
bool doNotify = false;
if (sleepyTime_ns > _definitiveWakeupTime_ns.load(std::memory_order_relaxed)) { if (sleepyTime_ns > _definitiveWakeupTime_ns.load(std::memory_order_relaxed)) {
doNotify = true; doNotify = true;
} else if (sleepyTime_ns > _wakeupTime_ns &&
} else if (sleepyTime_ns > _wakeupTime_ns) { approxQueueLength > _wakeupQueueLength.load(std::memory_order_relaxed)) {
if (approxQueueLength > _wakeupQueueLength.load(std::memory_order_relaxed)) { doNotify = true;
doNotify = true;
}
} }
if (doNotify) { if (doNotify) {
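
The wake-up decision is also tidied up (and the hunk fixes the queue-length computation to jobsSubmitted - _jobsDone): notify a sleeping worker either when submissions have paused for a long time, or when the pause is moderate but the queue has grown. The same heuristic as a small pure function, where the three thresholds correspond to _definitiveWakeupTime_ns, _wakeupTime_ns and _wakeupQueueLength:

#include <cstdint>

bool shouldNotify(uint64_t sleepyTimeNs, uint64_t approxQueueLength,
                  uint64_t definitiveWakeupNs, uint64_t wakeupNs,
                  uint64_t wakeupQueueLength) {
  if (sleepyTimeNs > definitiveWakeupNs) {
    return true;  // long pause since the last submit: a worker may well be asleep
  }
  // moderate pause, but the queue has grown beyond the configured threshold
  return sleepyTimeNs > wakeupNs && approxQueueLength > wakeupQueueLength;
}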
@ -181,7 +209,7 @@ void SupervisedScheduler::shutdown() {
while (_numWorker > 0) { while (_numWorker > 0) {
stopOneThread(); stopOneThread();
} }
int tries = 0; int tries = 0;
while (!cleanupAbandonedThreads()) { while (!cleanupAbandonedThreads()) {
if (++tries > 5 * 5) { if (++tries > 5 * 5) {
@ -301,7 +329,7 @@ bool SupervisedScheduler::cleanupAbandonedThreads() {
i++; i++;
} }
} }
return _abandonedWorkerStates.empty(); return _abandonedWorkerStates.empty();
} }
@ -452,7 +480,8 @@ std::string SupervisedScheduler::infoStatus() const {
return "scheduler threads " + std::to_string(numWorker) + " (" + return "scheduler threads " + std::to_string(numWorker) + " (" +
std::to_string(_numIdleWorker) + "<" + std::to_string(_maxNumWorker) + std::to_string(_numIdleWorker) + "<" + std::to_string(_maxNumWorker) +
") queued " + std::to_string(queueLength); ") queued " + std::to_string(queueLength) +
" directly exec " + std::to_string(_jobsDirectExec.load(std::memory_order_relaxed));
} }
Scheduler::QueueStatistics SupervisedScheduler::queueStatistics() const { Scheduler::QueueStatistics SupervisedScheduler::queueStatistics() const {
@ -467,8 +496,10 @@ void SupervisedScheduler::addQueueStatistics(velocypack::Builder& b) const {
uint64_t numWorker = _numWorker.load(std::memory_order_relaxed); uint64_t numWorker = _numWorker.load(std::memory_order_relaxed);
uint64_t queueLength = _jobsSubmitted.load(std::memory_order_relaxed) - uint64_t queueLength = _jobsSubmitted.load(std::memory_order_relaxed) -
_jobsDone.load(std::memory_order_relaxed); _jobsDone.load(std::memory_order_relaxed);
uint64_t directExec = _jobsDirectExec.load(std::memory_order_relaxed);
// TODO: previous scheduler filled out a lot more fields, relevant? // TODO: previous scheduler filled out a lot more fields, relevant?
b.add("scheduler-threads", VPackValue(static_cast<int32_t>(numWorker))); b.add("scheduler-threads", VPackValue(numWorker));
b.add("queued", VPackValue(static_cast<int32_t>(queueLength))); b.add("queued", VPackValue(queueLength));
b.add("directExec", VPackValue(directExec));
} }

View File

@ -38,7 +38,7 @@ namespace arangodb {
class SupervisedSchedulerWorkerThread; class SupervisedSchedulerWorkerThread;
class SupervisedSchedulerManagerThread; class SupervisedSchedulerManagerThread;
class SupervisedScheduler : public Scheduler { class SupervisedScheduler final : public Scheduler {
public: public:
SupervisedScheduler(uint64_t minThreads, uint64_t maxThreads, uint64_t maxQueueSize, SupervisedScheduler(uint64_t minThreads, uint64_t maxThreads, uint64_t maxQueueSize,
uint64_t fifo1Size, uint64_t fifo2Size); uint64_t fifo1Size, uint64_t fifo2Size);
@ -65,16 +65,16 @@ class SupervisedScheduler : public Scheduler {
friend class SupervisedSchedulerManagerThread; friend class SupervisedSchedulerManagerThread;
friend class SupervisedSchedulerWorkerThread; friend class SupervisedSchedulerWorkerThread;
struct WorkItem { struct WorkItem final {
std::function<void()> _handler; std::function<void()> _handler;
explicit WorkItem(std::function<void()> const& handler) explicit WorkItem(std::function<void()> const& handler)
: _handler(handler) {} : _handler(handler) {}
explicit WorkItem(std::function<void()>&& handler) explicit WorkItem(std::function<void()>&& handler)
: _handler(std::move(handler)) {} : _handler(std::move(handler)) {}
virtual ~WorkItem() {} ~WorkItem() {}
virtual void operator()() { _handler(); } void operator()() { _handler(); }
}; };
// Since the lockfree queue can only handle PODs, one has to wrap lambdas // Since the lockfree queue can only handle PODs, one has to wrap lambdas
@ -85,6 +85,7 @@ class SupervisedScheduler : public Scheduler {
alignas(64) std::atomic<uint64_t> _jobsSubmitted; alignas(64) std::atomic<uint64_t> _jobsSubmitted;
alignas(64) std::atomic<uint64_t> _jobsDequeued; alignas(64) std::atomic<uint64_t> _jobsDequeued;
alignas(64) std::atomic<uint64_t> _jobsDone; alignas(64) std::atomic<uint64_t> _jobsDone;
alignas(64) std::atomic<uint64_t> _jobsDirectExec;
// During a queue operation there a two reasons to manually wake up a worker // During a queue operation there a two reasons to manually wake up a worker
// 1. the queue length is bigger than _wakeupQueueLength and the last submit time // 1. the queue length is bigger than _wakeupQueueLength and the last submit time
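
Why alignas(64): each hot counter gets its own cache line, so workers updating different counters do not invalidate each other's lines (false sharing). A minimal illustration, assuming 64-byte cache lines:

#include <atomic>
#include <cstdint>

struct Counters {
  alignas(64) std::atomic<uint64_t> submitted{0};
  alignas(64) std::atomic<uint64_t> dequeued{0};
  alignas(64) std::atomic<uint64_t> done{0};
  alignas(64) std::atomic<uint64_t> directExec{0};  // the counter this commit adds
};
static_assert(alignof(Counters) == 64, "each member starts on its own cache line");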

View File

@ -56,7 +56,7 @@ ManagerFeature::ManagerFeature(application_features::ApplicationServer& server)
auto off = std::chrono::seconds(1); auto off = std::chrono::seconds(1);
std::lock_guard<std::mutex> guard(_workItemMutex); std::lock_guard<std::mutex> guard(_workItemMutex);
if (!ApplicationServer::isStopping() && !canceled) { if (!ApplicationServer::isStopping()) {
_workItem = SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc); _workItem = SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc);
} }
}; };
@ -69,10 +69,9 @@ void ManagerFeature::prepare() {
} }
void ManagerFeature::start() { void ManagerFeature::start() {
auto off = std::chrono::seconds(1);
Scheduler* scheduler = SchedulerFeature::SCHEDULER; Scheduler* scheduler = SchedulerFeature::SCHEDULER;
if (scheduler != nullptr) { // is nullptr in catch tests if (scheduler != nullptr) { // is nullptr in catch tests
auto off = std::chrono::seconds(1);
std::lock_guard<std::mutex> guard(_workItemMutex); std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem = scheduler->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc); _workItem = scheduler->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc);
} }
@ -80,15 +79,25 @@ void ManagerFeature::start() {
void ManagerFeature::beginShutdown() { void ManagerFeature::beginShutdown() {
{ {
// when we get here, ApplicationServer::isStopping() will always return
// true already. So it is ok to wait here until the workItem has been
// fully canceled. We are grabbing the mutex here, so the workItem cannot
// reschedule itself while it does not hold the mutex. If the callback is
// executed right afterwards, it will check isStopping(), which will return
// true, so no reschedule will be performed.
// If the callback does not hold the mutex, we cancel the workItem here
// (under the mutex), and when the callback is eventually executed, it will
// check isStopping(), which will always return true.
std::lock_guard<std::mutex> guard(_workItemMutex); std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset(); _workItem.reset();
} }
MANAGER->disallowInserts(); MANAGER->disallowInserts();
// at this point all cursors should have been aborted already // at this point all cursors should have been aborted already
MANAGER->garbageCollect(/*abortAll*/true); MANAGER->garbageCollect(/*abortAll*/true);
// make sure no lingering managed trx remain // make sure no lingering managed trx remain
while (MANAGER->garbageCollect(/*abortAll*/true)) { while (MANAGER->garbageCollect(/*abortAll*/true)) {
LOG_TOPIC("96298", WARN, Logger::TRANSACTIONS) << "still waiting for managed transaction"; LOG_TOPIC("96298", INFO, Logger::TRANSACTIONS) << "still waiting for managed transaction";
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
} }
@ -100,6 +109,7 @@ void ManagerFeature::stop() {
std::lock_guard<std::mutex> guard(_workItemMutex); std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset(); _workItem.reset();
} }
// at this point all cursors should have been aborted already // at this point all cursors should have been aborted already
MANAGER->garbageCollect(/*abortAll*/true); MANAGER->garbageCollect(/*abortAll*/true);
} }
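
A hedged sketch of the reschedule-under-mutex pattern described above: the GC callback re-arms itself only while the server is not stopping, and shutdown resets the handle under the same mutex, so at most one of the two wins. The names and the schedule() signature are illustrative, not the real ManagerFeature API:

#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>

struct GcLoop {
  std::mutex workItemMutex;
  std::shared_ptr<void> workItem;  // stands in for Scheduler::WorkHandle
  std::atomic<bool> stopping{false};

  // assumed: schedule(delay, fn) returns a handle that cancels the job when destroyed
  std::function<std::shared_ptr<void>(std::chrono::seconds, std::function<void()>)> schedule;

  void onTick() {
    collectGarbage();
    std::lock_guard<std::mutex> guard(workItemMutex);
    if (!stopping.load()) {
      workItem = schedule(std::chrono::seconds(1), [this] { onTick(); });  // re-arm
    }
  }

  void beginShutdown() {
    stopping.store(true);
    std::lock_guard<std::mutex> guard(workItemMutex);
    workItem.reset();  // cancels a pending tick; a running tick sees 'stopping' and stops
  }

  void collectGarbage() {}
};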

View File

@ -51,7 +51,6 @@ class ManagerFeature final : public application_features::ApplicationFeature {
private: private:
static std::unique_ptr<transaction::Manager> MANAGER; static std::unique_ptr<transaction::Manager> MANAGER;
private:
std::mutex _workItemMutex; std::mutex _workItemMutex;
Scheduler::WorkHandle _workItem; Scheduler::WorkHandle _workItem;

View File

@ -262,9 +262,7 @@ void Task::setParameter(std::shared_ptr<arangodb::velocypack::Builder> const& pa
void Task::setUser(std::string const& user) { _user = user; } void Task::setUser(std::string const& user) { _user = user; }
std::function<void(bool cancelled)> Task::callbackFunction() { std::function<void(bool cancelled)> Task::callbackFunction() {
auto self = shared_from_this(); return [self = shared_from_this(), this](bool cancelled) {
return [self, this](bool cancelled) {
if (cancelled) { if (cancelled) {
MUTEX_LOCKER(guard, _tasksLock); MUTEX_LOCKER(guard, _tasksLock);

View File

@ -0,0 +1,83 @@
/*jshint globalstrict:false, strict:false */
/*global arango, assertTrue, assertFalse, assertEqual */
////////////////////////////////////////////////////////////////////////////////
/// @brief test async requests
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2015 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2015, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
let jsunity = require('jsunity');
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
function AsyncRequestSuite () {
'use strict';
return {
testAsyncRequest() {
let res = arango.GET_RAW("/_api/version", { "x-arango-async" : "true" });
assertEqual(202, res.code);
assertFalse(res.headers.hasOwnProperty("x-arango-async-id"));
},
testAsyncRequestStore() {
let res = arango.GET_RAW("/_api/version", { "x-arango-async" : "store" });
assertEqual(202, res.code);
assertTrue(res.headers.hasOwnProperty("x-arango-async-id"));
const id = res.headers["x-arango-async-id"];
let tries = 0;
while (++tries < 30) {
res = arango.PUT_RAW("/_api/job/" + id, "");
if (res.code === 200) {
break;
}
require("internal").sleep(0.5);
}
assertEqual(200, res.code);
},
testAsyncRequestQueueFull() {
let res = arango.PUT_RAW("/_admin/debug/failat/queueFull", "");
if (res.code !== 200) {
// abort test - failure mode is not activated on server
return;
}
try {
res = arango.GET_RAW("/_api/version", { "x-arango-async" : "true" });
assertEqual(503, res.code);
} finally {
arango.DELETE("/_admin/debug/failat/queueFull");
}
},
};
}
jsunity.run(AsyncRequestSuite);
return jsunity.done();