1
0
Fork 0

async job management

This commit is contained in:
Jan Steemann 2013-09-26 17:30:03 +02:00
parent a525dab0c4
commit 5a422becd8
19 changed files with 891 additions and 42 deletions

View File

@ -56,6 +56,7 @@
#include "Dispatcher/ApplicationDispatcher.h"
#include "Dispatcher/Dispatcher.h"
#include "HttpServer/ApplicationEndpointServer.h"
#include "HttpServer/AsyncJobManager.h"
#include "HttpServer/HttpHandlerFactory.h"
#include "Logger/Logger.h"
@ -106,10 +107,15 @@ using namespace triagens::arango;
////////////////////////////////////////////////////////////////////////////////
static void DefineApiHandlers (HttpHandlerFactory* factory,
ApplicationAdminServer* admin) {
ApplicationAdminServer* admin,
AsyncJobManager* jobManager) {
// add "/version" handler
admin->addBasicHandlers(factory, "/_api");
admin->addBasicHandlers(factory, "/_api", (void*) jobManager);
// add "/batch" handler
factory->addPrefixHandler(RestVocbaseBaseHandler::BATCH_PATH,
RestHandlerCreator<RestBatchHandler>::createNoData);
// add "/document" handler
factory->addPrefixHandler(RestVocbaseBaseHandler::DOCUMENT_PATH,
@ -123,10 +129,6 @@ static void DefineApiHandlers (HttpHandlerFactory* factory,
factory->addPrefixHandler(RestVocbaseBaseHandler::DOCUMENT_IMPORT_PATH,
RestHandlerCreator<RestImportHandler>::createNoData);
// add "/batch" handler
factory->addPrefixHandler(RestVocbaseBaseHandler::BATCH_PATH,
RestHandlerCreator<RestBatchHandler>::createNoData);
// add "/replication" handler
factory->addPrefixHandler(RestVocbaseBaseHandler::REPLICATION_PATH,
RestHandlerCreator<RestReplicationHandler>::createNoData);
@ -141,10 +143,11 @@ static void DefineApiHandlers (HttpHandlerFactory* factory,
////////////////////////////////////////////////////////////////////////////////
static void DefineAdminHandlers (HttpHandlerFactory* factory,
ApplicationAdminServer* admin) {
ApplicationAdminServer* admin,
AsyncJobManager* jobManager) {
// add "/version" handler
admin->addBasicHandlers(factory, "/_admin");
admin->addBasicHandlers(factory, "/_admin", (void*) jobManager);
// add admin handlers
admin->addHandlers(factory, "/_admin");
@ -270,6 +273,7 @@ ArangoServer::ArangoServer (int argc, char** argv)
_applicationDispatcher(0),
_applicationEndpointServer(0),
_applicationAdminServer(0),
_jobManager(0),
_authenticateSystemOnly(false),
_disableAuthentication(false),
_dispatcherThreads(8),
@ -306,6 +310,16 @@ ArangoServer::ArangoServer (int argc, char** argv)
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
ArangoServer::~ArangoServer () {
if (_jobManager != 0) {
delete _jobManager;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -377,7 +391,7 @@ void ArangoServer::buildApplicationServer () {
;
#endif
// .............................................................................
// and start a simple admin server
// .............................................................................
@ -495,9 +509,12 @@ void ArangoServer::buildApplicationServer () {
// endpoint server
// .............................................................................
_jobManager = new AsyncJobManager(&TRI_NewTickServer);
_applicationEndpointServer = new ApplicationEndpointServer(_applicationServer,
_applicationScheduler,
_applicationDispatcher,
_jobManager,
"arangodb",
&SetRequestContext,
(void*) _server);
@ -717,8 +734,8 @@ int ArangoServer::startupServer () {
HttpHandlerFactory* handlerFactory = _applicationEndpointServer->getHandlerFactory();
DefineApiHandlers(handlerFactory, _applicationAdminServer);
DefineAdminHandlers(handlerFactory, _applicationAdminServer);
DefineApiHandlers(handlerFactory, _applicationAdminServer, _jobManager);
DefineAdminHandlers(handlerFactory, _applicationAdminServer, _jobManager);
// add action handler
handlerFactory->addPrefixHandler(

View File

@ -49,6 +49,7 @@ namespace triagens {
class ApplicationDispatcher;
class ApplicationEndpointServer;
class ApplicationScheduler;
class AsyncJobManager;
class HttpServer;
class HttpsServer;
}
@ -100,6 +101,12 @@ namespace triagens {
ArangoServer (int argc, char** argv);
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~ArangoServer ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -240,6 +247,12 @@ namespace triagens {
admin::ApplicationAdminServer* _applicationAdminServer;
////////////////////////////////////////////////////////////////////////////////
/// @brief asynchronous job manager
////////////////////////////////////////////////////////////////////////////////
rest::AsyncJobManager* _jobManager;
////////////////////////////////////////////////////////////////////////////////
/// @brief application MR
////////////////////////////////////////////////////////////////////////////////

View File

@ -30,6 +30,7 @@
#include "BasicsC/common.h"
#include "Admin/RestAdminLogHandler.h"
#include "Admin/RestJobHandler.h"
#include "Admin/RestHandlerCreator.h"
#include "Basics/ProgramOptionsDescription.h"
#include "HttpServer/HttpHandlerFactory.h"
@ -156,7 +157,9 @@ void ApplicationAdminServer::allowVersion (string name, string version) {
/// Note that the server does not claim ownership of the factory.
////////////////////////////////////////////////////////////////////////////////
void ApplicationAdminServer::addBasicHandlers (HttpHandlerFactory* factory, string const& prefix) {
void ApplicationAdminServer::addBasicHandlers (HttpHandlerFactory* factory,
string const& prefix,
void* jobManager) {
#if TRI_ENABLE_MAINTAINER_MODE
// this handler does not provide any real benefit. we only use it to compare
// the performance of direct vs. the performance of queued execution
@ -186,6 +189,10 @@ void ApplicationAdminServer::addBasicHandlers (HttpHandlerFactory* factory, stri
RestHandlerCreator<RestVersionHandler>::createData<RestVersionHandler::version_options_t const*>,
(void*) _versionDataDirect);
}
factory->addHandler(prefix + "/job",
RestHandlerCreator<RestJobHandler>::createData<void*>,
jobManager);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -129,7 +129,9 @@ namespace triagens {
/// Note that the server does not claim ownership of the factory.
////////////////////////////////////////////////////////////////////////////////
void addBasicHandlers (rest::HttpHandlerFactory*, string const &prefix);
void addBasicHandlers (rest::HttpHandlerFactory*,
string const &prefix,
void*);
////////////////////////////////////////////////////////////////////////////////
/// @brief adds the http handlers for administration
@ -137,7 +139,8 @@ namespace triagens {
/// Note that the server does not claim ownership of the factory.
////////////////////////////////////////////////////////////////////////////////
void addHandlers (rest::HttpHandlerFactory*, string const& prefix);
void addHandlers (rest::HttpHandlerFactory*,
string const& prefix);
////////////////////////////////////////////////////////////////////////////////
/// @}

View File

@ -0,0 +1,215 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief job control request handler
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2010-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "RestJobHandler.h"
#include "Basics/StringUtils.h"
#include "HttpServer/AsyncJobManager.h"
#include "Rest/HttpRequest.h"
#include "Rest/HttpResponse.h"
using namespace triagens::basics;
using namespace triagens::rest;
using namespace triagens::admin;
using namespace std;
// -----------------------------------------------------------------------------
// --SECTION-- public constants
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief name of the queue
////////////////////////////////////////////////////////////////////////////////
const string RestJobHandler::QUEUE_NAME = "STANDARD";
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup RestServer
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
RestJobHandler::RestJobHandler (HttpRequest* request, void* data)
: RestBaseHandler(request) {
_jobManager = static_cast<AsyncJobManager*>(data);
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- Handler methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup RestServer
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool RestJobHandler::isDirect () {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
string const& RestJobHandler::queue () const {
return QUEUE_NAME;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
HttpHandler::status_e RestJobHandler::execute () {
// extract the sub-request type
HttpRequest::HttpRequestType type = _request->requestType();
if (type == HttpRequest::HTTP_REQUEST_GET) {
getJob();
}
if (type == HttpRequest::HTTP_REQUEST_DELETE) {
deleteJob();
}
else {
generateError(HttpResponse::METHOD_NOT_ALLOWED, (int) HttpResponse::METHOD_NOT_ALLOWED);
}
return HANDLER_DONE;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup RestServer
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a job result by id
////////////////////////////////////////////////////////////////////////////////
void RestJobHandler::getJob () {
const vector<string> suffix = _request->suffix();
if (suffix.size() != 1) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER);
return;
}
const string& value = suffix[0];
uint64_t jobId = StringUtils::uint64(value);
AsyncJobResult::Status status;
HttpResponse* response = _jobManager->getJobResult(jobId, status);
if (status == AsyncJobResult::JOB_UNDEFINED) {
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return;
}
if (status == AsyncJobResult::JOB_PENDING) {
// TODO: signal "job not ready"
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return;
}
assert(status == AsyncJobResult::JOB_DONE);
// delete our own response
if (_response != 0) {
delete _response;
}
// return the original response
_response = response;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a result by id
////////////////////////////////////////////////////////////////////////////////
void RestJobHandler::deleteJob () {
const vector<string> suffix = _request->suffix();
if (suffix.size() != 1) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER);
return;
}
const string& value = suffix[0];
uint64_t jobId = StringUtils::uint64(value);
bool found = _jobManager->deleteJobResult(jobId);
if (! found) {
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return;
}
TRI_json_t* json = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE);
if (json != 0) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "result", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, true));
}
generateResult(json);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

179
lib/Admin/RestJobHandler.h Normal file
View File

@ -0,0 +1,179 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief job control request handler
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2010-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_ADMIN_REST_JOB_HANDLER_H
#define TRIAGENS_ADMIN_REST_JOB_HANDLER_H 1
#include "Admin/RestBaseHandler.h"
#include "HttpServer/AsyncJobManager.h"
namespace triagens {
namespace rest {
class AsyncJobManager;
}
namespace admin {
// -----------------------------------------------------------------------------
// --SECTION-- class RestJobHandler
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup RestServer
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief job control request handler
////////////////////////////////////////////////////////////////////////////////
class RestJobHandler : public RestBaseHandler {
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup RestServer
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
RestJobHandler (rest::HttpRequest* request, void*);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- Handler methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup RestServer
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool isDirect ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
string const& queue () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the handler
////////////////////////////////////////////////////////////////////////////////
status_e execute ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup RestServer
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief returns job result by id
////////////////////////////////////////////////////////////////////////////////
void getJob ();
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a job result by id
////////////////////////////////////////////////////////////////////////////////
void deleteJob ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup RestServer
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief async job manager
////////////////////////////////////////////////////////////////////////////////
AsyncJobManager* _jobManager;
////////////////////////////////////////////////////////////////////////////////
/// @brief name of the queue
////////////////////////////////////////////////////////////////////////////////
static const std::string QUEUE_NAME;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
};
}
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -35,6 +35,7 @@
#include "Dispatcher/Job.h"
#include "GeneralServer/GeneralCommTask.h"
#include "GeneralServer/GeneralAsyncCommTask.h"
#include "HttpServer/AsyncJobManager.h"
#include "Rest/AsyncJobServer.h"
// -----------------------------------------------------------------------------
@ -43,6 +44,7 @@
namespace triagens {
namespace rest {
class AsyncJobManager;
class Dispatcher;
// -----------------------------------------------------------------------------
@ -90,16 +92,21 @@ namespace triagens {
explicit
GeneralServerDispatcher (Scheduler* scheduler, double keepAliveTimeout)
: GeneralServer<S, HF, CT>(scheduler, keepAliveTimeout),
_dispatcher(0) {
_dispatcher(0),
_jobManager(0) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new general server
////////////////////////////////////////////////////////////////////////////////
GeneralServerDispatcher (Scheduler* scheduler, Dispatcher* dispatcher, double keepAliveTimeout)
GeneralServerDispatcher (Scheduler* scheduler,
Dispatcher* dispatcher,
AsyncJobManager* jobManager,
double keepAliveTimeout)
: GeneralServer<S, HF, CT>(scheduler, keepAliveTimeout),
_dispatcher(dispatcher) {
_dispatcher(dispatcher),
_jobManager(jobManager) {
}
////////////////////////////////////////////////////////////////////////////////
@ -234,6 +241,7 @@ namespace triagens {
if (job->isDetached()) {
if (handler != 0) {
_jobManager->finishAsyncJob<S, HF>(job);
delete handler;
}
return;
@ -306,37 +314,48 @@ namespace triagens {
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
/// @brief create a job for asynchronous execution (using the dispatcher)
////////////////////////////////////////////////////////////////////////////////
bool handleRequestAsync (GeneralHandler* handler) {
// execute the handler using the dispatcher
if (_dispatcher != 0) {
Job* ajob = handler->createJob(this, true);
ServerJob* job = dynamic_cast<ServerJob*>(ajob);
bool handleRequestAsync (GeneralHandler* handler,
uint64_t* jobId) {
if (_dispatcher == 0) {
// without a dispatcher, simply give up
RequestStatisticsAgentSetExecuteError(handler);
if (job == 0) {
RequestStatisticsAgentSetExecuteError(handler);
LOGGER_WARNING("no dispatcher is known");
LOGGER_WARNING("task is indirect, but handler failed to create a job - this cannot work!");
return false;
}
// create a job and add it to the dispatcher queue
Job* ajob = handler->createJob(this, true);
ServerJob* job = dynamic_cast<ServerJob*>(ajob);
delete handler;
return false;
}
if (job == 0) {
RequestStatisticsAgentSetExecuteError(handler);
return _dispatcher->addJob(job);
LOGGER_WARNING("task is indirect, but handler failed to create a job - this cannot work!");
delete handler;
return false;
}
if (jobId != 0) {
_jobManager->initAsyncJob<S, HF>(job, jobId);
}
// without a dispatcher, simply give up
RequestStatisticsAgentSetExecuteError(handler);
if (! _dispatcher->addJob(job)) {
// could not add job to job queue
return false;
}
LOGGER_WARNING("no dispatcher is known");
return false;
// job is in queue now
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
/// @brief execute the handler directly, or add it to the dispatcher queue
////////////////////////////////////////////////////////////////////////////////
bool handleRequest (CT * task, GeneralHandler* handler) {
@ -511,6 +530,13 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
Dispatcher* _dispatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief the job manager
////////////////////////////////////////////////////////////////////////////////
AsyncJobManager* _jobManager;
};
}
}

View File

@ -86,6 +86,7 @@ namespace triagens {
: Job("HttpServerJob"),
_server(server),
_handler(handler),
_id(0),
_shutdown(0),
_abandon(false),
_isDetached(isDetached) {
@ -113,6 +114,22 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief assign an id to the job. note: the id might be 0
////////////////////////////////////////////////////////////////////////////////
void assignId (uint64_t id) {
_id = id;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief assign an id to the job
////////////////////////////////////////////////////////////////////////////////
uint64_t id () const {
return _id;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief abandon job
////////////////////////////////////////////////////////////////////////////////
@ -271,6 +288,12 @@ namespace triagens {
H* _handler;
////////////////////////////////////////////////////////////////////////////////
/// @brief job id (only used for detached jobs)
////////////////////////////////////////////////////////////////////////////////
uint64_t _id;
////////////////////////////////////////////////////////////////////////////////
/// @brief shutdown in progress
////////////////////////////////////////////////////////////////////////////////

View File

@ -82,6 +82,7 @@ namespace {
ApplicationEndpointServer::ApplicationEndpointServer (ApplicationServer* applicationServer,
ApplicationScheduler* applicationScheduler,
ApplicationDispatcher* applicationDispatcher,
AsyncJobManager* jobManager,
std::string const& authenticationRealm,
HttpHandlerFactory::context_fptr setContext,
void* contextData)
@ -89,6 +90,7 @@ ApplicationEndpointServer::ApplicationEndpointServer (ApplicationServer* applica
_applicationServer(applicationServer),
_applicationScheduler(applicationScheduler),
_applicationDispatcher(applicationDispatcher),
_jobManager(jobManager),
_authenticationRealm(authenticationRealm),
_setContext(setContext),
_contextData(contextData),
@ -156,6 +158,7 @@ bool ApplicationEndpointServer::buildServers () {
// unencrypted endpoints
server = new HttpServer(_applicationScheduler->scheduler(),
_applicationDispatcher->dispatcher(),
_jobManager,
_keepAliveTimeout,
_handlerFactory);
@ -173,6 +176,7 @@ bool ApplicationEndpointServer::buildServers () {
// https
server = new HttpsServer(_applicationScheduler->scheduler(),
_applicationDispatcher->dispatcher(),
_jobManager,
_keepAliveTimeout,
_handlerFactory,
_sslContext);

View File

@ -46,6 +46,7 @@ namespace triagens {
namespace rest {
class ApplicationDispatcher;
class ApplicationScheduler;
class AsyncJobManager;
// -----------------------------------------------------------------------------
// --SECTION-- class ApplicationEndpointServer
@ -87,6 +88,7 @@ namespace triagens {
ApplicationEndpointServer (ApplicationServer*,
ApplicationScheduler*,
ApplicationDispatcher*,
AsyncJobManager*,
std::string const&,
HttpHandlerFactory::context_fptr,
void*);
@ -288,6 +290,12 @@ namespace triagens {
ApplicationDispatcher* _applicationDispatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief application job manager
////////////////////////////////////////////////////////////////////////////////
AsyncJobManager* _jobManager;
////////////////////////////////////////////////////////////////////////////////
/// @brief authentication realm
////////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,309 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief job manager
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2004-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_HTTP_SERVER_JOB_MANAGER_H
#define TRIAGENS_HTTP_SERVER_JOB_MANAGER_H 1
#include "Basics/Common.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "GeneralServer/GeneralServerJob.h"
using namespace std;
using namespace triagens::rest;
// -----------------------------------------------------------------------------
// --SECTION-- class JobManager
// -----------------------------------------------------------------------------
namespace triagens {
namespace rest {
struct AsyncJobResult {
public:
typedef enum {
JOB_UNDEFINED,
JOB_PENDING,
JOB_DONE
}
Status;
typedef uint64_t IdType;
AsyncJobResult () :
_jobId(0),
_response(0),
_stamp(0.0),
_status(JOB_UNDEFINED) {
}
AsyncJobResult (IdType jobId,
HttpResponse* response,
double stamp,
Status status) :
_jobId(jobId),
_response(response),
_stamp(stamp),
_status(status) {
}
~AsyncJobResult () {
}
IdType _jobId;
HttpResponse* _response;
double _stamp;
Status _status;
};
class AsyncJobManager {
private:
AsyncJobManager (AsyncJobManager const&);
AsyncJobManager& operator= (AsyncJobManager const&);
public:
AsyncJobManager (uint64_t (*genFunc)())
: _lock(),
_jobs(),
generate(genFunc) {
}
~AsyncJobManager () {
}
typedef std::map<AsyncJobResult::IdType, AsyncJobResult> JobList;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief get the result of an async job
////////////////////////////////////////////////////////////////////////////////
HttpResponse* getJobResult (AsyncJobResult::IdType jobId,
AsyncJobResult::Status& status) {
WRITE_LOCKER(_lock);
JobList::iterator it = _jobs.find(jobId);
if (it == _jobs.end()) {
status = AsyncJobResult::JOB_UNDEFINED;
return 0;
}
HttpResponse* response = (*it).second._response;
status = (*it).second._status;
// remove the job from the list
_jobs.erase(it);
return response;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief delete the result of an async job
////////////////////////////////////////////////////////////////////////////////
bool deleteJobResult (AsyncJobResult::IdType jobId) {
WRITE_LOCKER(_lock);
JobList::iterator it = _jobs.find(jobId);
if (it == _jobs.end()) {
return false;
}
HttpResponse* response = (*it).second._response;
if (response != 0) {
delete response;
}
// remove the job from the list
_jobs.erase(it);
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief clean up "old" jobs from the list of jobs
////////////////////////////////////////////////////////////////////////////////
size_t cleanup () {
size_t n = 0;
WRITE_LOCKER(_lock);
JobList::iterator it = _jobs.begin();
// iterate the list. the list is sorted by id
while (it != _jobs.end()) {
if ((*it).second._status == AsyncJobResult::JOB_DONE) {
if ((*it).second._response != 0) {
// remove the response
delete (*it).second._response;
}
// remove the job from the list
_jobs.erase(it);
++n;
}
++it;
}
return n;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the list of pending jobs
////////////////////////////////////////////////////////////////////////////////
const std::vector<AsyncJobResult::IdType> pending (size_t maxCount) {
return byStatus(AsyncJobResult::JOB_PENDING, maxCount);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the list of done jobs
////////////////////////////////////////////////////////////////////////////////
const std::vector<AsyncJobResult::IdType> done (size_t maxCount) {
return byStatus(AsyncJobResult::JOB_DONE, maxCount);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the list of jobs by status
////////////////////////////////////////////////////////////////////////////////
const std::vector<AsyncJobResult::IdType> byStatus (AsyncJobResult::Status status,
size_t maxCount) {
vector<AsyncJobResult::IdType> jobs;
size_t n = 0;
{
READ_LOCKER(_lock);
JobList::iterator it = _jobs.begin();
// iterate the list. the list is sorted by id
while (it != _jobs.end()) {
AsyncJobResult::IdType jobId = (*it).first;
if ((*it).second._status == status) {
jobs.push_back(jobId);
if (++n >= maxCount) {
break;
}
}
++it;
}
}
return jobs;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise an async job
////////////////////////////////////////////////////////////////////////////////
template<typename S, typename HF>
void initAsyncJob (GeneralServerJob<S, typename HF::GeneralHandler>* job,
uint64_t* jobId) {
if (jobId == 0) {
return;
}
*jobId = (AsyncJobResult::IdType) generate();
job->assignId((uint64_t) *jobId);
AsyncJobResult ajr(*jobId, 0, TRI_microtime(), AsyncJobResult::JOB_PENDING);
WRITE_LOCKER(_lock);
_jobs[*jobId] = ajr;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief finish the execution of an async job
////////////////////////////////////////////////////////////////////////////////
template<typename S, typename HF>
void finishAsyncJob (GeneralServerJob<S, typename HF::GeneralHandler>* job) {
assert(job != 0);
typename HF::GeneralHandler* handler = job->getHandler();
assert(handler != 0);
AsyncJobResult::IdType jobId = job->id();
if (jobId == 0) {
return;
}
WRITE_LOCKER(_lock);
JobList::iterator it = _jobs.find(jobId);
if (it != _jobs.end()) {
(*it).second._response = handler->stealResponse();
(*it).second._status = AsyncJobResult::JOB_DONE;
}
}
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief lock to protect the _asyncJobs map
////////////////////////////////////////////////////////////////////////////////
basics::ReadWriteLock _lock;
////////////////////////////////////////////////////////////////////////////////
/// @brief list of pending/done async jobs
////////////////////////////////////////////////////////////////////////////////
JobList _jobs;
////////////////////////////////////////////////////////////////////////////////
/// @brief function pointer for id generation
////////////////////////////////////////////////////////////////////////////////
uint64_t (*generate)();
};
}
}
#endif
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -75,10 +75,11 @@ namespace triagens {
GeneralHttpServer (Scheduler* scheduler,
Dispatcher* dispatcher,
AsyncJobManager* jobManager,
double keepAliveTimeout,
HF* handlerFactory)
: GeneralServer<S, HF, CT>(scheduler, keepAliveTimeout),
GeneralServerDispatcher<S, HF, CT>(scheduler, dispatcher, keepAliveTimeout),
GeneralServerDispatcher<S, HF, CT>(scheduler, dispatcher, jobManager, keepAliveTimeout),
_handlerFactory(handlerFactory) {
}

View File

@ -531,14 +531,30 @@ namespace triagens {
// check for an async request
string const& asyncExecution = this->_request->header("x-arango-async", found);
if (found && triagens::basics::StringUtils::boolean(asyncExecution)) {
if (found && (triagens::basics::StringUtils::boolean(asyncExecution) || asyncExecution == "keep")) {
// we have an async request
this->_request = 0;
ok = this->_server->handleRequestAsync(handler);
uint64_t jobId = 0;
if (asyncExecution == "keep") {
// persist the responses
ok = this->_server->handleRequestAsync(handler, &jobId);
}
else {
// don't persist the responses
ok = this->_server->handleRequestAsync(handler, 0);
}
if (ok) {
HttpResponse response(HttpResponse::ACCEPTED);
if (jobId > 0) {
// return the job id we just created
response.setHeader("x-arango-async-id",
strlen("x-arango-async-id"),
triagens::basics::StringUtils::itoa(jobId));
}
this->handleResponse(&response);
}
}

View File

@ -90,6 +90,16 @@ HttpResponse* HttpHandler::getResponse () const {
return _response;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief steal the response
////////////////////////////////////////////////////////////////////////////////
HttpResponse* HttpHandler::stealResponse () {
HttpResponse* tmp = _response;
_response = 0;
return tmp;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -112,6 +112,12 @@ namespace triagens {
HttpResponse* getResponse () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief steal the response
////////////////////////////////////////////////////////////////////////////////
HttpResponse* stealResponse ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,7 @@
#include "HttpServer/GeneralHttpServer.h"
#include "HttpServer/AsyncJobManager.h"
#include "HttpServer/HttpCommTask.h"
// -----------------------------------------------------------------------------
@ -63,10 +64,11 @@ namespace triagens {
HttpServer (Scheduler* scheduler,
Dispatcher* dispatcher,
AsyncJobManager* jobManager,
double keepAliveTimeout,
HttpHandlerFactory* handlerFactory)
: GeneralServer<HttpServer, HttpHandlerFactory, HttpCommTask<HttpServer> >(scheduler, keepAliveTimeout),
GeneralHttpServer<HttpServer, HttpHandlerFactory, HttpCommTask<HttpServer> >(scheduler, dispatcher, keepAliveTimeout, handlerFactory) {
GeneralHttpServer<HttpServer, HttpHandlerFactory, HttpCommTask<HttpServer> >(scheduler, dispatcher, jobManager, keepAliveTimeout, handlerFactory) {
}
// -----------------------------------------------------------------------------

View File

@ -35,6 +35,7 @@
#include "Basics/ssl-helper.h"
#include "Logger/Logger.h"
#include "HttpServer/AsyncJobManager.h"
#include "HttpServer/GeneralHttpServer.h"
#include "HttpServer/HttpCommTask.h"
#include "HttpServer/HttpHandler.h"
@ -66,12 +67,13 @@ namespace triagens {
HttpsServer (Scheduler* scheduler,
Dispatcher* dispatcher,
AsyncJobManager* jobManager,
double keepAliveTimeout,
HttpHandlerFactory* handlerFactory,
SSL_CTX* ctx)
: GeneralServer<HttpsServer, HttpHandlerFactory, HttpCommTask<HttpsServer> >(scheduler, keepAliveTimeout),
GeneralSslServer<HttpsServer, HttpHandlerFactory, HttpCommTask<HttpsServer> >(scheduler, dispatcher, keepAliveTimeout, handlerFactory, ctx),
GeneralHttpServer<HttpsServer, HttpHandlerFactory, HttpCommTask<HttpsServer> >(scheduler, dispatcher, keepAliveTimeout, handlerFactory) {
GeneralHttpServer<HttpsServer, HttpHandlerFactory, HttpCommTask<HttpsServer> >(scheduler, dispatcher, jobManager, keepAliveTimeout, handlerFactory) {
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -121,6 +121,7 @@ lib_libarango_fe_a_SOURCES = \
lib/Admin/RestAdminBaseHandler.cpp \
lib/Admin/RestAdminLogHandler.cpp \
lib/Admin/RestBaseHandler.cpp \
lib/Admin/RestJobHandler.cpp \
lib/Admin/RestVersionHandler.cpp \
lib/ApplicationServer/ApplicationFeature.cpp \
lib/ApplicationServer/ApplicationServer.cpp \

View File

@ -143,6 +143,14 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the response is a HTTP HEAD response
////////////////////////////////////////////////////////////////////////////////
bool isHeadResponse () const {
return _isHeadResponse;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief http response string
///
@ -415,7 +423,6 @@ namespace triagens {
basics::Dictionary<char const*> _headers;
////////////////////////////////////////////////////////////////////////////////
/// @brief cookies
////////////////////////////////////////////////////////////////////////////////