1
0
Fork 0

added x-arango-start-thread header feature

This commit is contained in:
jsteemann 2016-05-19 14:15:59 +02:00
parent 04b1d22440
commit dd0fd3df90
12 changed files with 49 additions and 17 deletions

View File

@ -657,7 +657,7 @@ bool HeartbeatThread::syncDBServerStatusQuo() {
<< "could not schedule dbserver sync - dispatcher gone."; << "could not schedule dbserver sync - dispatcher gone.";
return false; return false;
} }
if (dispatcher->addJob(job) == TRI_ERROR_NO_ERROR) { if (dispatcher->addJob(job, false) == TRI_ERROR_NO_ERROR) {
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "scheduled dbserver sync"; LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "scheduled dbserver sync";
return true; return true;
} }

View File

@ -110,7 +110,7 @@ int Dispatcher::addExtraQueue(size_t identifier, size_t nrThreads,
/// @brief adds a new job /// @brief adds a new job
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int Dispatcher::addJob(std::unique_ptr<Job>& job) { int Dispatcher::addJob(std::unique_ptr<Job>& job, bool startThread) {
job->requestStatisticsAgentSetQueueStart(); job->requestStatisticsAgentSetQueueStart();
// do not start new jobs if we are already shutting down // do not start new jobs if we are already shutting down
@ -133,7 +133,7 @@ int Dispatcher::addJob(std::unique_ptr<Job>& job) {
LOG(TRACE) << "added job " << (void*)(job.get()) << " to queue '" << qnr << "'"; LOG(TRACE) << "added job " << (void*)(job.get()) << " to queue '" << qnr << "'";
// add the job to the list of ready jobs // add the job to the list of ready jobs
return queue->addJob(job); return queue->addJob(job, startThread);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -102,7 +102,7 @@ class Dispatcher {
/// the response over the network to the caller. /// the response over the network to the caller.
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
int addJob(std::unique_ptr<Job>&); int addJob(std::unique_ptr<Job>&, bool startThread);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief tries to cancel a job /// @brief tries to cancel a job

View File

@ -82,7 +82,7 @@ DispatcherQueue::~DispatcherQueue() {
/// @brief adds a job /// @brief adds a job
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int DispatcherQueue::addJob(std::unique_ptr<Job>& job) { int DispatcherQueue::addJob(std::unique_ptr<Job>& job, bool startThread) {
TRI_ASSERT(job.get() != nullptr); TRI_ASSERT(job.get() != nullptr);
// get next free slot, return false is queue is full // get next free slot, return false is queue is full
@ -125,8 +125,8 @@ int DispatcherQueue::addJob(std::unique_ptr<Job>& job) {
} }
// if all threads are blocked, start a new one - we ignore race conditions // if all threads are blocked, start a new one - we ignore race conditions
else if (notEnoughThreads()) { else if (startThread || notEnoughThreads()) {
startQueueThread(); startQueueThread(startThread);
} }
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
@ -351,7 +351,7 @@ void DispatcherQueue::shutdown() {
/// @brief starts a new queue thread /// @brief starts a new queue thread
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void DispatcherQueue::startQueueThread() { void DispatcherQueue::startQueueThread(bool force) {
DispatcherThread* thread = (*createDispatcherThread)(this); DispatcherThread* thread = (*createDispatcherThread)(this);
if (!_affinityCores.empty()) { if (!_affinityCores.empty()) {
@ -371,12 +371,17 @@ void DispatcherQueue::startQueueThread() {
{ {
MUTEX_LOCKER(mutexLocker, _threadsLock); MUTEX_LOCKER(mutexLocker, _threadsLock);
if (!notEnoughThreads()) { if (!force && !notEnoughThreads()) {
delete thread; delete thread;
return; return;
} }
_startedThreads.insert(thread); try {
_startedThreads.insert(thread);
} catch (...) {
delete thread;
return;
}
++_nrRunning; ++_nrRunning;
} }

View File

@ -65,7 +65,7 @@ class DispatcherQueue {
/// @brief adds a job /// @brief adds a job
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
int addJob(std::unique_ptr<Job>&); int addJob(std::unique_ptr<Job>&, bool startThread);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief removes a job /// @brief removes a job
@ -110,7 +110,7 @@ class DispatcherQueue {
/// @brief starts a new queue thread /// @brief starts a new queue thread
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void startQueueThread(); void startQueueThread(bool force);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief called when a thread has stopped /// @brief called when a thread has stopped

View File

@ -68,6 +68,7 @@ HttpCommTask::HttpCommTask(HttpServer* server, TRI_socket_t socket,
_acceptDeflate(false), _acceptDeflate(false),
_newRequest(true), _newRequest(true),
_isChunked(false), _isChunked(false),
_startThread(false),
_request(nullptr), _request(nullptr),
_httpVersion(GeneralRequest::ProtocolVersion::UNKNOWN), _httpVersion(GeneralRequest::ProtocolVersion::UNKNOWN),
_requestType(GeneralRequest::RequestType::ILLEGAL), _requestType(GeneralRequest::RequestType::ILLEGAL),
@ -115,6 +116,7 @@ HttpCommTask::~HttpCommTask() {
void HttpCommTask::handleResponse(HttpResponse* response) { void HttpCommTask::handleResponse(HttpResponse* response) {
_requestPending = false; _requestPending = false;
_isChunked = false; _isChunked = false;
_startThread = false;
addResponse(response); addResponse(response);
} }
@ -576,6 +578,7 @@ void HttpCommTask::finishedChunked() {
_writeBuffersStats.push_back(nullptr); _writeBuffersStats.push_back(nullptr);
_isChunked = false; _isChunked = false;
_startThread = false;
_requestPending = false; _requestPending = false;
fillWriteBuffer(); fillWriteBuffer();
@ -845,8 +848,15 @@ void HttpCommTask::processRequest() {
<< (StringUtils::escapeUnicode(body)) << "\""; << (StringUtils::escapeUnicode(body)) << "\"";
} }
} }
handler->setTaskId(_taskId, _loop); handler->setTaskId(_taskId, _loop);
std::string const& startThread = _request->header(StaticStrings::StartThread, found);
if (found) {
_startThread = StringUtils::boolean(startThread);
}
// clear request object // clear request object
_request = nullptr; _request = nullptr;
@ -949,6 +959,7 @@ void HttpCommTask::resetState(bool close) {
_newRequest = true; _newRequest = true;
_readRequestBody = false; _readRequestBody = false;
_startThread = false;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -71,6 +71,12 @@ class HttpCommTask : public SocketTask, public RequestStatisticsAgent {
~HttpCommTask(); ~HttpCommTask();
public: public:
//////////////////////////////////////////////////////////////////////////////
/// @brief return whether or not the task desires to start a dispatcher thread
//////////////////////////////////////////////////////////////////////////////
bool startThread() const { return _startThread; }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief handles response /// @brief handles response
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -257,6 +263,12 @@ class HttpCommTask : public SocketTask, public RequestStatisticsAgent {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
bool _isChunked; bool _isChunked;
//////////////////////////////////////////////////////////////////////////////
/// @brief start a separate thread if the task is added to the dispatcher?
//////////////////////////////////////////////////////////////////////////////
bool _startThread;
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief the request with possible incomplete body /// @brief the request with possible incomplete body

View File

@ -220,7 +220,7 @@ bool HttpServer::handleRequestAsync(HttpCommTask* task,
} }
// execute the handler using the dispatcher // execute the handler using the dispatcher
int res = _dispatcher->addJob(job); int res = _dispatcher->addJob(job, task->startThread());
// could not add job to job queue // could not add job to job queue
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
@ -260,7 +260,8 @@ bool HttpServer::handleRequest(HttpCommTask* task,
<< (void*)job.get(); << (void*)job.get();
// add the job to the dispatcher // add the job to the dispatcher
int res = _dispatcher->addJob(job);
int res = _dispatcher->addJob(job, task->startThread());
// job is in queue now // job is in queue now
return res == TRI_ERROR_NO_ERROR; return res == TRI_ERROR_NO_ERROR;

View File

@ -89,7 +89,7 @@ bool V8PeriodicTask::handlePeriod() {
return false; return false;
} }
DispatcherFeature::DISPATCHER->addJob(job); DispatcherFeature::DISPATCHER->addJob(job, false);
return true; return true;
} }

View File

@ -90,7 +90,7 @@ bool V8TimerTask::handleTimeout() {
return false; return false;
} }
int res = DispatcherFeature::DISPATCHER->addJob(job); int res = DispatcherFeature::DISPATCHER->addJob(job, false);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "could not add task " << _command << " to queue"; LOG(WARN) << "could not add task " << _command << " to queue";

View File

@ -30,6 +30,7 @@ std::string const StaticStrings::Binary("binary");
std::string const StaticStrings::Empty(""); std::string const StaticStrings::Empty("");
std::string const StaticStrings::N1800("1800"); std::string const StaticStrings::N1800("1800");
// system attribute names // system attribute names
std::string const StaticStrings::IdString("_id"); std::string const StaticStrings::IdString("_id");
std::string const StaticStrings::KeyString("_key"); std::string const StaticStrings::KeyString("_key");
@ -74,6 +75,7 @@ std::string const StaticStrings::OmitWwwAuthenticate("x-omit-www-authenticate");
std::string const StaticStrings::Origin("origin"); std::string const StaticStrings::Origin("origin");
std::string const StaticStrings::Queue("x-arango-queue"); std::string const StaticStrings::Queue("x-arango-queue");
std::string const StaticStrings::Server("server"); std::string const StaticStrings::Server("server");
std::string const StaticStrings::StartThread("x-arango-start-thread");
std::string const StaticStrings::WwwAuthenticate("www-authenticate"); std::string const StaticStrings::WwwAuthenticate("www-authenticate");

View File

@ -80,6 +80,7 @@ class StaticStrings {
static std::string const Origin; static std::string const Origin;
static std::string const Queue; static std::string const Queue;
static std::string const Server; static std::string const Server;
static std::string const StartThread;
static std::string const WwwAuthenticate; static std::string const WwwAuthenticate;
// mime types // mime types