1
0
Fork 0

Merge branch 'obi-velocystream' into obi-velocystream-agent

* obi-velocystream:
  fix usage of unique_ptr
  rename HttpServerJob -> GeneralServerJob
  improve error handling
  improve error handling

Conflicts:
	arangod/GeneralServer/GeneralServer.cpp
This commit is contained in:
Jan Christoph Uhde 2016-08-26 09:20:12 +02:00
commit c42bc4016a
7 changed files with 58 additions and 42 deletions

View File

@ -192,7 +192,7 @@ add_executable(${BIN_ARANGOD}
GeneralServer/GeneralServer.cpp
GeneralServer/GeneralServerFeature.cpp
GeneralServer/HttpCommTask.cpp
GeneralServer/HttpServerJob.cpp
GeneralServer/GeneralServerJob.cpp
GeneralServer/HttpsCommTask.cpp
GeneralServer/PathHandler.cpp
GeneralServer/RestHandler.cpp

View File

@ -25,7 +25,7 @@
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "GeneralServer/HttpServerJob.h"
#include "GeneralServer/GeneralServerJob.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Rest/GeneralResponse.h"
@ -260,7 +260,7 @@ std::vector<AsyncJobResult::IdType> AsyncJobManager::byStatus(
/// @brief initializes an async job
////////////////////////////////////////////////////////////////////////////////
void AsyncJobManager::initAsyncJob(HttpServerJob* job, char const* hdr) {
void AsyncJobManager::initAsyncJob(GeneralServerJob* job, char const* hdr) {
AsyncCallbackContext* ctx = nullptr;
if (hdr != nullptr) {

View File

@ -32,7 +32,7 @@ class GeneralResponse;
namespace rest {
class AsyncCallbackContext;
class HttpServerJob;
class GeneralServerJob;
////////////////////////////////////////////////////////////////////////////////
/// @brief AsyncJobResult
@ -133,7 +133,7 @@ class AsyncJobManager {
/// @brief initializes an async job
//////////////////////////////////////////////////////////////////////////////
void initAsyncJob(HttpServerJob*, char const*);
void initAsyncJob(GeneralServerJob*, char const*);
//////////////////////////////////////////////////////////////////////////////
/// @brief finishes the execution of an async job

View File

@ -33,7 +33,7 @@
#include "GeneralServer/GeneralCommTask.h"
#include "GeneralServer/GeneralListenTask.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/HttpServerJob.h"
#include "GeneralServer/GeneralServerJob.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Scheduler/ListenTask.h"
@ -127,15 +127,21 @@ bool GeneralServer::handleRequestAsync(GeneralCommTask* task,
char const* hdr = found ? hdrStr.c_str() : nullptr;
// execute the handler using the dispatcher
<<<<<<< HEAD
std::unique_ptr<Job> job = std::make_unique<HttpServerJob>(
this, handler, true); // hander gets moved!!!
task->getAgent(messageId)->transferTo(job.get());
=======
std::unique_ptr<Job> job =
std::make_unique<GeneralServerJob>(this, std::move(handler), true);
task->RequestStatisticsAgent::transferTo(job.get());
>>>>>>> obi-velocystream
// register the job with the job manager
if (jobId != nullptr) {
GeneralServerFeature::JOB_MANAGER->initAsyncJob(
static_cast<HttpServerJob*>(job.get()), hdr);
static_cast<GeneralServerJob*>(job.get()), hdr);
*jobId = job->jobId();
}
@ -176,12 +182,12 @@ bool GeneralServer::handleRequest(GeneralCommTask* task,
auto messageId = handler->request()->messageId();
// use a dispatcher queue, handler belongs to the job
std::unique_ptr<Job> job = std::make_unique<HttpServerJob>(
this, handler); // handler is uique_ptr that gets moved
std::unique_ptr<Job> job =
std::make_unique<GeneralServerJob>(this, std::move(handler));
task->getAgent(messageId)->transferTo(job.get());
LOG(TRACE) << "GeneralCommTask " << (void*)task << " created HttpServerJob "
<< (void*)job.get();
LOG(TRACE) << "GeneralCommTask " << (void*)task
<< " created GeneralServerJob " << (void*)job.get();
// add the job to the dispatcher
int res = DispatcherFeature::DISPATCHER->addJob(job, startThread);

View File

@ -22,7 +22,7 @@
/// @author Achim Brandt
////////////////////////////////////////////////////////////////////////////////
#include "HttpServerJob.h"
#include "GeneralServerJob.h"
#include "Basics/WorkMonitor.h"
#include "Dispatcher/DispatcherQueue.h"
@ -42,28 +42,29 @@ using namespace arangodb::rest;
/// @brief constructs a new server job
////////////////////////////////////////////////////////////////////////////////
HttpServerJob::HttpServerJob(GeneralServer* server,
WorkItem::uptr<RestHandler>& handler, bool isAsync)
: Job("HttpServerJob"),
GeneralServerJob::GeneralServerJob(GeneralServer* server,
WorkItem::uptr<RestHandler> handler,
bool isAsync)
: Job("GeneralServerJob"),
_server(server),
_workDesc(nullptr),
_isAsync(isAsync) {
_handler.swap(handler);
_handler = std::move(handler);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructs a server job
////////////////////////////////////////////////////////////////////////////////
HttpServerJob::~HttpServerJob() {
GeneralServerJob::~GeneralServerJob() {
if (_workDesc != nullptr) {
WorkMonitor::freeWorkDescription(_workDesc);
}
}
size_t HttpServerJob::queue() const { return _handler->queue(); }
size_t GeneralServerJob::queue() const { return _handler->queue(); }
void HttpServerJob::work() {
void GeneralServerJob::work() {
TRI_ASSERT(_handler.get() != nullptr);
RequestStatisticsAgent::transferTo(_handler.get());
@ -106,13 +107,13 @@ void HttpServerJob::work() {
_workDesc = WorkMonitor::popHandler(_handler.release(), false);
}
bool HttpServerJob::cancel() { return _handler->cancel(); }
bool GeneralServerJob::cancel() { return _handler->cancel(); }
void HttpServerJob::cleanup(DispatcherQueue* queue) {
void GeneralServerJob::cleanup(DispatcherQueue* queue) {
queue->removeJob(this);
delete this;
}
void HttpServerJob::handleError(arangodb::basics::Exception const& ex) {
void GeneralServerJob::handleError(arangodb::basics::Exception const& ex) {
_handler->handleError(ex);
}

View File

@ -35,15 +35,15 @@ namespace rest {
class RestHandler;
class GeneralServer;
class HttpServerJob : public Job {
HttpServerJob(HttpServerJob const&) = delete;
HttpServerJob& operator=(HttpServerJob const&) = delete;
class GeneralServerJob : public Job {
GeneralServerJob(GeneralServerJob const&) = delete;
GeneralServerJob& operator=(GeneralServerJob const&) = delete;
public:
HttpServerJob(GeneralServer*, arangodb::WorkItem::uptr<RestHandler>&,
bool isAsync = false);
GeneralServerJob(GeneralServer*, arangodb::WorkItem::uptr<RestHandler>,
bool isAsync = false);
~HttpServerJob();
~GeneralServerJob();
public:
RestHandler* handler() const { return _handler.get(); }

View File

@ -280,6 +280,7 @@ bool VppCommTask::processRead() {
// CASE 1: message is in one chunk
if (chunkHeader._isFirst && chunkHeader._chunk == 1) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk contains single message";
std::size_t payloads = 0;
try {
@ -308,17 +309,19 @@ bool VppCommTask::processRead() {
// }
doExecute = true;
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 1";
}
// CASE 2: message is in multiple chunks
auto incompleteMessageItr = _incompleteMessages.find(chunkHeader._messageID);
// CASE 2a: chunk starts new message
if (chunkHeader._isFirst) { // first chunk of multi chunk message
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk starts a new message";
if (incompleteMessageItr != _incompleteMessages.end()) {
throw std::logic_error(
"Message should be first but is already in the Map of incomplete "
"messages");
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "Message should be first but is already in the Map of incomplete "
"messages";
closeTask(rest::ResponseCode::BAD);
return false;
}
// TODO: is a 32bit value sufficient for the messageLength here?
@ -329,17 +332,19 @@ bool VppCommTask::processRead() {
auto insertPair = _incompleteMessages.emplace(
std::make_pair(chunkHeader._messageID, std::move(message)));
if (!insertPair.second) {
throw std::logic_error("insert failed");
closeTask();
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "insert failed";
closeTask(rest::ResponseCode::BAD);
return false;
}
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2a";
// CASE 2b: chunk continues a message
} else { // followup chunk of some mesage
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk continues a message";
if (incompleteMessageItr == _incompleteMessages.end()) {
throw std::logic_error("found message without previous part");
closeTask();
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "found message without previous part";
closeTask(rest::ResponseCode::BAD);
return false;
}
auto& im = incompleteMessageItr->second; // incomplete Message
im._currentChunk++;
@ -349,6 +354,7 @@ bool VppCommTask::processRead() {
// MESSAGE COMPLETE
if (im._currentChunk == im._numberOfChunks - 1 /* zero based counting */) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk completes a message";
std::size_t payloads = 0;
try {
@ -376,9 +382,9 @@ bool VppCommTask::processRead() {
// check length
doExecute = true;
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2b - complete";
}
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2b - still incomplete";
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "chunk does not complete a message";
}
read_maybe_only_part_of_buffer = true;
@ -402,8 +408,11 @@ bool VppCommTask::processRead() {
try {
type = header.at(1).getInt();
} catch (std::exception const& e) {
throw std::runtime_error(
std::string("Error during Parsing of VppHeader: ") + e.what());
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< std::string("VPack Validation failed!") + e.what();
closeTask(rest::ResponseCode::BAD);
return false;
}
if (type == 1000) {
// do auth