1
0
Fork 0

changed async task handling

This commit is contained in:
Jan Steemann 2012-06-21 16:32:54 +02:00
parent 8682f82dfa
commit f100e2ad82
20 changed files with 573 additions and 76 deletions

View File

@ -0,0 +1,206 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief batch request handler
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2012, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "RestBatchHandler.h"
#include "Basics/StringUtils.h"
#include "BasicsC/string-buffer.h"
#include "Rest/HttpRequest.h"
#include "VocBase/vocbase.h"
#include "ProtocolBuffers/arangodb.pb.h"
using namespace std;
using namespace triagens::basics;
using namespace triagens::rest;
using namespace triagens::arango;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
RestBatchHandler::RestBatchHandler (HttpRequest* request, TRI_vocbase_t* vocbase)
: RestVocbaseBaseHandler(request, vocbase) {
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- Handler methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool RestBatchHandler::isDirect () {
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
string const& RestBatchHandler::queue () {
static string const client = "STANDARD";
return client;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
HttpHandler::status_e RestBatchHandler::execute () {
return HANDLER_DONE;
}
/*
bool found;
char const* valueStr = request->value("value", found);
if (found) {
sleep(::atoi(valueStr));
}
else {
sleep(5);
}
// extract the request type
HttpRequest::HttpRequestType type = request->requestType();
string contentType = StringUtils::tolower(StringUtils::trim(request->header("content-type")));
if (type != HttpRequest::HTTP_REQUEST_POST || contentType != "application/x-protobuf") {
generateNotImplemented("ILLEGAL " + BATCH_PATH);
return HANDLER_DONE;
}
LOGGER_INFO << "body size: " << request->bodySize();
FILE* fp = fopen("got","w");
fwrite(request->body(), request->bodySize(), 1, fp);
fclose(fp);
*/
/*
PB_ArangoMessage messages;
bool result = messages.ParseFromArray(request->body(), request->bodySize());
if (!result) {
LOGGER_INFO << "invalid message";
generateError(HttpResponse::BAD,
TRI_ERROR_ARANGO_COLLECTION_PARAMETER_MISSING,
"invalid protobuf message");
return HANDLER_DONE;
}
response = new HttpResponse(HttpResponse::CREATED);
response->setContentType("application/x-protobuf");
response->body().appendText("hihi");
string contentType = StringUtils::tolower(StringUtils::trim(request->header("content-type")));
if (request->requestType() == HttpRequest::HTTP_REQUEST_POST && contentType == "application/x-protobuf") {
return handleProtobufRequest();
}
HttpRequest::HttpRequestType getRequestTypeFromProtoBuf(const PB_ArangoRequestType type) {
switch (type) {
case PB_REQUEST_TYPE_DELETE:
return HttpRequest::HTTP_REQUEST_DELETE;
case PB_REQUEST_TYPE_GET:
return HttpRequest::HTTP_REQUEST_GET;
case PB_REQUEST_TYPE_HEAD:
return HttpRequest::HTTP_REQUEST_HEAD;
case PB_REQUEST_TYPE_POST:
return HttpRequest::HTTP_REQUEST_POST;
case PB_REQUEST_TYPE_PUT:
return HttpRequest::HTTP_REQUEST_PUT;
}
}
bool HttpCommTask::handleProtobufRequest () {
PB_ArangoMessage messages;
bool result = messages.ParseFromArray(request->body(), request->bodySize());
if (!result) {
LOGGER_INFO << "invalid message";
generateError(HttpResponse::BAD,
TRI_ERROR_ARANGO_COLLECTION_PARAMETER_MISSING,
"invalid protobuf message");
return HANDLER_DONE;
}
for (int i = 0; i < messages.messages_size(); ++i) {
PB_ArangoBatchMessage* message = messages(i);
assert(message->type() == PB_BLOB_REQUEST);
PB_ArangoBlobRequest* blob = message->request();
HttpRequest::HttpRequestType requestType = getRequestTypeFromProtoBuf(blob->requesttype());
const string url = blob->url();
}
PB_ArangoKeyValue* kv;
return HANDLER_DONE;
}
*/
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- protected methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)"
// End:

View File

@ -0,0 +1,131 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief batch request handler
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2010-2012, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_REST_HANDLER_REST_BATCH_HANDLER_H
#define TRIAGENS_REST_HANDLER_REST_BATCH_HANDLER_H 1
#include "RestHandler/RestVocbaseBaseHandler.h"
// -----------------------------------------------------------------------------
// --SECTION-- RestBatchHandler
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB
/// @{
////////////////////////////////////////////////////////////////////////////////
namespace triagens {
namespace arango {
////////////////////////////////////////////////////////////////////////////////
/// @brief import request handler
////////////////////////////////////////////////////////////////////////////////
class RestBatchHandler : public RestVocbaseBaseHandler {
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
RestBatchHandler (rest::HttpRequest* request, struct TRI_vocbase_s* vocbase);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- Handler methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB
/// @{
////////////////////////////////////////////////////////////////////////////////
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool isDirect ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
string const& queue ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
status_e execute ();
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- protected methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup ArangoDB
/// @{
////////////////////////////////////////////////////////////////////////////////
};
}
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)"
// End:

View File

@ -101,6 +101,12 @@ string RestVocbaseBaseHandler::COLLECTION_PATH = "/_api/collection";
string RestVocbaseBaseHandler::DOCUMENT_IMPORT_PATH = "/_api/import"; string RestVocbaseBaseHandler::DOCUMENT_IMPORT_PATH = "/_api/import";
////////////////////////////////////////////////////////////////////////////////
/// @brief batch path
////////////////////////////////////////////////////////////////////////////////
string RestVocbaseBaseHandler::BATCH_PATH = "/_api/batch";
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @} /// @}

View File

@ -113,6 +113,12 @@ namespace triagens {
static string DOCUMENT_IMPORT_PATH; static string DOCUMENT_IMPORT_PATH;
////////////////////////////////////////////////////////////////////////////////
/// @brief batch path
////////////////////////////////////////////////////////////////////////////////
static string BATCH_PATH;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors // --SECTION-- constructors and destructors
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -39,6 +39,7 @@
#include "build.h" #include "build.h"
#include "Actions/actions.h"
#include "Actions/RestActionHandler.h" #include "Actions/RestActionHandler.h"
#include "Admin/RestHandlerCreator.h" #include "Admin/RestHandlerCreator.h"
#include "Basics/FileUtils.h" #include "Basics/FileUtils.h"
@ -59,6 +60,7 @@
#include "RestHandler/RestDocumentHandler.h" #include "RestHandler/RestDocumentHandler.h"
#include "RestHandler/RestEdgeHandler.h" #include "RestHandler/RestEdgeHandler.h"
#include "RestHandler/RestImportHandler.h" #include "RestHandler/RestImportHandler.h"
#include "RestHandler/RestBatchHandler.h"
#include "RestServer/ArangoHttpServer.h" #include "RestServer/ArangoHttpServer.h"
#include "RestServer/JavascriptDispatcherThread.h" #include "RestServer/JavascriptDispatcherThread.h"
#include "Scheduler/ApplicationScheduler.h" #include "Scheduler/ApplicationScheduler.h"
@ -280,6 +282,11 @@ static void DefineApiHandlers (HttpHandlerFactory* factory,
factory->addPrefixHandler(RestVocbaseBaseHandler::DOCUMENT_IMPORT_PATH, factory->addPrefixHandler(RestVocbaseBaseHandler::DOCUMENT_IMPORT_PATH,
RestHandlerCreator<RestImportHandler>::createData<TRI_vocbase_t*>, RestHandlerCreator<RestImportHandler>::createData<TRI_vocbase_t*>,
vocbase); vocbase);
// add batch handler
factory->addPrefixHandler(RestVocbaseBaseHandler::BATCH_PATH,
RestHandlerCreator<RestBatchHandler>::createData<TRI_vocbase_t*>,
vocbase);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -951,6 +958,7 @@ int ArangoServer::startupServer () {
// ............................................................................. // .............................................................................
// and cleanup // and cleanup
// ............................................................................. // .............................................................................
closeDatabase(); closeDatabase();
return 0; return 0;
@ -1160,7 +1168,7 @@ int ArangoServer::executeShell (shell_operation_mode_e mode) {
if (v8g) { if (v8g) {
delete v8g; delete v8g;
} }
// close the database // close the database
closeDatabase(); closeDatabase();
@ -1385,6 +1393,7 @@ void ArangoServer::closeDatabase () {
ApplicationUserManager::unloadRoles(); ApplicationUserManager::unloadRoles();
TRI_DestroyVocBase(_vocbase); TRI_DestroyVocBase(_vocbase);
_vocbase = 0; _vocbase = 0;
LOGGER_INFO << "ArangoDB has been shut down"; LOGGER_INFO << "ArangoDB has been shut down";
} }

View File

@ -162,17 +162,18 @@ void JavascriptDispatcherThread::run () {
DispatcherThread::run(); DispatcherThread::run();
TRI_v8_global_t* v8g = (TRI_v8_global_t*) _isolate->GetData();
_context->Exit(); _context->Exit();
_context.Dispose(); _context.Dispose();
// gc
while (!v8::V8::IdleNotification()) { while (!v8::V8::IdleNotification()) {
} }
_isolate->Exit(); _isolate->Exit();
_isolate->Dispose(); _isolate->Dispose();
// free memory for this thread
TRI_v8_global_t* v8g = (TRI_v8_global_t*) _isolate->GetData();
if (v8g) { if (v8g) {
delete v8g; delete v8g;

View File

@ -126,7 +126,7 @@ Thread::Thread (const string& name)
Thread::~Thread () { Thread::~Thread () {
if (_running != 0) { if (_running != 0) {
LOGGER_WARNING << "forcefully shuting down thread '" << _name << "'"; LOGGER_WARNING << "forcefully shutting down thread '" << _name << "'";
TRI_StopThread(&_thread); TRI_StopThread(&_thread);
} }

View File

@ -90,7 +90,7 @@ namespace triagens {
LOGGER_DEBUG << "beginning shutdown sequence of dispatcher queue '" << _name <<"'"; LOGGER_DEBUG << "beginning shutdown sequence of dispatcher queue '" << _name <<"'";
// broadcast the we want to stop // broadcast that we want to stop
size_t const MAX_TRIES = 10; size_t const MAX_TRIES = 10;
stopping = 1; stopping = 1;

View File

@ -247,6 +247,7 @@ void DispatcherThread::run () {
#ifdef TRI_HAVE_POSIX_THREADS #ifdef TRI_HAVE_POSIX_THREADS
if (queue->stopping != 0) { if (queue->stopping != 0) {
LOGGER_WARNING << "caught cancellation exception during cleanup"; LOGGER_WARNING << "caught cancellation exception during cleanup";
queue->accessQueue.unlock();
throw; throw;
} }
#endif #endif

View File

@ -131,6 +131,12 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
virtual void handleError (basics::TriagensError const&) = 0; virtual void handleError (basics::TriagensError const&) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief shut downs the execution and deletes everything
////////////////////////////////////////////////////////////////////////////////
virtual bool beginShutdown () = 0;
public: public:

View File

@ -34,7 +34,7 @@
#include <Basics/Exceptions.h> #include <Basics/Exceptions.h>
#include "Scheduler/AsyncTask.h" #include "Scheduler/AsyncTask.h"
#include "GeneralServer/GeneralServerJob.h" #include "Rest/Handler.h"
namespace triagens { namespace triagens {
namespace rest { namespace rest {
@ -57,17 +57,7 @@ namespace triagens {
GeneralAsyncCommTask (S* server, socket_t fd, ConnectionInfo const& info) GeneralAsyncCommTask (S* server, socket_t fd, ConnectionInfo const& info)
: Task("GeneralAsyncCommTask"), : Task("GeneralAsyncCommTask"),
T(server, fd, info), T(server, fd, info),
job(0) { _handler(0) {
}
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief sets a job
////////////////////////////////////////////////////////////////////////////////
void setJob (GeneralServerJob<S, typename HF::GeneralHandler>* job) {
this->job = job;
} }
protected: protected:
@ -77,10 +67,7 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
~GeneralAsyncCommTask () { ~GeneralAsyncCommTask () {
if (job != 0) { // this has shut down the job before, should we now free the handler if any?
LOGGER_DEBUG << "job is still active, trying to shutdown";
job->beginShutdown();
}
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -115,6 +102,19 @@ namespace triagens {
return result; return result;
} }
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief set the handler
////////////////////////////////////////////////////////////////////////////////
void setHandler (Handler* handler) {
assert(handler);
_handler = handler;
}
protected: protected:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -122,38 +122,13 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool handleAsync () { bool handleAsync () {
if (job == 0) { assert(_handler);
LOGGER_WARNING << "no job is known";
}
else {
typename HF::GeneralHandler * handler = job->getHandler();
typename HF::GeneralResponse * response = handler->getResponse();
try { return _handler->handleAsync();
if (response == 0) {
basics::InternalError err("no response received from handler");
handler->handleError(err);
response = handler->getResponse();
}
if (response != 0) {
this->handleResponse(response);
}
}
catch (...) {
LOGGER_ERROR << "caught exception in " << __FILE__ << "@" << __LINE__;
}
delete job;
job = 0;
}
return true;
} }
private: private:
GeneralServerJob<S, typename HF::GeneralHandler>* job; Handler* _handler;
}; };
} }
} }

View File

@ -157,6 +157,8 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void destroyHandler (typename HF::GeneralHandler* handler) { void destroyHandler (typename HF::GeneralHandler* handler) {
assert(handler);
if (_handlerFactory == 0) { if (_handlerFactory == 0) {
delete handler; delete handler;
} }
@ -274,7 +276,7 @@ namespace triagens {
/// @brief handles a request /// @brief handles a request
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
virtual bool handleRequest (CT * task, typename HF::GeneralHandler * handler) { virtual bool handleRequest (CT * task, typename HF::GeneralHandler*& handler) {
// execute handle and requeue // execute handle and requeue
bool done = false; bool done = false;
@ -284,7 +286,10 @@ namespace triagens {
if (status != Handler::HANDLER_REQUEUE) { if (status != Handler::HANDLER_REQUEUE) {
done = true; done = true;
assert(handler);
task->setHandler(0);
destroyHandler(handler); destroyHandler(handler);
handler = 0;
} }
else { else {
continue; continue;

View File

@ -34,6 +34,7 @@
#include "Dispatcher/Dispatcher.h" #include "Dispatcher/Dispatcher.h"
#include "GeneralServer/GeneralCommTask.h" #include "GeneralServer/GeneralCommTask.h"
#include "GeneralServer/GeneralAsyncCommTask.h" #include "GeneralServer/GeneralAsyncCommTask.h"
#include "GeneralServer/GeneralServerJob.h"
namespace triagens { namespace triagens {
namespace rest { namespace rest {
@ -83,10 +84,12 @@ namespace triagens {
/// {@inheritDoc} /// {@inheritDoc}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool handleRequest (CT * task, typename HF::GeneralHandler * handler) { bool handleRequest (CT * task, typename HF::GeneralHandler *& handler) {
// execute handle and requeue // execute handle and requeue
bool done = false; bool done = false;
assert(handler);
while (! done) { while (! done) {
@ -97,6 +100,7 @@ namespace triagens {
if (status != Handler::HANDLER_REQUEUE) { if (status != Handler::HANDLER_REQUEUE) {
done = true; done = true;
this->destroyHandler(handler); this->destroyHandler(handler);
handler = 0;
} }
else { else {
continue; continue;
@ -110,13 +114,17 @@ namespace triagens {
if (atask == 0) { if (atask == 0) {
LOGGER_WARNING << "task is indirect, but not asynchronous"; LOGGER_WARNING << "task is indirect, but not asynchronous";
this->destroyHandler(handler); this->destroyHandler(handler);
handler = 0;
return false; return false;
} }
else { else {
GeneralServerJob<S, typename HF::GeneralHandler>* job GeneralServerJob<S, typename HF::GeneralHandler>* job
= new GeneralServerJob<S, typename HF::GeneralHandler>(dynamic_cast<S*>(this), this->_scheduler, _dispatcher, atask, handler); = new GeneralServerJob<S, typename HF::GeneralHandler>(dynamic_cast<S*>(this), this->_scheduler, _dispatcher, atask, handler);
atask->setJob(job); assert(handler);
atask->setHandler(handler);
handler->setJob(job);
_dispatcher->addJob(job); _dispatcher->addJob(job);
} }
@ -128,6 +136,8 @@ namespace triagens {
LOGGER_WARNING << "no dispatcher is known"; LOGGER_WARNING << "no dispatcher is known";
this->destroyHandler(handler); this->destroyHandler(handler);
handler = 0;
return false; return false;
} }
} }

View File

@ -29,13 +29,15 @@
#ifndef TRIAGENS_FYN_GENERAL_SERVER_GENERAL_SERVER_JOB_H #ifndef TRIAGENS_FYN_GENERAL_SERVER_GENERAL_SERVER_JOB_H
#define TRIAGENS_FYN_GENERAL_SERVER_GENERAL_SERVER_JOB_H 1 #define TRIAGENS_FYN_GENERAL_SERVER_GENERAL_SERVER_JOB_H 1
#include "Dispatcher/Job.h" #include <Basics/Common.h>
#include <Logger/Logger.h>
#include <Basics/Exceptions.h> #include <Basics/Exceptions.h>
#include <Basics/StringUtils.h> #include <Basics/StringUtils.h>
#include <Basics/Mutex.h> #include <Basics/Mutex.h>
#include <Rest/Handler.h> #include <Rest/Handler.h>
#include "Dispatcher/Job.h"
#include "Scheduler/AsyncTask.h" #include "Scheduler/AsyncTask.h"
namespace triagens { namespace triagens {
@ -76,8 +78,10 @@ namespace triagens {
/// @brief destructs a server job /// @brief destructs a server job
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
~GeneralServerJob () { ~GeneralServerJob () {
_server->destroyHandler(_handler); if (_handler) {
delete _handler;
}
} }
public: public:
@ -86,7 +90,7 @@ namespace triagens {
/// {@inheritDoc} /// {@inheritDoc}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
JobType type () { JobType type () {
return _handler->type(); return _handler->type();
} }
@ -113,6 +117,10 @@ namespace triagens {
status_e work () { status_e work () {
LOGGER_TRACE << "beginning job " << static_cast<Job*>(this); LOGGER_TRACE << "beginning job " << static_cast<Job*>(this);
if (_shutdown != 0) {
return Job::JOB_DONE;
}
Handler::status_e status = _handler->execute(); Handler::status_e status = _handler->execute();
LOGGER_TRACE << "finished job " << static_cast<Job*>(this) << " with status " << status; LOGGER_TRACE << "finished job " << static_cast<Job*>(this) << " with status " << status;
@ -135,6 +143,8 @@ namespace triagens {
delete this; delete this;
} }
else { else {
assert(_task);
_done = 1; _done = 1;
_task->signal(); _task->signal();
} }
@ -147,25 +157,24 @@ namespace triagens {
void handleError (basics::TriagensError const& ex) { void handleError (basics::TriagensError const& ex) {
_handler->handleError(ex); _handler->handleError(ex);
} }
public:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief shut downs the execution and deletes everything /// @brief shuts down the execution and deletes everything
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void beginShutdown () { bool beginShutdown () {
LOGGER_TRACE << "beginning shutdown, job (" << ((void*) this) << ") is " << (_done ? "done" : "still running"); LOGGER_TRACE << "shutdown, job (" << ((void*) this) << ") is " << (_done ? "done" : "still running");
if (_done != 0) { if (_done != 0) {
delete this; delete this;
return true;
} }
else { else {
_shutdown = 1; _shutdown = 1;
} }
}
public: return false;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief general server /// @brief general server

View File

@ -50,7 +50,7 @@ namespace triagens {
HttpCommTask::HttpCommTask (HttpServerImpl* server, socket_t fd, ConnectionInfo const& info) HttpCommTask::HttpCommTask (HttpServerImpl* server, socket_t fd, ConnectionInfo const& info)
: Task("HttpCommTask"), : Task("HttpCommTask"),
GeneralCommTask<HttpServerImpl, HttpHandlerFactory>(server, fd, info) { GeneralCommTask<HttpServerImpl, HttpHandlerFactory>(server, fd, info), _handler(0) {
incCounter<GeneralServerStatistics::httpAccessor>(); incCounter<GeneralServerStatistics::httpAccessor>();
} }
@ -58,6 +58,7 @@ namespace triagens {
HttpCommTask::~HttpCommTask () { HttpCommTask::~HttpCommTask () {
decCounter<GeneralServerStatistics::httpAccessor>(); decCounter<GeneralServerStatistics::httpAccessor>();
destroyHandler();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -73,8 +74,10 @@ namespace triagens {
if (! readRequestBody) { if (! readRequestBody) {
const char * ptr = readBuffer->c_str() + readPosition; const char * ptr = readBuffer->c_str() + readPosition;
// TODO FIXME: HTTP request might be shorter than 4 bytes if malformed
const char * end = readBuffer->end() - 3; const char * end = readBuffer->end() - 3;
// TODO FIXME: HTTP request might not contain \r\n\r\n at all if malformed
for (; ptr < end; ptr++) { for (; ptr < end; ptr++) {
if (ptr[0] == '\r' && ptr[1] == '\n' && ptr[2] == '\r' && ptr[3] == '\n') { if (ptr[0] == '\r' && ptr[1] == '\n' && ptr[2] == '\r' && ptr[3] == '\n') {
break; break;
@ -209,10 +212,10 @@ namespace triagens {
bodyPosition = 0; bodyPosition = 0;
bodyLength = 0; bodyLength = 0;
HttpHandler* handler = server->createHandler(request); _handler = server->createHandler(request);
bool ok = false; bool ok = false;
if (handler == 0) { if (_handler == 0) {
LOGGER_TRACE << "no handler is known, giving up"; LOGGER_TRACE << "no handler is known, giving up";
delete request; delete request;
request = 0; request = 0;
@ -221,8 +224,11 @@ namespace triagens {
handleResponse(&response); handleResponse(&response);
} }
else { else {
// let the handler know the comm task
_handler->setTask(this);
request = 0; request = 0;
ok = server->handleRequest(this, handler); ok = server->handleRequest(this, _handler);
if (! ok) { if (! ok) {
HttpResponse response(HttpResponse::SERVER_ERROR); HttpResponse response(HttpResponse::SERVER_ERROR);
@ -237,7 +243,6 @@ namespace triagens {
} }
void HttpCommTask::addResponse (HttpResponse* response) { void HttpCommTask::addResponse (HttpResponse* response) {
StringBuffer * buffer; StringBuffer * buffer;
@ -256,5 +261,18 @@ namespace triagens {
// start output // start output
fillWriteBuffer(); fillWriteBuffer();
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the handler if any present
////////////////////////////////////////////////////////////////////////////////
void HttpCommTask::destroyHandler () {
if (_handler) {
_handler->setTask(0);
server->destroyHandler(_handler);
_handler = 0;
}
}
} }
} }

View File

@ -59,6 +59,14 @@ namespace triagens {
~HttpCommTask (); ~HttpCommTask ();
////////////////////////////////////////////////////////////////////////////////
/// @brief set a handler object
////////////////////////////////////////////////////////////////////////////////
void setHandler (HttpHandler* handler) {
_handler = handler;
}
protected: protected:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -72,6 +80,18 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void addResponse (HttpResponse*); void addResponse (HttpResponse*);
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the handler if any present
////////////////////////////////////////////////////////////////////////////////
void destroyHandler ();
////////////////////////////////////////////////////////////////////////////////
/// @brief the http handler used
////////////////////////////////////////////////////////////////////////////////
HttpHandler* _handler;
}; };
} }
} }

View File

@ -41,7 +41,7 @@ namespace triagens {
HttpHandler::HttpHandler (HttpRequest* request) HttpHandler::HttpHandler (HttpRequest* request)
: request(request), response(0) { : request(request), response(0), _task(0), _job(0) {
} }
@ -54,6 +54,10 @@ namespace triagens {
if (response != 0) { if (response != 0) {
delete response; delete response;
} }
if (_task != 0) {
_task->setHandler(0);
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -63,5 +67,35 @@ namespace triagens {
HttpResponse* HttpHandler::getResponse () { HttpResponse* HttpHandler::getResponse () {
return response; return response;
} }
bool HttpHandler::handleAsync () {
if (_job == 0) {
LOGGER_WARNING << "no job is known";
}
else {
HttpResponse* response = getResponse();
try {
if (response == 0) {
basics::InternalError err("no response received from handler");
handleError(err);
response = getResponse();
}
if (response != 0) {
_task->handleResponse(response);
}
}
catch (...) {
LOGGER_ERROR << "caught exception in " << __FILE__ << "@" << __LINE__;
}
// this might delete the handler (i.e. ourselves!)
return _job->beginShutdown();
}
return true;
}
} }
} }

View File

@ -29,6 +29,7 @@
#define TRIAGENS_FYN_REST_HTTP_HANDLER_H 1 #define TRIAGENS_FYN_REST_HTTP_HANDLER_H 1
#include <Rest/Handler.h> #include <Rest/Handler.h>
#include <HttpServer/HttpCommTask.h>
namespace triagens { namespace triagens {
namespace rest { namespace rest {
@ -40,7 +41,7 @@ namespace triagens {
/// @brief abstract class for http handlers /// @brief abstract class for http handlers
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
class HttpHandler : public Handler { class HttpHandler : public Handler {
private: private:
HttpHandler (HttpHandler const&); HttpHandler (HttpHandler const&);
HttpHandler& operator= (HttpHandler const&); HttpHandler& operator= (HttpHandler const&);
@ -62,8 +63,46 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
~HttpHandler (); ~HttpHandler ();
public: public:
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the signal
////////////////////////////////////////////////////////////////////////////////
virtual bool handleAsync ();
////////////////////////////////////////////////////////////////////////////////
/// @brief shut down the handler
////////////////////////////////////////////////////////////////////////////////
virtual void beginShutdown () {
if (!_job) {
delete this;
}
else {
LOGGER_DEBUG << "job is still active, trying to shutdown";
// this might delete the handler (i.e. ourselves!)
_job->beginShutdown();
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the comm task
////////////////////////////////////////////////////////////////////////////////
void setTask (HttpCommTask* task) {
_task = task;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the job
////////////////////////////////////////////////////////////////////////////////
void setJob (Job* job) {
_job = job;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief returns the response /// @brief returns the response
@ -84,6 +123,18 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
HttpResponse* response; HttpResponse* response;
////////////////////////////////////////////////////////////////////////////////
/// @brief the comm task
////////////////////////////////////////////////////////////////////////////////
HttpCommTask* _task;
////////////////////////////////////////////////////////////////////////////////
/// @brief the job
////////////////////////////////////////////////////////////////////////////////
Job* _job;
}; };
} }
} }

View File

@ -227,6 +227,8 @@ namespace triagens {
void HttpHandlerFactory::destroyHandler (HttpHandler* handler) { void HttpHandlerFactory::destroyHandler (HttpHandler* handler) {
vector<MaintenanceCallback*> callbacks; vector<MaintenanceCallback*> callbacks;
assert(handler);
{ {
MUTEX_LOCKER(_activeHandlersLock); MUTEX_LOCKER(_activeHandlersLock);
_numberActiveHandlers--; _numberActiveHandlers--;
@ -235,7 +237,7 @@ namespace triagens {
_maintenanceCallbacks.swap(callbacks); _maintenanceCallbacks.swap(callbacks);
} }
delete handler; handler->beginShutdown();
} }
for (vector<MaintenanceCallback*>::iterator i = callbacks.begin(); i != callbacks.end(); ++i) { for (vector<MaintenanceCallback*>::iterator i = callbacks.begin(); i != callbacks.end(); ++i) {

View File

@ -41,7 +41,7 @@ namespace triagens {
/// @brief abstract class for handlers /// @brief abstract class for handlers
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
class Handler { class Handler {
private: private:
Handler (Handler const&); Handler (Handler const&);
Handler& operator= (Handler const&); Handler& operator= (Handler const&);
@ -117,6 +117,13 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
virtual void handleError (basics::TriagensError const&) = 0; virtual void handleError (basics::TriagensError const&) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the signal
////////////////////////////////////////////////////////////////////////////////
virtual bool handleAsync() = 0;
}; };
} }
} }