1
0
Fork 0

Feature/remove job queue thread (#5986)

limiting V8 calls in flight
This commit is contained in:
Frank Celler 2018-08-10 12:17:43 +02:00 committed by GitHub
parent c9ef2b2560
commit a688dc0962
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 304 additions and 151 deletions

View File

@ -838,6 +838,7 @@ endif()
# ZLIB_VERSION
# ZLIB_LIBS
# ZLIB_INCLUDE_DIR
add_definitions(-DBOOST_ALL_NO_LIB=1) #disable boost autolink on windows
add_subdirectory(3rdParty)
@ -856,6 +857,7 @@ include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/3rdParty/rocksdb/${ARANGO_ROCKS
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/3rdParty/s2geometry/${ARANGO_S2GEOMETRY_VERSION}/src)
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/3rdParty/rocksdb/${ARANGO_ROCKSDB_VERSION})
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/3rdParty/date/include)
# ------------------------------------------------------------------------------
# RocksDB
# ------------------------------------------------------------------------------
@ -880,9 +882,6 @@ include_directories(SYSTEM ${ASIO_INCLUDES})
add_definitions("-DVELOCYPACK_XXHASH=1")
set(V8_LINK_DIRECTORIES "${LINK_DIRECTORIES}" CACHE INTERNAL "" FORCE)
foreach (LINK_DIR ${V8_LINK_DIRECTORIES})
link_directories("${LINK_DIR}")
endforeach()
################################################################################
## ICU
@ -897,16 +896,22 @@ include_directories(SYSTEM ${ICU_INCLUDE_DIR})
include_directories(SYSTEM ${V8_INCLUDE_DIR})
add_definitions("-DARANGODB_V8_VERSION=\"${V8_VERSION}\"")
foreach (LINK_DIR ${V8_LINK_DIRECTORIES})
link_directories("${LINK_DIR}")
endforeach()
################################################################################
## ZLIB
################################################################################
include_directories(SYSTEM ${ZLIB_INCLUDE_DIR})
add_definitions("-DARANGODB_ZLIB_VERSION=\"${ZLIB_VERSION}\"")
link_directories("${PROJECT_BINARY_DIR}/bin")
################################################################################
## cURL
################################################################################
add_definitions(-DCURL_STATICLIB=1)
include_directories(SYSTEM
${CURL_SRC_DIR}/include/

View File

@ -707,7 +707,6 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, Query* q
{
VPackObjectBuilder guard(&answerBuilder);
if (operation == "lock") {
// Mark current thread as potentially blocking:
int res = query->trx()->lockCollections();
// let exceptions propagate from here

View File

@ -73,7 +73,7 @@ bool SharedQueryState::execute(std::function<bool()> const& cb) {
// We are shutting down
return false;
}
scheduler->post(_continueCallback);
scheduler->post(_continueCallback, false);
} else {
_wasNotified = true;
guard.signal();

View File

@ -96,7 +96,7 @@ void CacheManagerFeature::validateOptions(
void CacheManagerFeature::start() {
auto scheduler = SchedulerFeature::SCHEDULER;
auto postFn = [scheduler](std::function<void()> fn) -> bool {
scheduler->post(fn);
scheduler->post(fn, false);
return true;
};
_manager.reset(new Manager(postFn, _cacheSize));

View File

@ -167,6 +167,7 @@ void HeartbeatThread::runBackgroundJob() {
{
MUTEX_LOCKER(mutexLocker, *_statusLock);
TRI_ASSERT(_backgroundJobScheduledOrRunning);
if (_launchAnotherBackgroundJob) {
jobNr = ++_backgroundJobsPosted;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync tail " << jobNr;
@ -174,7 +175,8 @@ void HeartbeatThread::runBackgroundJob() {
// the JobGuard is in the operator() of HeartbeatBackgroundJob
_lastSyncTime = TRI_microtime();
SchedulerFeature::SCHEDULER->post(HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime));
SchedulerFeature::SCHEDULER->post(
HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false);
} else {
_backgroundJobScheduledOrRunning = false;
_launchAnotherBackgroundJob = false;
@ -1179,8 +1181,8 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {
// the JobGuard is in the operator() of HeartbeatBackgroundJob
_lastSyncTime = TRI_microtime();
SchedulerFeature::SCHEDULER->post(HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime));
SchedulerFeature::SCHEDULER->post(
HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -40,7 +40,7 @@ enum class RequestLane {
TASK_V8
};
enum class RequestPriority : size_t { HIGH = 1, LOW = 2 };
enum class RequestPriority { HIGH, LOW, V8 };
inline RequestPriority PriorityRequestLane(RequestLane lane) {
switch (lane) {
@ -49,7 +49,7 @@ inline RequestPriority PriorityRequestLane(RequestLane lane) {
case RequestLane::CLIENT_AQL:
return RequestPriority::LOW;
case RequestLane::CLIENT_V8:
return RequestPriority::LOW;
return RequestPriority::V8;
case RequestLane::CLIENT_SLOW:
return RequestPriority::LOW;
case RequestLane::AGENCY_INTERNAL:
@ -59,13 +59,13 @@ inline RequestPriority PriorityRequestLane(RequestLane lane) {
case RequestLane::CLUSTER_INTERNAL:
return RequestPriority::HIGH;
case RequestLane::CLUSTER_V8:
return RequestPriority::LOW;
return RequestPriority::V8;
case RequestLane::CLUSTER_ADMIN:
return RequestPriority::LOW;
case RequestLane::SERVER_REPLICATION:
return RequestPriority::LOW;
case RequestLane::TASK_V8:
return RequestPriority::LOW;
return RequestPriority::V8;
}
return RequestPriority::LOW;
}

View File

@ -1593,7 +1593,7 @@ int MMFilesCollection::fillIndexes(
" }, indexes: " + std::to_string(n - 1));
auto poster = [](std::function<void()> fn) -> void {
SchedulerFeature::SCHEDULER->post(fn);
SchedulerFeature::SCHEDULER->post(fn, false);
};
auto queue = std::make_shared<arangodb::basics::LocalTaskQueue>(poster);

View File

@ -325,7 +325,7 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) {
LOG_TOPIC(WARN, Logger::PREGEL)
<< "No further action taken after receiving all responses";
}
});
}, false);
return VPackBuilder();
}
@ -776,7 +776,7 @@ int Conductor::_sendToAllDBServers(std::string const& path,
PregelFeature::handleWorkerRequest(
_vocbaseGuard.database(), path, message.slice(), response
);
});
}, false);
}
return TRI_ERROR_NO_ERROR;
}

View File

@ -204,7 +204,7 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
scheduler->post([this, i, vertexShard, edgeLookups, vertexOffset] {
TRI_DEFER(_runningThreads--);// exception safe
_loadVertices(i, vertexShard, edgeLookups, vertexOffset);
});
}, false);
// update to next offset
vertexOffset += shardSizes[vertexShard];
}
@ -213,8 +213,8 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
std::this_thread::sleep_for(std::chrono::microseconds(5000));
}
}
scheduler->post(callback);
});
scheduler->post(callback, false);
}, false);
}
template <typename V, typename E>
@ -609,7 +609,7 @@ void GraphStore<V, E>::storeResults(WorkerConfig* config,
<< (TRI_microtime() - now) << "s";
callback();
}
});
}, false);
start = end;
end = end + delta;
if (total < end + delta) { // swallow the rest

View File

@ -294,7 +294,7 @@ void PregelFeature::cleanupWorker(uint64_t executionNumber) {
if (wit != _workers.end()) {
_workers.erase(executionNumber);
}
});
}, false);
}
void PregelFeature::cleanupAll() {

View File

@ -150,7 +150,7 @@ void RecoveryManager::updatedFailedServers() {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
rest::Scheduler* scheduler = SchedulerFeature::SCHEDULER;
scheduler->post([this, shard] { _renewPrimaryServer(shard); });
scheduler->post([this, shard] { _renewPrimaryServer(shard); }, false);
}
}
}

View File

@ -174,7 +174,8 @@ void Worker<V, E, M>::setupWorker() {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
rest::Scheduler* scheduler = SchedulerFeature::SCHEDULER;
scheduler->post(
[this, callback] { _graphStore->loadShards(&_config, callback); });
[this, callback] { _graphStore->loadShards(&_config, callback); },
false);
}
}
@ -344,7 +345,7 @@ void Worker<V, E, M>::_startProcessing() {
if (_processVertices(i, vertices) && _state == WorkerState::COMPUTING) {
_finishedProcessing(); // last thread turns the lights out
}
});
}, false);
start = end;
end = end + delta;
if (total < end + delta) { // swallow the rest
@ -721,7 +722,7 @@ void Worker<V, E, M>::compensateStep(VPackSlice const& data) {
_workerAggregators->serializeValues(package);
package.close();
_callConductor(Utils::finishedRecoveryPath, package);
});
}, false);
}
template <typename V, typename E, typename M>
@ -747,7 +748,7 @@ void Worker<V, E, M>::_callConductor(std::string const& path,
scheduler->post([path, message] {
VPackBuilder response;
PregelFeature::handleConductorRequest(path, message.slice(), response);
});
}, false);
} else {
std::shared_ptr<ClusterComm> cc = ClusterComm::instance();
std::string baseUrl =

View File

@ -320,7 +320,7 @@ void Syncer::JobSynchronizer::request(std::function<void()> const& cb) {
});
cb();
});
}, false);
} catch (...) {
// will get here only if Scheduler::post threw
jobDone();

View File

@ -40,7 +40,7 @@ class RestAqlFunctionsHandler : public RestVocbaseBaseHandler {
public:
RestStatus execute() override;
char const* name() const override final { return "RestAqlFunctionsHandler"; }
RequestLane lane() const override final { return RequestLane::CLIENT_SLOW; }
RequestLane lane() const override final { return RequestLane::CLIENT_FAST; }
};
}

View File

@ -61,10 +61,11 @@ class RestCursorHandler : public RestVocbaseBaseHandler {
public:
virtual RestStatus execute() override;
virtual RestStatus continueExecute() override;
char const* name() const override { return "RestCursorHandler"; }
RequestLane lane() const override { return RequestLane::CLIENT_AQL; }
RequestLane lane() const override final { return RequestLane::CLIENT_AQL; }
virtual RestStatus continueExecute() override;
#ifdef USE_ENTERPRISE
void shutdownExecute(bool isFinalized) noexcept override;
#endif

View File

@ -79,7 +79,7 @@ class SchedulerManagerThread final : public Thread {
Scheduler* _scheduler;
asio_ns::io_context* _service;
};
}
} // namespace
// -----------------------------------------------------------------------------
// --SECTION-- SchedulerThread
@ -99,8 +99,8 @@ class SchedulerThread : public Thread {
// when we enter this method,
// _nrRunning has already been increased for this thread
LOG_TOPIC(DEBUG, Logger::THREADS) << "started thread: "
<< _scheduler->infoStatus();
LOG_TOPIC(DEBUG, Logger::THREADS)
<< "started thread: " << _scheduler->infoStatus();
// some random delay value to avoid all initial threads checking for
// their deletion at the very same time
@ -115,8 +115,8 @@ class SchedulerThread : public Thread {
try {
_service->run_one();
} catch (std::exception const& ex) {
LOG_TOPIC(ERR, Logger::THREADS) << "scheduler loop caught exception: "
<< ex.what();
LOG_TOPIC(ERR, Logger::THREADS)
<< "scheduler loop caught exception: " << ex.what();
} catch (...) {
LOG_TOPIC(ERR, Logger::THREADS)
<< "scheduler loop caught unknown exception";
@ -148,8 +148,8 @@ class SchedulerThread : public Thread {
}
}
LOG_TOPIC(DEBUG, Logger::THREADS) << "stopped (" << _scheduler->infoStatus()
<< ")";
LOG_TOPIC(DEBUG, Logger::THREADS)
<< "stopped (" << _scheduler->infoStatus() << ")";
if (doDecrement) {
// only decrement here if this wasn't already done above
@ -161,7 +161,7 @@ class SchedulerThread : public Thread {
Scheduler* _scheduler;
asio_ns::io_context* _service;
};
}
} // namespace
// -----------------------------------------------------------------------------
// --SECTION-- Scheduler
@ -170,17 +170,21 @@ class SchedulerThread : public Thread {
Scheduler::Scheduler(uint64_t nrMinimum, uint64_t nrMaximum,
uint64_t maxQueueSize, uint64_t fifo1Size,
uint64_t fifo2Size)
: _maxQueueSize(maxQueueSize),
: _queuedV8(0),
_maxQueuedV8(std::max(static_cast<uint64_t>(1), nrMaximum - nrMinimum)),
_maxQueueSize(maxQueueSize),
_counters(0),
_maxFifoSize{fifo1Size, fifo2Size},
_fifo1(_maxFifoSize[0]),
_fifo2(_maxFifoSize[1]),
_fifos{&_fifo1, &_fifo2},
_maxFifoSize{fifo1Size, fifo2Size, fifo2Size},
_fifo1(_maxFifoSize[FIFO1]),
_fifo2(_maxFifoSize[FIFO2]),
_fifo8(_maxFifoSize[FIFO8]),
_fifos{&_fifo1, &_fifo2, &_fifo8},
_minThreads(nrMinimum),
_maxThreads(nrMaximum),
_lastAllBusyStamp(0.0) {
_fifoSize[0] = 0;
_fifoSize[1] = 0;
_fifoSize[FIFO1] = 0;
_fifoSize[FIFO2] = 0;
_fifoSize[FIFO8] = 0;
// setup signal handlers
initializeSignalHandlers();
@ -208,28 +212,80 @@ Scheduler::~Scheduler() {
}
// do not pass callback by reference, might get deleted before execution
void Scheduler::post(std::function<void()> const& callback) {
void Scheduler::post(std::function<void()> const callback, bool isV8,
uint64_t timeout) {
// increment number of queued and guard against exceptions
incQueued();
try {
// capture without self, ioContext will not live longer than scheduler
_ioContext.get()->post([this, callback]() {
JobGuard guard(this);
guard.work();
auto guardQueue = scopeGuard([this]() { decQueued(); });
decQueued();
callback();
});
} catch (...) {
decQueued();
throw;
// increment number of queued V8 jobs and guard against exceptions
if (isV8) {
++_queuedV8;
}
auto guardV8 = scopeGuard([this, isV8]() {
if (isV8) {
--_queuedV8;
}
});
// capture without self, ioContext will not live longer than scheduler
_ioContext->post([this, callback, isV8, timeout]() {
// at the end (either success or exception),
// reduce number of queued V8
auto guard = scopeGuard([this, isV8]() {
if (isV8) {
--_queuedV8;
}
});
// reduce number of queued now
decQueued();
// start working
JobGuard jobGuard(this);
jobGuard.work();
if (isV8 && _queuedV8 > _maxQueuedV8 &&
numWorking(getCounters()) >= static_cast<uint64_t>(_maxQueuedV8)) {
// this must be done before requeuing the job
guard.fire();
// in case we queued more V8 jobs in the scheduler than desired this
// job is put back into the scheduler queue. An exponential backoff is
// used with a maximum of 256ms. Initial the timeout will be zero.
auto t = timeout;
if (t == 0) {
t = 1;
} else if (t <= 200) {
t *= 2;
}
std::shared_ptr<asio_ns::deadline_timer> timer(
newDeadlineTimer(boost::posix_time::millisec(timeout)));
timer->async_wait(
[this, callback, isV8, t](const asio::error_code& error) {
if (error != asio::error::operation_aborted) {
post(callback, isV8, t);
}
});
return;
}
callback();
});
// no exception happen, cancel guards
guardV8.cancel();
guardQueue.cancel();
}
// do not pass callback by reference, might get deleted before execution
void Scheduler::post(asio_ns::io_context::strand& strand,
std::function<void()> const& callback) {
std::function<void()> const callback) {
incQueued();
try {
@ -253,26 +309,39 @@ bool Scheduler::queue(RequestPriority prio,
bool ok = true;
switch (prio) {
// If there is anything in the fifo1 or if the scheduler
// queue is already full, then append it to the fifo1.
// Otherwise directly queue it.
//
// This does not care if there is anything in fifo2 or
// fifo8 because these queue have lower priority.
case RequestPriority::HIGH:
if (0 < _fifoSize[0]) {
ok = pushToFifo(static_cast<int>(prio), callback);
} else if (canPostDirectly()) {
post(callback);
if (0 < _fifoSize[FIFO1] || !canPostDirectly()) {
ok = pushToFifo(FIFO1, callback, false);
} else {
ok = pushToFifo(static_cast<int>(prio), callback);
post(callback, false);
}
break;
// If there is anything in the fifo1, fifo2, fifo8
// or if the scheduler queue is already full, then
// append it to the fifo2. Otherewise directly queue
// it.
case RequestPriority::LOW:
if (0 < _fifoSize[0]) {
ok = pushToFifo(static_cast<int>(prio), callback);
} else if (0 < _fifoSize[1]) {
ok = pushToFifo(static_cast<int>(prio), callback);
} else if (canPostDirectly()) {
post(callback);
if (0 < _fifoSize[FIFO1] || 0 < _fifoSize[FIFO8] ||
0 < _fifoSize[FIFO2] || !canPostDirectly()) {
ok = pushToFifo(FIFO2, callback, false);
} else {
pushToFifo(static_cast<int>(prio), callback);
post(callback, false);
}
break;
// Also push V8 requests to the fifo2. Even if we could
// queue directly.
case RequestPriority::V8:
ok = pushToFifo(FIFO2, callback, true);
break;
default:
TRI_ASSERT(false);
break;
@ -283,50 +352,68 @@ bool Scheduler::queue(RequestPriority prio,
void Scheduler::drain() {
while (canPostDirectly()) {
bool found = popFifo(1);
bool found = popFifo(FIFO1);
if (!found) {
found = popFifo(2);
found = popFifo(FIFO8);
if (!found) {
break;
found = popFifo(FIFO2);
} else if (canPostDirectly()) {
// There is still enough space in the scheduler queue. Queue
// one more.
popFifo(FIFO2);
}
}
if (!found) {
break;
}
}
}
void Scheduler::addQueueStatistics(velocypack::Builder& b) const {
auto counters = getCounters();
b.add("scheduler-threads",
VPackValue(static_cast<int32_t>(numRunning(counters))));
b.add("in-progress", VPackValue(static_cast<int32_t>(numWorking(counters))));
b.add("queued", VPackValue(static_cast<int32_t>(numQueued(counters))));
b.add("queue-size", VPackValue(static_cast<int32_t>(_maxQueueSize)));
b.add("current-fifo1", VPackValue(static_cast<int32_t>(_fifoSize[0])));
b.add("fifo1-size", VPackValue(static_cast<int32_t>(_maxFifoSize[0])));
b.add("current-fifo2", VPackValue(static_cast<int32_t>(_fifoSize[1])));
b.add("fifo2-size", VPackValue(static_cast<int32_t>(_maxFifoSize[1])));
b.add("scheduler-threads", VPackValue(numRunning(counters)));
b.add("in-progress", VPackValue(numWorking(counters)));
b.add("queued", VPackValue(numQueued(counters)));
b.add("queue-size", VPackValue(_maxQueueSize));
b.add("current-fifo1", VPackValue(_fifoSize[FIFO1]));
b.add("fifo1-size", VPackValue(_maxFifoSize[FIFO1]));
b.add("current-fifo2", VPackValue(_fifoSize[FIFO2]));
b.add("fifo2-size", VPackValue(_maxFifoSize[FIFO2]));
b.add("current-fifo8", VPackValue(_fifoSize[FIFO8]));
b.add("fifo8-size", VPackValue(_maxFifoSize[FIFO8]));
}
Scheduler::QueueStatistics Scheduler::queueStatistics() const {
auto counters = getCounters();
return QueueStatistics{numRunning(counters), numWorking(counters),
numQueued(counters)};
return QueueStatistics{numRunning(counters),
numWorking(counters),
numQueued(counters),
static_cast<uint64_t>(_fifoSize[FIFO1]),
static_cast<uint64_t>(_fifoSize[FIFO8]),
static_cast<uint64_t>(_fifoSize[FIFO8]),
static_cast<uint64_t>(_queuedV8)};
}
std::string Scheduler::infoStatus() {
uint64_t const counters = _counters.load();
return "scheduler threads " + std::to_string(numRunning(counters)) + " (" +
return "scheduler " + std::to_string(numRunning(counters)) + " (" +
std::to_string(_minThreads) + "<" + std::to_string(_maxThreads) +
") in-progress " + std::to_string(numWorking(counters)) + " queued " +
std::to_string(numQueued(counters)) + " (<=" +
std::to_string(_maxQueueSize) + ") fifo1 " +
std::to_string(_fifoSize[0]) + " (<=" + std::to_string(_maxFifoSize[0]) +
") fifo2 " + std::to_string(_fifoSize[1]) + " (<=" +
std::to_string(_maxFifoSize[1]) + ")";
std::to_string(numQueued(counters)) +
" (<=" + std::to_string(_maxQueueSize) + ") V8 " +
std::to_string(_queuedV8) + " (<=" + std::to_string(_maxQueuedV8) +
") F1 " + std::to_string(_fifoSize[FIFO1]) +
" (<=" + std::to_string(_maxFifoSize[FIFO1]) + ") F2 " +
std::to_string(_fifoSize[FIFO2]) +
" (<=" + std::to_string(_maxFifoSize[FIFO2]) + ") F8 " +
std::to_string(_fifoSize[FIFO8]) +
" (<=" + std::to_string(_maxFifoSize[FIFO8]) + ")";
}
bool Scheduler::canPostDirectly() const noexcept {
@ -337,11 +424,13 @@ bool Scheduler::canPostDirectly() const noexcept {
return nrWorking + nrQueued <= _maxQueueSize;
}
bool Scheduler::pushToFifo(size_t fifo, std::function<void()> const& callback) {
size_t p = fifo - 1;
TRI_ASSERT(0 < fifo && p < NUMBER_FIFOS);
bool Scheduler::pushToFifo(int64_t fifo, std::function<void()> const& callback,
bool isV8) {
TRI_ASSERT(0 <= fifo && fifo < NUMBER_FIFOS);
TRI_ASSERT(fifo != FIFO8 || (fifo == FIFO8 && isV8));
auto job = std::make_unique<FifoJob>(callback);
size_t p = static_cast<size_t>(fifo);
auto job = std::make_unique<FifoJob>(callback, isV8);
try {
if (0 < _maxFifoSize[p] && (int64_t)_maxFifoSize[p] <= _fifoSize[p]) {
@ -361,7 +450,7 @@ bool Scheduler::pushToFifo(size_t fifo, std::function<void()> const& callback) {
auto nrQueued = numQueued(counters);
if (0 == nrWorking + nrQueued) {
post([] { /*wakeup call for scheduler thread*/ });
post([] { /*wakeup call for scheduler thread*/ }, false);
}
} catch (...) {
return false;
@ -370,16 +459,30 @@ bool Scheduler::pushToFifo(size_t fifo, std::function<void()> const& callback) {
return true;
}
bool Scheduler::popFifo(size_t fifo) {
int64_t p = fifo - 1;
TRI_ASSERT(0 <= p && p < NUMBER_FIFOS);
bool Scheduler::popFifo(int64_t fifo) {
TRI_ASSERT(0 <= fifo && fifo < NUMBER_FIFOS);
if (fifo == FIFO8 && _queuedV8 >= _maxQueuedV8) {
return false;
}
size_t p = static_cast<size_t>(fifo);
FifoJob* job = nullptr;
bool ok = _fifos[p]->pop(job) && job != nullptr;
if (ok) {
post(job->_callback);
delete job;
auto guard = scopeGuard([job]() {
if (job) {
delete job;
}
});
if (!job->_isV8 || _queuedV8 < _maxQueuedV8) {
post(job->_callback, job->_isV8);
} else {
pushToFifo(FIFO8, job->_callback, job->_isV8);
}
--_fifoSize[p];
}
@ -413,6 +516,8 @@ bool Scheduler::start() {
TRI_ASSERT(0 < _minThreads);
TRI_ASSERT(_minThreads <= _maxThreads);
TRI_ASSERT(0 < _maxQueueSize);
TRI_ASSERT(0 < _maxQueuedV8);
for (uint64_t i = 0; i < _minThreads; ++i) {
{
@ -465,9 +570,8 @@ void Scheduler::shutdown() {
std::this_thread::yield();
// we can be quite generous here with waiting...
// as we are in the shutdown already, we do not care if we need to wait for
// a
// bit longer
// as we are in the shutdown already, we do not care if we need to wait
// for a bit longer
std::this_thread::sleep_for(std::chrono::microseconds(20000));
}
@ -532,11 +636,11 @@ void Scheduler::rebalanceThreads() {
++count;
if (count % 50 == 0) {
LOG_TOPIC(DEBUG, Logger::THREADS) << "rebalancing threads: "
<< infoStatus();
LOG_TOPIC(DEBUG, Logger::THREADS)
<< "rebalancing threads: " << infoStatus();
} else if (count % 5 == 0) {
LOG_TOPIC(TRACE, Logger::THREADS) << "rebalancing threads: "
<< infoStatus();
LOG_TOPIC(TRACE, Logger::THREADS)
<< "rebalancing threads: " << infoStatus();
}
while (true) {

View File

@ -89,11 +89,15 @@ class Scheduler {
uint64_t _running;
uint64_t _working;
uint64_t _queued;
uint64_t _fifo1;
uint64_t _fifo2;
uint64_t _fifo8;
uint64_t _queuedV8;
};
void post(std::function<void()> const& callback);
void post(asio_ns::io_context::strand&,
std::function<void()> const& callback);
void post(std::function<void()> const callback, bool isV8,
uint64_t timeout = 0);
void post(asio_ns::io_context::strand&, std::function<void()> const callback);
bool queue(RequestPriority prio, std::function<void()> const&);
void drain();
@ -147,6 +151,9 @@ class Scheduler {
_counters -= 1ULL << 16;
}
std::atomic<int64_t> _queuedV8;
int64_t const _maxQueuedV8;
// maximal number of running + queued jobs in the Scheduler `io_context`
uint64_t const _maxQueueSize;
@ -175,18 +182,27 @@ class Scheduler {
// queue is full
struct FifoJob {
explicit FifoJob(std::function<void()> const& callback) : _callback(callback) {}
FifoJob(std::function<void()> const& callback, bool isV8)
: _isV8(isV8), _callback(callback) {}
bool const _isV8;
std::function<void()> _callback;
};
bool pushToFifo(size_t fifo, std::function<void()> const& callback);
bool popFifo(size_t fifo);
bool pushToFifo(int64_t fifo, std::function<void()> const& callback,
bool isV8);
bool popFifo(int64_t fifo);
static constexpr int64_t NUMBER_FIFOS = 3;
static constexpr int64_t FIFO1 = 0;
static constexpr int64_t FIFO2 = 1;
static constexpr int64_t FIFO8 = 2;
static int64_t const NUMBER_FIFOS = 2;
uint64_t const _maxFifoSize[NUMBER_FIFOS];
std::atomic<int64_t> _fifoSize[NUMBER_FIFOS];
boost::lockfree::queue<FifoJob*> _fifo1;
boost::lockfree::queue<FifoJob*> _fifo2;
boost::lockfree::queue<FifoJob*> _fifo8;
boost::lockfree::queue<FifoJob*>* _fifos[NUMBER_FIFOS];
// the following methds create tasks in the `io_context`.
@ -297,7 +313,7 @@ class Scheduler {
mutable Mutex _threadCreateLock;
double _lastAllBusyStamp;
};
}
}
} // namespace rest
} // namespace arangodb
#endif

View File

@ -28,8 +28,8 @@
#include "Basics/ConditionVariable.h"
#include "V8/JSLoader.h"
#include <velocypack/Slice.h>
#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
struct TRI_vocbase_t;
@ -46,6 +46,7 @@ class V8DealerFeature final : public application_features::ApplicationFeature {
size_t free;
size_t max;
};
public:
static V8DealerFeature* DEALER;
static constexpr ssize_t ANY_CONTEXT = -1;
@ -68,10 +69,10 @@ class V8DealerFeature final : public application_features::ApplicationFeature {
std::string _appPath;
std::string _startupDirectory;
std::vector<std::string> _moduleDirectory;
uint64_t _nrMaxContexts; // maximum number of contexts to create
uint64_t _nrMinContexts; // minimum number of contexts to keep
uint64_t _nrInflightContexts; // number of contexts currently in creation
uint64_t _maxContextInvocations; // maximum number of V8 context invocations
uint64_t _nrMaxContexts; // maximum number of contexts to create
uint64_t _nrMinContexts; // minimum number of contexts to keep
uint64_t _nrInflightContexts; // number of contexts currently in creation
uint64_t _maxContextInvocations; // maximum number of V8 context invocations
bool _allowAdminExecute;
bool _enableJS;
@ -89,11 +90,11 @@ class V8DealerFeature final : public application_features::ApplicationFeature {
/// @brief forceContext == -1 means that any free context may be
/// picked, or a new one will be created if we have not exceeded
/// the maximum number of contexts
/// forceContext == -2 means that any free context may be picked,
/// forceContext == -2 means that any free context may be picked,
/// or a new one will be created if we have not exceeded or exactly
/// reached the maximum number of contexts. this can be used to
/// force the creation of another context for high priority tasks
/// forceContext >= 0 means picking the context with that exact id
/// forceContext >= 0 means picking the context with that exact id
V8Context* enterContext(TRI_vocbase_t*, bool allowUseDatabase,
ssize_t forceContext = ANY_CONTEXT);
void exitContext(V8Context*);
@ -102,15 +103,15 @@ class V8DealerFeature final : public application_features::ApplicationFeature {
std::function<void(v8::Isolate*, v8::Handle<v8::Context>, size_t)>,
TRI_vocbase_t*);
void setMinimumContexts(size_t nr) {
void setMinimumContexts(size_t nr) {
if (nr > _nrMinContexts) {
_nrMinContexts = nr;
_nrMinContexts = nr;
}
}
void setMaximumContexts(size_t nr) {
_nrMaxContexts = nr;
}
uint64_t maximumContexts() const { return _nrMaxContexts; }
void setMaximumContexts(size_t nr) { _nrMaxContexts = nr; }
V8DealerFeature::stats getCurrentContextNumbers();
@ -134,7 +135,8 @@ class V8DealerFeature final : public application_features::ApplicationFeature {
void unblockDynamicContextCreation();
void loadJavaScriptFileInternal(std::string const& file, V8Context* context,
VPackBuilder* builder);
bool loadJavaScriptFileInContext(TRI_vocbase_t*, std::string const& file, V8Context* context, VPackBuilder* builder);
bool loadJavaScriptFileInContext(TRI_vocbase_t*, std::string const& file,
V8Context* context, VPackBuilder* builder);
void prepareLockedContext(TRI_vocbase_t*, V8Context*, bool allowUseDatabase);
void exitContextInternal(V8Context*);
void cleanupLockedContext(V8Context*);
@ -163,15 +165,16 @@ class V8DealerFeature final : public application_features::ApplicationFeature {
std::vector<std::pair<
std::function<void(v8::Isolate*, v8::Handle<v8::Context>, size_t)>,
TRI_vocbase_t*>> _contextUpdates;
TRI_vocbase_t*>>
_contextUpdates;
};
// enters and exits a context and provides an isolate
// in case the passed in isolate is a nullptr
class V8ContextDealerGuard {
public:
explicit V8ContextDealerGuard(Result&, v8::Isolate*&, TRI_vocbase_t*, bool allowModification);
explicit V8ContextDealerGuard(Result&, v8::Isolate*&, TRI_vocbase_t*,
bool allowModification);
V8ContextDealerGuard(V8ContextDealerGuard const&) = delete;
V8ContextDealerGuard& operator=(V8ContextDealerGuard const&) = delete;
~V8ContextDealerGuard();
@ -182,6 +185,6 @@ class V8ContextDealerGuard {
bool _active;
};
}
} // namespace arangodb
#endif

View File

@ -595,7 +595,7 @@ Result Collections::warmup(TRI_vocbase_t& vocbase,
auto idxs = coll.getIndexes();
auto poster = [](std::function<void()> fn) -> void {
SchedulerFeature::SCHEDULER->post(fn);
SchedulerFeature::SCHEDULER->post(fn, false);
};
auto queue = std::make_shared<basics::LocalTaskQueue>(poster);

View File

@ -35,13 +35,22 @@
#define SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y) x##y
#define SCOPE_GUARD_TOKEN_PASTE(x, y) SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y)
// helper macros for creating a ScopeGuard using a user-defined lambda or functor
#define TRI_DEFER_FUNC_INTERNAL(func, objname) auto objname = arangodb::scopeGuard(func);
#define TRI_DEFER_FUNC(func) TRI_DEFER_FUNC_INTERNAL(func, SCOPE_GUARD_TOKEN_PASTE(autoScopeGuardObj, __LINE__))
// helper macros for creating a ScopeGuard using a user-defined lambda or
// functor
#define TRI_DEFER_FUNC_INTERNAL(func, objname) \
auto objname = arangodb::scopeGuard(func);
#define TRI_DEFER_FUNC(func) \
TRI_DEFER_FUNC_INTERNAL( \
func, SCOPE_GUARD_TOKEN_PASTE(autoScopeGuardObj, __LINE__))
// helper macros for creating a capture-all ScopeGuard
#define TRI_DEFER_BLOCK_INTERNAL(func, objname) auto objname = arangodb::scopeGuard([&] { func; });
#define TRI_DEFER_BLOCK(func) TRI_DEFER_BLOCK_INTERNAL(func, SCOPE_GUARD_TOKEN_PASTE(autoScopeGuardObj, __LINE__))
#define TRI_DEFER_BLOCK_INTERNAL(func, objname) \
auto objname = arangodb::scopeGuard([&] { func; });
#define TRI_DEFER_BLOCK(func) \
TRI_DEFER_BLOCK_INTERNAL( \
func, SCOPE_GUARD_TOKEN_PASTE(autoScopeGuardObj, __LINE__))
// TRI_DEFER currently just maps to TRI_DEFER_BLOCK
// we will fix this later
@ -49,23 +58,21 @@
namespace arangodb {
template<class T>
class ScopeGuard {
template <class T>
class ScopeGuard {
public:
// prevent empty construction
ScopeGuard() = delete;
// prevent copying
ScopeGuard(ScopeGuard const&) = delete;
ScopeGuard& operator=(ScopeGuard const&) = delete;
ScopeGuard(T&& func) noexcept
: _func(std::move(func)),
_active(true) {}
ScopeGuard(T&& func) noexcept : _func(std::move(func)), _active(true) {}
ScopeGuard(ScopeGuard&& other) noexcept(std::is_nothrow_move_constructible<T>::value)
: _func(std::move_if_noexcept(other._func)),
_active(other._active) {
ScopeGuard(ScopeGuard&& other) noexcept(
std::is_nothrow_move_constructible<T>::value)
: _func(std::move_if_noexcept(other._func)), _active(other._active) {
other.cancel();
}
@ -84,6 +91,20 @@ class ScopeGuard {
// make the guard not trigger the function at scope exit
void cancel() noexcept { _active = false; }
// make the guard fire now and deactivate
void fire() noexcept {
if (active()) {
_active = false;
try {
// call the scope exit function
_func();
} catch (...) {
// we must not throw in destructors
}
}
}
// whether or not the guard will trigger the function at scope exit
bool active() const noexcept { return _active; }
@ -95,10 +116,11 @@ class ScopeGuard {
bool _active;
};
template<class T> ScopeGuard<T> scopeGuard(T&& f) {
template <class T>
ScopeGuard<T> scopeGuard(T&& f) {
return ScopeGuard<T>(std::move(f));
}
} // namespace
} // namespace arangodb
#endif