diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 3516e5f1b6..ae6eeb8300 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -192,7 +192,7 @@ add_executable(${BIN_ARANGOD} GeneralServer/GeneralServer.cpp GeneralServer/GeneralServerFeature.cpp GeneralServer/HttpCommTask.cpp - GeneralServer/HttpServerJob.cpp + GeneralServer/GeneralServerJob.cpp GeneralServer/HttpsCommTask.cpp GeneralServer/PathHandler.cpp GeneralServer/RestHandler.cpp diff --git a/arangod/GeneralServer/AsyncJobManager.cpp b/arangod/GeneralServer/AsyncJobManager.cpp index e231a9248b..4b8ffb9d22 100644 --- a/arangod/GeneralServer/AsyncJobManager.cpp +++ b/arangod/GeneralServer/AsyncJobManager.cpp @@ -25,7 +25,7 @@ #include "Basics/ReadLocker.h" #include "Basics/WriteLocker.h" -#include "GeneralServer/HttpServerJob.h" +#include "GeneralServer/GeneralServerJob.h" #include "GeneralServer/RestHandler.h" #include "Logger/Logger.h" #include "Rest/GeneralResponse.h" @@ -260,7 +260,7 @@ std::vector AsyncJobManager::byStatus( /// @brief initializes an async job //////////////////////////////////////////////////////////////////////////////// -void AsyncJobManager::initAsyncJob(HttpServerJob* job, char const* hdr) { +void AsyncJobManager::initAsyncJob(GeneralServerJob* job, char const* hdr) { AsyncCallbackContext* ctx = nullptr; if (hdr != nullptr) { diff --git a/arangod/GeneralServer/AsyncJobManager.h b/arangod/GeneralServer/AsyncJobManager.h index 0f56606495..3a238494a6 100644 --- a/arangod/GeneralServer/AsyncJobManager.h +++ b/arangod/GeneralServer/AsyncJobManager.h @@ -32,7 +32,7 @@ class GeneralResponse; namespace rest { class AsyncCallbackContext; -class HttpServerJob; +class GeneralServerJob; //////////////////////////////////////////////////////////////////////////////// /// @brief AsyncJobResult @@ -133,7 +133,7 @@ class AsyncJobManager { /// @brief initializes an async job ////////////////////////////////////////////////////////////////////////////// - void initAsyncJob(HttpServerJob*, char const*); + void initAsyncJob(GeneralServerJob*, char const*); ////////////////////////////////////////////////////////////////////////////// /// @brief finishes the execution of an async job diff --git a/arangod/GeneralServer/GeneralServer.cpp b/arangod/GeneralServer/GeneralServer.cpp index fb5e8f2b31..5e62d786aa 100644 --- a/arangod/GeneralServer/GeneralServer.cpp +++ b/arangod/GeneralServer/GeneralServer.cpp @@ -33,7 +33,7 @@ #include "GeneralServer/GeneralCommTask.h" #include "GeneralServer/GeneralListenTask.h" #include "GeneralServer/GeneralServerFeature.h" -#include "GeneralServer/HttpServerJob.h" +#include "GeneralServer/GeneralServerJob.h" #include "GeneralServer/RestHandler.h" #include "Logger/Logger.h" #include "Scheduler/ListenTask.h" @@ -127,15 +127,21 @@ bool GeneralServer::handleRequestAsync(GeneralCommTask* task, char const* hdr = found ? hdrStr.c_str() : nullptr; // execute the handler using the dispatcher +<<<<<<< HEAD std::unique_ptr job = std::make_unique( this, handler, true); // hander gets moved!!! task->getAgent(messageId)->transferTo(job.get()); +======= + std::unique_ptr job = + std::make_unique(this, std::move(handler), true); + task->RequestStatisticsAgent::transferTo(job.get()); +>>>>>>> obi-velocystream // register the job with the job manager if (jobId != nullptr) { GeneralServerFeature::JOB_MANAGER->initAsyncJob( - static_cast(job.get()), hdr); + static_cast(job.get()), hdr); *jobId = job->jobId(); } @@ -176,12 +182,12 @@ bool GeneralServer::handleRequest(GeneralCommTask* task, auto messageId = handler->request()->messageId(); // use a dispatcher queue, handler belongs to the job - std::unique_ptr job = std::make_unique( - this, handler); // handler is uique_ptr that gets moved + std::unique_ptr job = + std::make_unique(this, std::move(handler)); task->getAgent(messageId)->transferTo(job.get()); - LOG(TRACE) << "GeneralCommTask " << (void*)task << " created HttpServerJob " - << (void*)job.get(); + LOG(TRACE) << "GeneralCommTask " << (void*)task + << " created GeneralServerJob " << (void*)job.get(); // add the job to the dispatcher int res = DispatcherFeature::DISPATCHER->addJob(job, startThread); diff --git a/arangod/GeneralServer/HttpServerJob.cpp b/arangod/GeneralServer/GeneralServerJob.cpp similarity index 84% rename from arangod/GeneralServer/HttpServerJob.cpp rename to arangod/GeneralServer/GeneralServerJob.cpp index 772afb06fc..104f042684 100644 --- a/arangod/GeneralServer/HttpServerJob.cpp +++ b/arangod/GeneralServer/GeneralServerJob.cpp @@ -22,7 +22,7 @@ /// @author Achim Brandt //////////////////////////////////////////////////////////////////////////////// -#include "HttpServerJob.h" +#include "GeneralServerJob.h" #include "Basics/WorkMonitor.h" #include "Dispatcher/DispatcherQueue.h" @@ -42,28 +42,29 @@ using namespace arangodb::rest; /// @brief constructs a new server job //////////////////////////////////////////////////////////////////////////////// -HttpServerJob::HttpServerJob(GeneralServer* server, - WorkItem::uptr& handler, bool isAsync) - : Job("HttpServerJob"), +GeneralServerJob::GeneralServerJob(GeneralServer* server, + WorkItem::uptr handler, + bool isAsync) + : Job("GeneralServerJob"), _server(server), _workDesc(nullptr), _isAsync(isAsync) { - _handler.swap(handler); + _handler = std::move(handler); } //////////////////////////////////////////////////////////////////////////////// /// @brief destructs a server job //////////////////////////////////////////////////////////////////////////////// -HttpServerJob::~HttpServerJob() { +GeneralServerJob::~GeneralServerJob() { if (_workDesc != nullptr) { WorkMonitor::freeWorkDescription(_workDesc); } } -size_t HttpServerJob::queue() const { return _handler->queue(); } +size_t GeneralServerJob::queue() const { return _handler->queue(); } -void HttpServerJob::work() { +void GeneralServerJob::work() { TRI_ASSERT(_handler.get() != nullptr); RequestStatisticsAgent::transferTo(_handler.get()); @@ -106,13 +107,13 @@ void HttpServerJob::work() { _workDesc = WorkMonitor::popHandler(_handler.release(), false); } -bool HttpServerJob::cancel() { return _handler->cancel(); } +bool GeneralServerJob::cancel() { return _handler->cancel(); } -void HttpServerJob::cleanup(DispatcherQueue* queue) { +void GeneralServerJob::cleanup(DispatcherQueue* queue) { queue->removeJob(this); delete this; } -void HttpServerJob::handleError(arangodb::basics::Exception const& ex) { +void GeneralServerJob::handleError(arangodb::basics::Exception const& ex) { _handler->handleError(ex); } diff --git a/arangod/GeneralServer/HttpServerJob.h b/arangod/GeneralServer/GeneralServerJob.h similarity index 85% rename from arangod/GeneralServer/HttpServerJob.h rename to arangod/GeneralServer/GeneralServerJob.h index e4a6ae388f..1f1376b8af 100644 --- a/arangod/GeneralServer/HttpServerJob.h +++ b/arangod/GeneralServer/GeneralServerJob.h @@ -35,15 +35,15 @@ namespace rest { class RestHandler; class GeneralServer; -class HttpServerJob : public Job { - HttpServerJob(HttpServerJob const&) = delete; - HttpServerJob& operator=(HttpServerJob const&) = delete; +class GeneralServerJob : public Job { + GeneralServerJob(GeneralServerJob const&) = delete; + GeneralServerJob& operator=(GeneralServerJob const&) = delete; public: - HttpServerJob(GeneralServer*, arangodb::WorkItem::uptr&, - bool isAsync = false); + GeneralServerJob(GeneralServer*, arangodb::WorkItem::uptr, + bool isAsync = false); - ~HttpServerJob(); + ~GeneralServerJob(); public: RestHandler* handler() const { return _handler.get(); } diff --git a/arangod/GeneralServer/VppCommTask.cpp b/arangod/GeneralServer/VppCommTask.cpp index 792c20f6d8..788ddb80a7 100644 --- a/arangod/GeneralServer/VppCommTask.cpp +++ b/arangod/GeneralServer/VppCommTask.cpp @@ -280,6 +280,7 @@ bool VppCommTask::processRead() { // CASE 1: message is in one chunk if (chunkHeader._isFirst && chunkHeader._chunk == 1) { + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk contains single message"; std::size_t payloads = 0; try { @@ -308,17 +309,19 @@ bool VppCommTask::processRead() { // } doExecute = true; - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 1"; } // CASE 2: message is in multiple chunks auto incompleteMessageItr = _incompleteMessages.find(chunkHeader._messageID); // CASE 2a: chunk starts new message if (chunkHeader._isFirst) { // first chunk of multi chunk message + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk starts a new message"; if (incompleteMessageItr != _incompleteMessages.end()) { - throw std::logic_error( - "Message should be first but is already in the Map of incomplete " - "messages"); + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) + << "Message should be first but is already in the Map of incomplete " + "messages"; + closeTask(rest::ResponseCode::BAD); + return false; } // TODO: is a 32bit value sufficient for the messageLength here? @@ -329,17 +332,19 @@ bool VppCommTask::processRead() { auto insertPair = _incompleteMessages.emplace( std::make_pair(chunkHeader._messageID, std::move(message))); if (!insertPair.second) { - throw std::logic_error("insert failed"); - closeTask(); + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "insert failed"; + closeTask(rest::ResponseCode::BAD); + return false; } - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2a"; - // CASE 2b: chunk continues a message } else { // followup chunk of some mesage + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk continues a message"; if (incompleteMessageItr == _incompleteMessages.end()) { - throw std::logic_error("found message without previous part"); - closeTask(); + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) + << "found message without previous part"; + closeTask(rest::ResponseCode::BAD); + return false; } auto& im = incompleteMessageItr->second; // incomplete Message im._currentChunk++; @@ -349,6 +354,7 @@ bool VppCommTask::processRead() { // MESSAGE COMPLETE if (im._currentChunk == im._numberOfChunks - 1 /* zero based counting */) { + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "chunk completes a message"; std::size_t payloads = 0; try { @@ -376,9 +382,9 @@ bool VppCommTask::processRead() { // check length doExecute = true; - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2b - complete"; } - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "CASE 2b - still incomplete"; + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) + << "chunk does not complete a message"; } read_maybe_only_part_of_buffer = true; @@ -402,8 +408,11 @@ bool VppCommTask::processRead() { try { type = header.at(1).getInt(); } catch (std::exception const& e) { - throw std::runtime_error( - std::string("Error during Parsing of VppHeader: ") + e.what()); + handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID); + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) + << std::string("VPack Validation failed!") + e.what(); + closeTask(rest::ResponseCode::BAD); + return false; } if (type == 1000) { // do auth