1
0
Fork 0

Bug fix/scheduling et al (#3161)

* added V8 context lifetime control options `--javascript.v8-contexts-max-invocations` and `--javascript.v8-contexts-max-age`

* make thread scheduling take into account most of the tasks dispatched via the io service
This commit is contained in:
Jan 2017-08-30 10:40:02 +02:00 committed by Frank Celler
parent ec2191cad4
commit 1ace247273
16 changed files with 256 additions and 97 deletions

View File

@ -1,6 +1,18 @@
devel
-----
* added V8 context lifetime control options
`--javascript.v8-contexts-max-invocations` and `--javascript.v8-contexts-max-age`
These options allow specifying after how many invocations a used V8 context is
disposed, or after what time a V8 context is disposed automatically after its
creation. If either of the two thresholds is reached, a V8 context will be
disposed.
The default value of `--javascript.v8-contexts-max-invocations` is 0, meaning that
the maximum number of invocations per context is unlimited. The default value
for `--javascript.v8-contexts-max-age` is 60 seconds.
* fixed wrong ui cluster health information
* fixed issue #3070: Add index in _jobs collection

View File

@ -496,6 +496,23 @@ value, but it may go up as high as specified via the option `--javascript.v8-con
When there are unused V8 contexts that linger around and the number of V8 contexts
is greater than `--javascript.v8-contexts-minimum` the server's garbage collector
thread will automatically delete them.
`--javascript.v8-contexts-max-invocations`
Specifies the maximum number of invocations after which a used V8 context is
disposed. The default value of `--javascript.v8-contexts-max-invocations` is 0,
meaning that the maximum number of invocations per context is unlimited.
`--javascript.v8-contexts-max-age`
Specifies the time duration (in seconds) after which time a V8 context is disposed
automatically after its creation. If the time is elapsed, the context will be disposed.
The default value for `--javascript.v8-contexts-max-age` is 60 seconds.
If both `--javascript.v8-contexts-max-invocations` and `--javascript.v8-contexts-max-age`
are set, then the context will be destroyed when either of the specified threshold
values is reached.
### Garbage collection frequency (time-based)

View File

@ -424,8 +424,7 @@ void ClusterFeature::start() {
// start heartbeat thread
_heartbeatThread = std::make_shared<HeartbeatThread>(
_agencyCallbackRegistry.get(), _heartbeatInterval * 1000, 5,
SchedulerFeature::SCHEDULER->ioService());
_agencyCallbackRegistry.get(), _heartbeatInterval * 1000, 5);
if (!_heartbeatThread->init() || !_heartbeatThread->start()) {
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "heartbeat could not connect to agency endpoints ("

View File

@ -60,8 +60,7 @@ std::atomic<bool> HeartbeatThread::HasRunOnce(false);
HeartbeatThread::HeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
uint64_t interval,
uint64_t maxFailsBeforeWarning,
boost::asio::io_service* ioService)
uint64_t maxFailsBeforeWarning)
: Thread("Heartbeat"),
_agencyCallbackRegistry(agencyCallbackRegistry),
_statusLock(std::make_shared<Mutex>()),
@ -77,7 +76,6 @@ HeartbeatThread::HeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
_currentVersions(0, 0),
_desiredVersions(std::make_shared<AgencyVersions>(0, 0)),
_wasNotified(false),
_ioService(ioService),
_backgroundJobsPosted(0),
_backgroundJobsLaunched(0),
_backgroundJobScheduledOrRunning(false),
@ -155,8 +153,7 @@ void HeartbeatThread::runBackgroundJob() {
_launchAnotherBackgroundJob = false;
// the JobGuard is in the operator() of HeartbeatBackgroundJob
_ioService->post(HeartbeatBackgroundJob(shared_from_this(),
TRI_microtime()));
SchedulerFeature::SCHEDULER->post(HeartbeatBackgroundJob(shared_from_this(), TRI_microtime()));
} else {
_backgroundJobScheduledOrRunning = false;
_launchAnotherBackgroundJob = false;
@ -842,8 +839,7 @@ void HeartbeatThread::syncDBServerStatusQuo() {
// the JobGuard is in the operator() of HeartbeatBackgroundJob
_lastSyncTime = TRI_microtime();
_ioService->
post(HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime));
SchedulerFeature::SCHEDULER->post(HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime));
}

View File

@ -52,7 +52,7 @@ class HeartbeatThread : public Thread,
public std::enable_shared_from_this<HeartbeatThread> {
public:
HeartbeatThread(AgencyCallbackRegistry*, uint64_t interval,
uint64_t maxFailsBeforeWarning, boost::asio::io_service*);
uint64_t maxFailsBeforeWarning);
~HeartbeatThread();
public:
@ -222,19 +222,13 @@ class HeartbeatThread : public Thread,
bool _wasNotified;
//////////////////////////////////////////////////////////////////////////////
/// @brief the io_service to start background jobs
//////////////////////////////////////////////////////////////////////////////
boost::asio::io_service* _ioService;
//////////////////////////////////////////////////////////////////////////////
/// @brief number of background jobs that have been posted to the ioService
/// @brief number of background jobs that have been posted to the scheduler
//////////////////////////////////////////////////////////////////////////////
std::atomic<uint64_t> _backgroundJobsPosted;
//////////////////////////////////////////////////////////////////////////////
/// @brief number of background jobs that have been launched by the ioService
/// @brief number of background jobs that have been launched by the scheduler
//////////////////////////////////////////////////////////////////////////////
std::atomic<uint64_t> _backgroundJobsLaunched;

View File

@ -587,11 +587,9 @@ void GraphStore<V, E>::storeResults(WorkerConfig* config,
size_t start = 0, end = delta;
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
boost::asio::io_service* ioService = SchedulerFeature::SCHEDULER->ioService();
TRI_ASSERT(ioService != nullptr);
do {
_runningThreads++;
ioService->post([this, start, end, now, callback] {
SchedulerFeature::SCHEDULER->post([this, start, end, now, callback] {
try {
RangeIterator<VertexEntry> it = vertexIterator(start, end);
_storeVertices(_config->globalShardIDs(), it);

View File

@ -47,41 +47,41 @@ class JobGuard : public SameThreadAsserter {
void work() {
TRI_ASSERT(!_isWorkingFlag);
if (0 == _isWorking) {
if (0 == _isWorking++) {
_scheduler->workThread();
}
++_isWorking;
_isWorkingFlag = true;
}
void block() {
TRI_ASSERT(!_isBlockedFlag);
if (0 == _isBlocked) {
if (0 == _isBlocked++) {
_scheduler->blockThread();
}
++_isBlocked;
_isBlockedFlag = true;
}
private:
void release() {
if (_isWorkingFlag) {
--_isWorking;
_isWorkingFlag = false;
if (0 == _isWorking) {
if (0 == --_isWorking) {
// if this is the last JobGuard we inform the
// scheduler that the thread is back to idle
_scheduler->unworkThread();
}
}
if (_isBlockedFlag) {
--_isBlocked;
_isBlockedFlag = false;
if (0 == _isBlocked) {
if (0 == --_isBlocked) {
// if this is the last JobGuard we inform the
// scheduler that the thread is now unblocked
_scheduler->unblockThread();
}
}

View File

@ -198,12 +198,14 @@ bool Scheduler::start(ConditionVariable* cv) {
TRI_ASSERT(_nrMinimum <= _nrMaximum);
for (uint64_t i = 0; i < _nrMinimum; ++i) {
MUTEX_LOCKER(locker, _threadCreateLock);
incRunning();
{
MUTEX_LOCKER(locker, _threadCreateLock);
incRunning();
}
try {
startNewThread();
} catch (...) {
MUTEX_LOCKER(locker, _threadCreateLock);
decRunning();
throw;
}
@ -281,6 +283,9 @@ bool Scheduler::stopThreadIfTooMany(double now) {
// make sure no extra threads are created while we check the timestamp
// and while we modify nrRunning
uint64_t const queueCap = std::max(uint64_t(1), uint64_t(_nrMaximum / 4));
uint64_t const nrQueued = std::min(_nrQueued.load(), queueCap);
MUTEX_LOCKER(locker, _threadCreateLock);
// fetch all counters in one atomic operation
@ -288,7 +293,6 @@ bool Scheduler::stopThreadIfTooMany(double now) {
uint64_t const nrRunning = numRunning(counters);
uint64_t const nrBlocked = numBlocked(counters);
uint64_t const nrWorking = numWorking(counters);
uint64_t const nrQueued = _nrQueued;
if (nrRunning <= _nrMinimum + nrBlocked) {
// don't stop a thread if we already reached the minimum
@ -308,7 +312,7 @@ bool Scheduler::stopThreadIfTooMany(double now) {
// set the all busy stamp. this avoids that we shut down all threads
// at the same time
if (_lastAllBusyStamp < MIN_SECONDS / 2.0) {
if (_lastAllBusyStamp < now - MIN_SECONDS / 2.0) {
_lastAllBusyStamp = now - MIN_SECONDS / 2.0;
}
@ -370,32 +374,45 @@ void Scheduler::rebalanceThreads() {
} else if (count % 5 == 0) {
LOG_TOPIC(TRACE, Logger::THREADS) << "rebalancing threads: " << infoStatus();
}
uint64_t const queueCap = std::max(uint64_t(1), uint64_t(_nrMaximum / 4));
while (true) {
{
double const now = TRI_microtime();
uint64_t const nrQueued = std::min(_nrQueued.load(), queueCap);
MUTEX_LOCKER(locker, _threadCreateLock);
uint64_t const counters = _counters.load();
uint64_t const nrRunning = numRunning(counters);
uint64_t const nrWorking = numWorking(counters);
uint64_t const nrQueued = _nrQueued;
uint64_t const nrBlocked = numBlocked(counters);
if (nrRunning >= std::max(_nrMinimum, nrWorking + nrQueued)) {
if (nrRunning >= std::max(_nrMinimum, nrWorking + nrQueued)) { // + std::min(nrBlocked, uint64_t(8)))) {
// all threads are working, and none are blocked. so there is no
// need to start a new thread now
if (nrWorking == nrRunning) {
// all threads maxed out
// still note that all threads are maxed out
_lastAllBusyStamp = now;
}
break;
}
if (nrRunning >= _nrMaximum + nrBlocked) {
// reached the maximum now
break;
}
if (isStopping(counters)) {
// do not start any new threads in case we are already shutting down
break;
}
// LOG_TOPIC(ERR, Logger::THREADS) << "starting new thread. nrRunning: " << nrRunning << ", nrWorking: " << nrWorking << ", nrBlocked: " << nrBlocked << ", nrQueued: " << nrQueued;
// all threads maxed out
// all threads are maxed out
_lastAllBusyStamp = now;
// increase nrRunning by one here already, while holding the lock
incRunning();

View File

@ -99,26 +99,45 @@ class Scheduler {
std::string infoStatus();
inline void queueJob() noexcept { ++_nrQueued; }
inline void unqueueJob() noexcept {
if (--_nrQueued == UINT64_MAX) {
TRI_ASSERT(false);
}
}
private:
void startNewThread();
static void initializeSignalHandlers();
private:
void setStopping() { _counters |= (1ULL << 63); }
inline void incRunning() { _counters += 1ULL << 0; }
inline void decRunning() { _counters -= 1ULL << 0; }
// we store most of the threads status info in a single atomic uint64_t
// the encoding of the values inside this variable is (left to right means
// high to low bytes):
//
// AA BB CC DD
//
// we use the lowest 2 bytes (DD) to store the number of running threads
// the next lowest bytes (CC) are used to store the number of currently working threads
// the next bytes (BB) are used to store the number of currently blocked threads
// the highest bytes (AA) are used only to encode a stopping bit. when this bit is
// set, the scheduler is stopping (or already stopped)
inline void setStopping() noexcept { _counters |= (1ULL << 63); }
inline void workThread() { _counters += 1ULL << 16; }
inline void unworkThread() { _counters -= 1ULL << 16; }
inline void incRunning() noexcept { _counters += 1ULL << 0; }
inline void decRunning() noexcept { _counters -= 1ULL << 0; }
inline void blockThread() { _counters += 1ULL << 32; }
inline void unblockThread() { _counters -= 1ULL << 32; }
inline uint64_t numRunning(uint64_t value) const { return value & 0xFFFFULL; }
inline uint64_t numWorking(uint64_t value) const { return (value >> 16) & 0xFFFFULL; }
inline uint64_t numBlocked(uint64_t value) const { return (value >> 32) & 0xFFFFULL; }
inline bool isStopping(uint64_t value) { return (value & (1ULL << 63)) != 0; }
inline void workThread() noexcept { _counters += 1ULL << 16; }
inline void unworkThread() noexcept { _counters -= 1ULL << 16; }
inline void blockThread() noexcept { _counters += 1ULL << 32; }
inline void unblockThread() noexcept { _counters -= 1ULL << 32; }
inline uint64_t numRunning(uint64_t value) const noexcept { return value & 0xFFFFULL; }
inline uint64_t numWorking(uint64_t value) const noexcept { return (value >> 16) & 0xFFFFULL; }
inline uint64_t numBlocked(uint64_t value) const noexcept { return (value >> 32) & 0xFFFFULL; }
inline bool isStopping(uint64_t value) const noexcept { return (value & (1ULL << 63)) != 0; }
void startIoService();
void startRebalancer();
@ -135,10 +154,8 @@ class Scheduler {
// maximal number of outstanding user requests
uint64_t const _nrMaximum;
// current counters
// - the lowest 16 bits contain the number of running threads
// - the next 16 bits contain the number of working threads
// - the next 16 bits contain the number of blocked threads
// current counters. refer to the above description of the
// meaning of its individual bits
std::atomic<uint64_t> _counters;
// number of jobs that are currently been queued, but not worked on

View File

@ -49,8 +49,8 @@ std::string const GlobalContextMethods::CodeWarmupExports =
V8Context::V8Context(size_t id, v8::Isolate* isolate)
: _id(id), _isolate(isolate), _locker(nullptr),
_numExecutions(0), _creationStamp(TRI_microtime()),
_lastGcStamp(0.0), _hasActiveExternals(false) {}
_creationStamp(TRI_microtime()), _lastGcStamp(0.0),
_invocations(0), _invocationsSinceLastGc(0), _hasActiveExternals(false) {}
void V8Context::lockAndEnter() {
TRI_ASSERT(_isolate != nullptr);
@ -60,6 +60,9 @@ void V8Context::lockAndEnter() {
TRI_ASSERT(_locker->IsLocked(_isolate));
TRI_ASSERT(v8::Locker::IsLocked(_isolate));
++_invocations;
++_invocationsSinceLastGc;
}
void V8Context::unlockAndExit() {
@ -78,9 +81,29 @@ bool V8Context::hasGlobalMethodsQueued() {
return !_globalMethods.empty();
}
void V8Context::setCleaned(double stamp) {
_lastGcStamp = stamp;
_invocationsSinceLastGc = 0;
}
double V8Context::age() const {
return TRI_microtime() - _creationStamp;
}
bool V8Context::shouldBeRemoved(double maxAge, uint64_t maxInvocations) const {
if (maxAge > 0.0 && age() > maxAge) {
// context is "too old"
return true;
}
if (maxInvocations > 0 && _invocations >= maxInvocations) {
// context is used often enough
return true;
}
// re-use the context
return false;
}
bool V8Context::addGlobalContextMethod(
std::string const& method) {

View File

@ -112,21 +112,27 @@ class V8Context {
public:
V8Context(size_t id, v8::Isolate* isolate);
size_t id() const { return _id; }
bool isDefault() const { return _id == 0; }
bool isUsed() const { return _locker != nullptr; }
double age() const;
void lockAndEnter();
void unlockAndExit();
uint64_t invocations() const { return _invocations; }
uint64_t invocationsSinceLastGc() const { return _invocationsSinceLastGc; }
bool shouldBeRemoved(double maxAge, uint64_t maxInvocations) const;
bool hasGlobalMethodsQueued();
void setCleaned(double stamp);
size_t const _id;
v8::Persistent<v8::Context> _context;
v8::Isolate* _isolate;
v8::Locker* _locker;
size_t _numExecutions;
double const _creationStamp;
double _lastGcStamp;
uint64_t _invocations;
uint64_t _invocationsSinceLastGc;
bool _hasActiveExternals;
Mutex _globalMethodsLock;

View File

@ -94,9 +94,11 @@ V8DealerFeature::V8DealerFeature(
: application_features::ApplicationFeature(server, "V8Dealer"),
_gcFrequency(30.0),
_gcInterval(1000),
_maxContextAge(60.0),
_nrMaxContexts(0),
_nrMinContexts(0),
_nrInflightContexts(0),
_maxContextInvocations(0),
_allowAdminExecute(false),
_ok(false),
_nextId(0),
@ -129,7 +131,7 @@ void V8DealerFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
"--javascript.gc-interval",
"JavaScript request-based garbage collection interval (each x requests)",
new UInt64Parameter(&_gcInterval));
options->addOption("--javascript.app-path", "directory for Foxx applications",
new StringParameter(&_appPath));
@ -152,6 +154,16 @@ void V8DealerFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
"--javascript.v8-contexts-minimum",
"minimum number of V8 contexts that keep available for executing JavaScript actions",
new UInt64Parameter(&_nrMinContexts));
options->addHiddenOption(
"--javascript.v8-contexts-max-invocations",
"maximum number of invocations for each V8 context before it is disposed",
new UInt64Parameter(&_maxContextInvocations));
options->addHiddenOption(
"--javascript.v8-contexts-max-age",
"maximum age for each V8 context (in seconds) before it is disposed",
new DoubleParameter(&_maxContextAge));
options->addHiddenOption(
"--javascript.allow-admin-execute",
@ -396,7 +408,7 @@ void V8DealerFeature::collectGarbage() {
if (context == nullptr && !_dirtyContexts.empty()) {
context = _dirtyContexts.back();
_dirtyContexts.pop_back();
if (context->_numExecutions < 50 && !context->_hasActiveExternals) {
if (context->invocationsSinceLastGc() < 50 && !context->_hasActiveExternals) {
// don't collect this one yet. it doesn't have externals, so there
// is no urge for garbage collection
_freeContexts.emplace_back(context);
@ -426,10 +438,11 @@ void V8DealerFeature::collectGarbage() {
gc->updateGcStamp(lastGc);
if (context != nullptr) {
arangodb::CustomWorkStack custom("V8 GC", (uint64_t)context->_id);
arangodb::CustomWorkStack custom("V8 GC", (uint64_t)context->id());
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "collecting V8 garbage in context #" << context->_id
<< ", numExecutions: " << context->_numExecutions
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "collecting V8 garbage in context #" << context->id()
<< ", invocations total: " << context->invocations()
<< ", invocations since last gc: " << context->invocationsSinceLastGc()
<< ", hasActive: " << context->_hasActiveExternals
<< ", wasDirty: " << wasDirty;
bool hasActiveExternals = false;
@ -460,24 +473,21 @@ void V8DealerFeature::collectGarbage() {
// update garbage collection statistics
context->_hasActiveExternals = hasActiveExternals;
context->_numExecutions = 0;
context->_lastGcStamp = lastGc;
context->setCleaned(lastGc);
{
CONDITION_LOCKER(guard, _contextCondition);
double const minAge = 15.0;
if (_contexts.size() > _nrMinContexts &&
!context->isDefault() &&
context->age() > minAge &&
context->shouldBeRemoved(_maxContextAge, _maxContextInvocations) &&
_contextsModificationBlockers == 0) {
// remove the extra context as it is not needed anymore
_contexts.erase(std::remove_if(_contexts.begin(), _contexts.end(), [&context](V8Context* c) {
return (c->_id == context->_id);
return (c->id() == context->id());
}));
LOG_TOPIC(DEBUG, Logger::V8) << "removed superfluous V8 context #" << context->_id << ", number of contexts is now: " << _contexts.size();
LOG_TOPIC(DEBUG, Logger::V8) << "removed superfluous V8 context #" << context->id() << ", number of contexts is now: " << _contexts.size();
guard.unlock();
shutdownContext(context);
@ -574,7 +584,7 @@ void V8DealerFeature::loadJavaScriptFileInDefaultContext(TRI_vocbase_t* vocbase,
try {
loadJavaScriptFileInternal(file, context, builder);
} catch (...) {
LOG_TOPIC(WARN, Logger::V8) << "caught exception while executing JavaScript file '" << file << "' in context #" << context->_id;
LOG_TOPIC(WARN, Logger::V8) << "caught exception while executing JavaScript file '" << file << "' in context #" << context->id();
throw;
}
}
@ -627,7 +637,7 @@ void V8DealerFeature::enterLockedContext(TRI_vocbase_t* vocbase,
v8g->_allowUseDatabase = allowUseDatabase;
try {
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "entering V8 context " << context->_id;
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "entering V8 context " << context->id();
context->handleGlobalContextMethods();
} catch (...) {
// ignore errors here
@ -669,7 +679,7 @@ V8Context* V8DealerFeature::enterContext(TRI_vocbase_t* vocbase,
}
for (auto it = _freeContexts.begin(); it != _freeContexts.end(); ++it) {
if ((*it)->_id == id) {
if ((*it)->id() == id) {
context = (*it);
_freeContexts.erase(it);
_busyContexts.emplace(context);
@ -680,7 +690,7 @@ V8Context* V8DealerFeature::enterContext(TRI_vocbase_t* vocbase,
if (context == nullptr) {
// still not found
for (auto it = _dirtyContexts.begin(); it != _dirtyContexts.end(); ++it) {
if ((*it)->_id == id) {
if ((*it)->id() == id) {
context = (*it);
_dirtyContexts.erase(it);
_busyContexts.emplace(context);
@ -699,7 +709,7 @@ V8Context* V8DealerFeature::enterContext(TRI_vocbase_t* vocbase,
// check if such context exists at all
bool found = false;
for (auto& it : _contexts) {
if (it->_id == id) {
if (it->id() == id) {
found = true;
break;
}
@ -772,7 +782,7 @@ V8Context* V8DealerFeature::enterContext(TRI_vocbase_t* vocbase,
try {
_freeContexts.push_back(context);
LOG_TOPIC(DEBUG, Logger::V8) << "created additional V8 context #" << context->_id << ", number of contexts is now " << _contexts.size();
LOG_TOPIC(DEBUG, Logger::V8) << "created additional V8 context #" << context->id() << ", number of contexts is now " << _contexts.size();
} catch (...) {
TRI_ASSERT(!_contexts.empty());
_contexts.pop_back();
@ -842,7 +852,7 @@ void V8DealerFeature::exitContextInternal(V8Context* context) {
void V8DealerFeature::exitLockedContext(V8Context* context) {
TRI_ASSERT(context != nullptr);
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "leaving V8 context " << context->_id;
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "leaving V8 context " << context->id();
auto isolate = context->_isolate;
TRI_ASSERT(isolate != nullptr);
@ -878,7 +888,6 @@ void V8DealerFeature::exitLockedContext(V8Context* context) {
{
TRI_GET_GLOBALS();
context->_hasActiveExternals = v8g->hasActiveExternals();
++context->_numExecutions;
TRI_vocbase_t* vocbase = v8g->_vocbase;
TRI_ASSERT(vocbase != nullptr);
@ -941,7 +950,7 @@ void V8DealerFeature::exitContext(V8Context* context) {
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "V8 context has reached GC timeout threshold and will be "
"scheduled for GC";
performGarbageCollection = true;
} else if (context->_numExecutions >= _gcInterval) {
} else if (context->invocationsSinceLastGc() >= _gcInterval) {
LOG_TOPIC(TRACE, arangodb::Logger::V8)
<< "V8 context has reached maximum number of requests and will "
"be scheduled for GC";
@ -1018,13 +1027,13 @@ void V8DealerFeature::applyContextUpdate(V8Context* context) {
{
v8::Context::Scope contextScope(localContext);
p.first(context->_isolate, localContext, context->_id);
p.first(context->_isolate, localContext, context->id());
}
localContext->Exit();
}
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "updated V8 context #" << context->_id;
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "updated V8 context #" << context->id();
}
}
@ -1122,7 +1131,7 @@ V8Context* V8DealerFeature::pickFreeContextForGc() {
for (int i = n - 1; i > 0; --i) {
// check if there's actually anything to clean up in the context
if (_freeContexts[i]->_numExecutions < 50 &&
if (_freeContexts[i]->invocationsSinceLastGc() < 50 &&
!_freeContexts[i]->_hasActiveExternals) {
continue;
}
@ -1273,7 +1282,6 @@ V8Context* V8DealerFeature::buildContext(size_t id) {
static_cast<double>(RandomGenerator::interval(0, 60));
// initialize garbage collection for context
context->_numExecutions = 0;
context->_hasActiveExternals = true;
context->_lastGcStamp = TRI_microtime() + randomWait;
@ -1302,7 +1310,7 @@ bool V8DealerFeature::loadJavaScriptFileInContext(TRI_vocbase_t* vocbase,
try {
loadJavaScriptFileInternal(file, context, builder);
} catch (...) {
LOG_TOPIC(WARN, Logger::V8) << "caught exception while executing JavaScript file '" << file << "' in context #" << context->_id;
LOG_TOPIC(WARN, Logger::V8) << "caught exception while executing JavaScript file '" << file << "' in context #" << context->id();
exitContextInternal(context);
throw;
}
@ -1337,12 +1345,12 @@ void V8DealerFeature::loadJavaScriptFileInternal(std::string const& file, V8Cont
}
}
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "loaded Javascript file '" << file << "' for V8 context #" << context->_id;
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "loaded Javascript file '" << file << "' for V8 context #" << context->id();
}
void V8DealerFeature::shutdownContext(V8Context* context) {
TRI_ASSERT(context != nullptr);
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "shutting down V8 context #" << context->_id;
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "shutting down V8 context #" << context->id();
auto isolate = context->_isolate;
{
@ -1398,7 +1406,7 @@ void V8DealerFeature::shutdownContext(V8Context* context) {
application_features::ApplicationServer::getFeature<V8PlatformFeature>(
"V8Platform")->disposeIsolate(isolate);
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "closed V8 context #" << context->_id;
LOG_TOPIC(TRACE, arangodb::Logger::V8) << "closed V8 context #" << context->id();
delete context;
}

View File

@ -54,12 +54,14 @@ class V8DealerFeature final : public application_features::ApplicationFeature {
private:
double _gcFrequency;
uint64_t _gcInterval;
double _maxContextAge;
std::string _appPath;
std::string _startupDirectory;
std::vector<std::string> _moduleDirectory;
uint64_t _nrMaxContexts; // maximum number of contexts to create
uint64_t _nrMinContexts; // minimum number of contexts to keep
uint64_t _nrInflightContexts; // number of contexts currently in creation
uint64_t _maxContextInvocations; // maximum number of V8 context invocations
bool _allowAdminExecute;
public:

View File

@ -71,6 +71,7 @@ class V8Task : public std::enable_shared_from_this<V8Task> {
static std::shared_ptr<VPackBuilder> registeredTask(std::string const& id);
static std::shared_ptr<VPackBuilder> registeredTasks();
static void shutdownTasks();
static void removeTasksForDatabase(std::string const&);
private:
static Mutex _tasksLock;
@ -79,6 +80,7 @@ class V8Task : public std::enable_shared_from_this<V8Task> {
public:
V8Task(std::string const& id, std::string const& name, TRI_vocbase_t*,
std::string const& command, bool allowUseDatabase);
~V8Task();
public:
void setOffset(double offset);
@ -87,15 +89,20 @@ class V8Task : public std::enable_shared_from_this<V8Task> {
std::shared_ptr<arangodb::velocypack::Builder> const& parameters);
void setUser(std::string const& user);
void start(boost::asio::io_service*);
void start();
void cancel();
std::shared_ptr<VPackBuilder> toVelocyPack() const;
bool databaseMatches(std::string const&) const;
private:
void toVelocyPack(VPackBuilder&) const;
void work();
void queue(std::chrono::microseconds offset);
void unqueue() noexcept;
std::function<void(boost::system::error_code const&)> callbackFunction();
std::string const& name() const { return _name; }
private:
std::string const _id;
@ -115,6 +122,9 @@ class V8Task : public std::enable_shared_from_this<V8Task> {
std::chrono::microseconds _offset;
std::chrono::microseconds _interval;
bool _periodic = false;
Mutex _queueMutex;
bool _queued;
};
Mutex V8Task::_tasksLock;
@ -208,6 +218,24 @@ void V8Task::shutdownTasks() {
_tasks.clear();
}
void V8Task::removeTasksForDatabase(std::string const& name) {
MUTEX_LOCKER(guard, _tasksLock);
for (auto it = _tasks.begin(); it != _tasks.end(); /* no hoisting */) {
if (!(*it).second->databaseMatches(name)) {
++it;
} else {
auto task = (*it).second;
task->cancel();
it = _tasks.erase(it);
}
}
}
bool V8Task::databaseMatches(std::string const& name) const {
return (_vocbaseGuard->vocbase()->name() == name);
}
V8Task::V8Task(std::string const& id, std::string const& name,
TRI_vocbase_t* vocbase, std::string const& command,
bool allowUseDatabase)
@ -218,7 +246,10 @@ V8Task::V8Task(std::string const& id, std::string const& name,
_command(command),
_allowUseDatabase(allowUseDatabase),
_offset(0),
_interval(0) {}
_interval(0),
_queued(false) {}
V8Task::~V8Task() { unqueue(); }
void V8Task::setOffset(double offset) {
_offset = std::chrono::microseconds(static_cast<long long>(offset * 1000000));
@ -246,6 +277,8 @@ V8Task::callbackFunction() {
auto self = shared_from_this();
return [self, this](const boost::system::error_code& error) {
unqueue();
// First tell the scheduler that this thread is working:
JobGuard guard(SchedulerFeature::SCHEDULER);
guard.work();
@ -289,8 +322,8 @@ V8Task::callbackFunction() {
work();
if (_periodic && !SchedulerFeature::SCHEDULER->isStopping()) {
_timer->expires_from_now(_interval);
_timer->async_wait(callbackFunction());
// requeue the task
queue(_interval);
} else {
// in case of one-off tasks or in case of a shutdown, simply
// remove the task from the list
@ -299,23 +332,57 @@ V8Task::callbackFunction() {
};
}
void V8Task::start(boost::asio::io_service* ioService) {
void V8Task::start() {
TRI_ASSERT(ExecContext::CURRENT == nullptr ||
(!_user.empty() && ExecContext::CURRENT->user() == _user));
auto ioService = SchedulerFeature::SCHEDULER->ioService();
_timer.reset(new boost::asio::steady_timer(*ioService));
if (_offset.count() <= 0) {
_offset = std::chrono::microseconds(1);
}
_timer->expires_from_now(_offset);
// initially queue the task
queue(_offset);
}
void V8Task::queue(std::chrono::microseconds offset) {
{
MUTEX_LOCKER(locker, _queueMutex);
TRI_ASSERT(!_queued);
_queued = true;
}
SchedulerFeature::SCHEDULER->queueJob();
_timer->expires_from_now(offset);
_timer->async_wait(callbackFunction());
}
void V8Task::unqueue() noexcept {
bool wasQueued;
{
MUTEX_LOCKER(locker, _queueMutex);
wasQueued = _queued;
if (wasQueued) {
_queued = false;
}
}
if (wasQueued && SchedulerFeature::SCHEDULER != nullptr) {
SchedulerFeature::SCHEDULER->unqueueJob();
}
}
void V8Task::cancel() {
// this will prevent the task from dispatching itself again
_periodic = false;
boost::system::error_code ec;
_timer->cancel(ec);
unqueue();
}
std::shared_ptr<VPackBuilder> V8Task::toVelocyPack() const {
@ -600,9 +667,7 @@ static void JS_RegisterTask(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_MEMORY();
}
// and start
auto ioService = SchedulerFeature::SCHEDULER->ioService();
task->start(ioService);
task->start();
v8::Handle<v8::Value> result = TRI_VPackToV8(isolate, builder->slice());
@ -792,3 +857,5 @@ void TRI_InitV8Dispatcher(v8::Isolate* isolate,
}
void TRI_ShutdownV8Dispatcher() { V8Task::shutdownTasks(); }
void TRI_RemoveDatabaseTasksV8Dispatcher(std::string const& name) { V8Task::removeTasksForDatabase(name); }

View File

@ -31,5 +31,6 @@
void TRI_InitV8Dispatcher(v8::Isolate* isolate, v8::Handle<v8::Context> context);
void TRI_ShutdownV8Dispatcher();
void TRI_RemoveDatabaseTasksV8Dispatcher(std::string const& databaseName);
#endif

View File

@ -37,6 +37,7 @@
#include "V8/v8-vpack.h"
#include "V8Server/V8Context.h"
#include "V8Server/V8DealerFeature.h"
#include "V8Server/v8-dispatcher.h"
#include "VocBase/AuthInfo.h"
#include "VocBase/modes.h"
#include "VocBase/vocbase.h"
@ -450,9 +451,10 @@ arangodb::Result Databases::drop(TRI_vocbase_t* systemVocbase,
return Result(res);
}
TRI_RemoveDatabaseTasksV8Dispatcher(dbName);
// run the garbage collection in case the database held some objects which
// can
// now be freed
// can now be freed
TRI_RunGarbageCollectionV8(isolate, 0.25);
TRI_ExecuteJavaScriptString(