1
0
Fork 0
arangodb/lib/ZeroMQ/ZeroMQBatchJob.cpp

426 lines
14 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// @brief ZeroMQ batch job
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2012 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Copyright 2012, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "ZeroMQBatchJob.h"
#include "Basics/Exceptions.h"
#include "HttpServer/HttpHandler.h"
#include "HttpServer/HttpHandlerFactory.h"
#include "Logger/Logger.h"
#include "ProtocolBuffers/HttpRequestProtobuf.h"
#include "Rest/HttpResponse.h"
using namespace triagens::basics;
using namespace triagens::rest;
using namespace std;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Dispatcher
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new job
///
/// Parses the serialized batch found in the "length" bytes starting at "data"
/// into the request container. If parsing fails, one error response is stored
/// and the request cursor is moved to the end of the request list, so that
/// isDone() reports true immediately.
///
/// @param address         peer address frame, sent back first in finish()
/// @param handlerFactory  factory used by extractNextRequest(), may be 0
/// @param data            start of the serialized batch message
/// @param length          number of bytes available at data
////////////////////////////////////////////////////////////////////////////////
ZeroMQBatchJob::ZeroMQBatchJob (zframe_t* address,
                                HttpHandlerFactory* handlerFactory,
                                char const* data,
                                size_t length)
  : Job("ZeroMQBatchJob"),
    _requests(),
    _responses(),
    _address(address),
    _handlerFactory(handlerFactory),
    _handler(0),
    _nrCurrentRequest(0) {
  bool const ok = _requests.ParseFromArray(data, static_cast<int>(length));

  if (! ok) {
    PB_ArangoBatchMessage* message = _responses.add_messages();

    LOGGER_ERROR << "received corrupted ZeroMQ message in " << __FILE__ << "@" << __LINE__;

    // tell the client what really went wrong: the request could not be parsed,
    // no handler was ever involved
    message->set_type(PB_ERROR_RESPONSE);
    PB_ArangoErrorResponse* em = message->mutable_errorresponse();
    em->set_message("cannot parse request, received corrupted ZeroMQ message");

    // skip whatever was parsed so far, the job is done
    _nrCurrentRequest = (size_t) _requests.messages().size();
  }
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
///
/// Releases the handler of the current request, if there is one.
///
/// NOTE(review): the address frame is not released here; in this file it is
/// only handed to zframe_send() in finish().
////////////////////////////////////////////////////////////////////////////////
ZeroMQBatchJob::~ZeroMQBatchJob () {
  // deleting a null pointer is a no-op, so no explicit check is required
  delete _handler;
  _handler = 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Dispatcher
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief returns true if all requests are handled
///
/// @return true once the request cursor has reached (or passed) the number of
///         messages in the batch
////////////////////////////////////////////////////////////////////////////////
bool ZeroMQBatchJob::isDone () {
  size_t const total = (size_t) _requests.messages().size();

  return _nrCurrentRequest >= total;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns true if current request is direct
///
/// @return the answer of the current handler, or true if there is no handler
////////////////////////////////////////////////////////////////////////////////
bool ZeroMQBatchJob::isDirect () {
  // without a handler there is nothing that needs to be queued
  if (_handler == 0) {
    return true;
  }

  return _handler->isDirect();
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- Job methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Dispatcher
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Forwards to the current handler; falls back to Job::READ_JOB if there is
/// no handler.
////////////////////////////////////////////////////////////////////////////////
Job::JobType ZeroMQBatchJob::type () {
  if (_handler != 0) {
    return _handler->type();
  }

  return Job::READ_JOB;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Forwards to the current handler; falls back to the queue named "STANDARD"
/// if there is no handler.
////////////////////////////////////////////////////////////////////////////////
string const& ZeroMQBatchJob::queue () {
  // function-local static, so the returned reference stays valid
  static string const fallbackQueue = "STANDARD";

  if (_handler == 0) {
    return fallbackQueue;
  }

  return _handler->queue();
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Passes the thread on to the current handler; does nothing if there is no
/// handler.
////////////////////////////////////////////////////////////////////////////////
void ZeroMQBatchJob::setDispatcherThread (DispatcherThread* thread) {
  if (_handler == 0) {
    return;
  }

  _handler->setDispatcherThread(thread);
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Executes the requests of the batch one after another for as long as the
/// upcoming request is direct. The requests are processed in a loop rather
/// than by recursion, so the stack depth does not grow with the number of
/// direct requests in a batch.
///
/// @return JOB_REQUEUE if the handler asked to be requeued or if the next
///         request is not direct, JOB_DONE_ZEROMQ once all requests of the
///         batch have been dealt with
///
/// NOTE(review): executeCurrentRequest() normally has already advanced to the
/// next request when it returns HANDLER_FAILED, so handleInternalError() then
/// looks at the handler of the next request (or at none) - confirm that this
/// is intended.
////////////////////////////////////////////////////////////////////////////////
Job::status_e ZeroMQBatchJob::work () {
  for (;;) {
    Handler::status_e status = executeCurrentRequest();

    // if the handler wants to requeue, stop and put job back onto the queue
    if (status == Handler::HANDLER_REQUEUE) {
      return JOB_REQUEUE;
    }

    // if the handler failed, skip this request
    if (status == Handler::HANDLER_FAILED) {
      handleInternalError();
    }

    // if we dealt with all requests, send response back to client
    if (isDone()) {
      return JOB_DONE_ZEROMQ;
    }

    // a request that is not direct has to go through the queue again,
    // direct requests are handled right away in the next iteration
    if (! isDirect()) {
      return JOB_REQUEUE;
    }
  }
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Destroys the job object itself; the object must not be touched after this
/// call returns.
////////////////////////////////////////////////////////////////////////////////
void ZeroMQBatchJob::cleanup () {
// the job deletes itself, this also runs the destructor
delete this;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Finishes the job and sends the result to the internal ZeroMQ server.
////////////////////////////////////////////////////////////////////////////////
void ZeroMQBatchJob::finish (void* bridge) {
// need a bridge the ZeroMQ queue
if (bridge != 0) {
// serialize protocol buffers
string data;
_responses.SerializeToString(&data);
// copy serialized data into message
zframe_t* empty = zframe_new("", 0);
zframe_t* content = zframe_new(data.c_str(), data.size());
// send the peer address, followed by the content
zframe_send(&_address, bridge, ZFRAME_MORE);
zframe_send(&empty, bridge, ZFRAME_MORE);
zframe_send(&content, bridge, 0);
}
// not good
else {
LOGGER_ERROR << "cannot communicate with internal ZeroMQ queue, no socket is known";
}
// commit suicide
cleanup();
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Passes the error on to the current handler. If there is no current handler
/// the error is logged instead of dereferencing a null pointer.
///
/// @param ex  the error to report
////////////////////////////////////////////////////////////////////////////////
void ZeroMQBatchJob::handleError (TriagensError const& ex) {
  // there is no handler before the first and after the last request
  if (_handler == 0) {
    LOGGER_ERROR << "cannot report error, no handler is known in " << __FILE__ << "@" << __LINE__ << ": " << DIAGNOSTIC_INFORMATION(ex);
    return;
  }

  _handler->handleError(ex);
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Currently a placeholder: no shutdown work is performed.
///
/// @return always false
////////////////////////////////////////////////////////////////////////////////
bool ZeroMQBatchJob::beginShutdown () {
// TODO FIXME: check what this method should do
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief extracts the current request
///
/// Drops the handler of the previous request and, unless the batch is
/// exhausted, wraps the message at the request cursor into a request object
/// and asks the handler factory for a matching handler. If no handler could
/// be created, the request object is deleted again and the handler stays 0.
////////////////////////////////////////////////////////////////////////////////
void ZeroMQBatchJob::extractNextRequest () {
  // drop the previous handler; deleting a null pointer is harmless
  delete _handler;
  _handler = 0;

  // nothing left to extract
  if ((size_t) _requests.messages().size() <= _nrCurrentRequest) {
    return;
  }

  HttpRequest* request = new HttpRequestProtobuf(_requests.messages().Get(_nrCurrentRequest));

  // without a factory nobody can take the request
  if (_handlerFactory == 0) {
    delete request;
    return;
  }

  _handler = _handlerFactory->createHandler(request);

  // the factory did not come up with a handler, so the request is unused
  if (_handler == 0) {
    delete request;
  }
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup Dispatcher
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the current request
///
/// Runs the current handler, stores its response and moves on to the next
/// request. If there is no handler, a not-found response is stored instead.
/// Exceptions thrown by the handler are passed to its handleError() method;
/// any other exception raised in here is logged and swallowed.
///
/// @return HANDLER_DONE if there was no handler, otherwise the status returned
///         by the handler; HANDLER_FAILED if the handler threw. When the
///         handler returns HANDLER_REQUEUE, neither a response is stored nor
///         is the request cursor advanced.
////////////////////////////////////////////////////////////////////////////////
Handler::status_e ZeroMQBatchJob::executeCurrentRequest () {
  // stays at "failed" unless something below says otherwise
  Handler::status_e outcome = Handler::HANDLER_FAILED;

  try {
    if (_handler != 0) {
      try {
        outcome = _handler->execute();
      }
      catch (basics::TriagensError const& ex) {
        _handler->handleError(ex);
      }
      catch (std::exception const& ex) {
        basics::InternalError err(ex);
        _handler->handleError(err);
      }
      catch (...) {
        basics::InternalError err;
        _handler->handleError(err);
      }

      // the same request has to be executed again later on
      if (outcome == Handler::HANDLER_REQUEUE) {
        return outcome;
      }

      // store a response or an error message
      handleResponse();
    }
    else {
      // nobody feels responsible for this request
      handleNotFound();
      outcome = Handler::HANDLER_DONE;
    }

    // get next request and request handler
    ++_nrCurrentRequest;
    extractNextRequest();
  }
  catch (basics::TriagensError const& ex) {
    LOGGER_ERROR << "caught exception in " << __FILE__ << "@" << __LINE__ << ": " << DIAGNOSTIC_INFORMATION(ex);
  }
  catch (std::exception const& ex) {
    LOGGER_ERROR << "caught exception in " << __FILE__ << "@" << __LINE__ << ": " << ex.what();
  }
  catch (...) {
    LOGGER_ERROR << "caught exception in " << __FILE__ << "@" << __LINE__;
  }

  return outcome;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handles one response
////////////////////////////////////////////////////////////////////////////////
void ZeroMQBatchJob::handleResponse () {
HttpResponse* response = _handler->getResponse();
// we need a response, otherwise create an error object
if (response == 0) {
InternalError err("no response received from handler");
_handler->handleError(err);
response = _handler->getResponse();
}
// send the response to ZeroMQ
PB_ArangoBatchMessage* message = _responses.add_messages();
if (response != 0) {
response->write(message);
}
else {
LOGGER_ERROR << "cannot get any response in " << __FILE__ << "@" << __LINE__;
message->set_type(PB_ERROR_RESPONSE);
PB_ArangoErrorResponse* em = message->mutable_errorresponse();
em->set_message("cannot get any response from handler");
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handles not-found
////////////////////////////////////////////////////////////////////////////////
void ZeroMQBatchJob::handleNotFound () {
PB_ArangoBatchMessage* message = _responses.add_messages();
message->set_type(PB_BLOB_RESPONSE);
PB_ArangoBlobResponse* response = message->mutable_blobresponse();
response->set_status(HttpResponse::NOT_FOUND);
response->set_contenttype(PB_NO_CONTENT);
response->set_contentlength(0);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handles server error
////////////////////////////////////////////////////////////////////////////////
void ZeroMQBatchJob::handleInternalError () {
HttpResponse* response = _handler->getResponse();
PB_ArangoBatchMessage* message = _responses.add_messages();
if (response == 0) {
message->set_type(PB_BLOB_RESPONSE);
PB_ArangoBlobResponse* response = message->mutable_blobresponse();
response->set_status(HttpResponse::SERVER_ERROR);
response->set_contenttype(PB_NO_CONTENT);
}
else {
response->write(message);
}
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}\\)"
// End: