1
0
Fork 0
arangodb/arangod/Dispatcher/Dispatcher.cpp

243 lines
7.1 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Martin Schoenert
////////////////////////////////////////////////////////////////////////////////
#include "Dispatcher.h"
#include "Basics/ConditionLocker.h"
#include "Basics/MutexLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/logging.h"
#include "Dispatcher/DispatcherQueue.h"
#include "Dispatcher/DispatcherThread.h"
#include "Dispatcher/Job.h"
using namespace std;
using namespace triagens::basics;
using namespace triagens::rest;
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the default dispatcher thread
////////////////////////////////////////////////////////////////////////////////
static DispatcherThread* CreateDispatcherThread(DispatcherQueue* queue) {
return new DispatcherThread(queue);
}
Dispatcher::Dispatcher(Scheduler* scheduler)
: _scheduler(scheduler), _stopping(false) {
_queues.resize(SYSTEM_QUEUE_SIZE, nullptr);
}
Dispatcher::~Dispatcher() {
for (size_t i = 0; i < SYSTEM_QUEUE_SIZE; ++i) {
delete _queues[i];
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief adds the standard queue
////////////////////////////////////////////////////////////////////////////////
void Dispatcher::addStandardQueue(size_t nrThreads, size_t maxSize) {
TRI_ASSERT(_queues[STANDARD_QUEUE] == nullptr);
_queues[STANDARD_QUEUE] =
new DispatcherQueue(_scheduler, this, STANDARD_QUEUE,
CreateDispatcherThread, nrThreads, maxSize);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief adds the AQL queue (used for the cluster)
////////////////////////////////////////////////////////////////////////////////
void Dispatcher::addAQLQueue(size_t nrThreads, size_t maxSize) {
TRI_ASSERT(_queues[AQL_QUEUE] == nullptr);
_queues[AQL_QUEUE] = new DispatcherQueue(
_scheduler, this, AQL_QUEUE, CreateDispatcherThread, nrThreads, maxSize);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief starts a new named queue
///
/// This is not thread safe. Only used during initialization.
////////////////////////////////////////////////////////////////////////////////
int Dispatcher::addExtraQueue(size_t identifier, size_t nrThreads,
size_t maxSize) {
if (identifier == 0) {
return TRI_ERROR_QUEUE_ALREADY_EXISTS;
}
size_t n = identifier + (SYSTEM_QUEUE_SIZE - 1);
if (_queues.size() <= n) {
_queues.resize(n + 1, nullptr);
}
if (_queues[n] != nullptr) {
return TRI_ERROR_QUEUE_ALREADY_EXISTS;
}
if (_stopping != 0) {
return TRI_ERROR_DISPATCHER_IS_STOPPING;
}
_queues[n] = new DispatcherQueue(_scheduler, this, n, CreateDispatcherThread,
nrThreads, maxSize);
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief adds a new job
////////////////////////////////////////////////////////////////////////////////
int Dispatcher::addJob(std::unique_ptr<Job>& job) {
job->requestStatisticsAgentSetQueueStart();
// do not start new jobs if we are already shutting down
if (_stopping.load(memory_order_relaxed)) {
return TRI_ERROR_DISPATCHER_IS_STOPPING;
}
// try to find a suitable queue
size_t qnr = job->queue();
DispatcherQueue* queue;
if (qnr >= _queues.size() || (queue = _queues[qnr]) == nullptr) {
LOG_WARNING("unknown queue '%lu'", (unsigned long)qnr);
return TRI_ERROR_QUEUE_UNKNOWN;
}
// log success, but do this BEFORE the real add, because the addJob might
// execute
// and delete the job before we have a chance to log something
LOG_TRACE("added job %p to queue '%lu'", (void*)(job.get()),
(unsigned long)qnr);
// add the job to the list of ready jobs
return queue->addJob(job);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief tries to cancel a job
////////////////////////////////////////////////////////////////////////////////
bool Dispatcher::cancelJob(uint64_t jobId) {
bool done = false;
for (size_t i = 0; !done && i < _queues.size(); ++i) {
DispatcherQueue* queue = _queues[i];
if (queue != nullptr) {
done = queue->cancelJob(jobId);
}
}
return done;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief begins shutdown process
////////////////////////////////////////////////////////////////////////////////
void Dispatcher::beginShutdown() {
if (_stopping) {
return;
}
LOG_DEBUG("beginning shutdown sequence of dispatcher");
_stopping = true;
for (size_t i = 0; i < _queues.size(); ++i) {
DispatcherQueue* queue = _queues[i];
if (queue != nullptr) {
queue->beginShutdown();
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief shut downs the queue
////////////////////////////////////////////////////////////////////////////////
void Dispatcher::shutdown() {
LOG_DEBUG("shutting down the dispatcher");
for (size_t i = 0; i < _queues.size(); ++i) {
DispatcherQueue* queue = _queues[i];
if (queue != nullptr) {
queue->shutdown();
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief reports status of dispatcher queues
////////////////////////////////////////////////////////////////////////////////
void Dispatcher::reportStatus() {
#ifdef TRI_ENABLE_LOGGER
for (size_t i = 0; i < _queues.size(); ++i) {
DispatcherQueue* queue = _queues[i];
if (queue != nullptr) {
LOG_INFO(
"dispatcher queue '%lu': initial = %d, running = %d, waiting = %d, "
"blocked = %d",
(unsigned long)i, (int)queue->_nrThreads, (int)queue->_nrRunning,
(int)queue->_nrWaiting, (int)queue->_nrBlocked);
}
}
#endif
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the process affinity
////////////////////////////////////////////////////////////////////////////////
void Dispatcher::setProcessorAffinity(size_t id,
std::vector<size_t> const& cores) {
DispatcherQueue* queue;
if (id >= _queues.size() || (queue = _queues[id]) == nullptr) {
return;
}
queue->setProcessorAffinity(cores);
}