diff --git a/arangod/GeneralServer/GeneralCommTask.cpp b/arangod/GeneralServer/GeneralCommTask.cpp index 54103aaaca..82769456cb 100644 --- a/arangod/GeneralServer/GeneralCommTask.cpp +++ b/arangod/GeneralServer/GeneralCommTask.cpp @@ -36,6 +36,7 @@ #include "Logger/Logger.h" #include "Scheduler/Scheduler.h" #include "Scheduler/SchedulerFeature.h" +#include "Rest/VppResponse.h" using namespace arangodb; using namespace arangodb::basics; @@ -66,11 +67,11 @@ GeneralCommTask::~GeneralCommTask() { for (auto& i : _writeBuffersStats) { TRI_ReleaseRequestStatistics(i); } - - httpClearRequest(); } void GeneralCommTask::signalTask(TaskData* data) { + // used to output text + // data response if (data->_type == TaskData::TASK_DATA_RESPONSE) { data->RequestStatisticsAgent::transferTo(this); @@ -87,12 +88,21 @@ void GeneralCommTask::signalTask(TaskData* data) { // buffer response else if (data->_type == TaskData::TASK_DATA_BUFFER) { + std::unique_ptr response; + if (transportType() == Endpoint::TransportType::VPP) { + response = std::unique_ptr(new VppResponse( + GeneralResponse::ResponseCode::OK, 0 /*id unset FIXME?*/)); + } else { + response = std::unique_ptr( + new HttpResponse(GeneralResponse::ResponseCode::OK)); + } + data->RequestStatisticsAgent::transferTo(this); - HttpResponse response(GeneralResponse::ResponseCode::OK); velocypack::Slice slice(data->_buffer->data()); - response.setPayload(slice, true, VPackOptions::Defaults); - processResponse(&response); + // FIXME (obi) contentType - text set header/meta information? + response->setPayload(slice, true, VPackOptions::Defaults); + processResponse(response.get()); processRead(); } @@ -155,7 +165,6 @@ void GeneralCommTask::fillWriteBuffer() { void GeneralCommTask::executeRequest(GeneralRequest* request, GeneralResponse* response) { - // create a handler for this request WorkItem::uptr handler( GeneralServerFeature::HANDLER_FACTORY->createHandler(request, response)); @@ -165,7 +174,8 @@ void GeneralCommTask::executeRequest(GeneralRequest* request, httpClearRequest(); delete response; - handleSimpleError(GeneralResponse::ResponseCode::NOT_FOUND, 0); + handleSimpleError(GeneralResponse::ResponseCode::NOT_FOUND, + request->messageId()); return; } @@ -176,9 +186,10 @@ void GeneralCommTask::executeRequest(GeneralRequest* request, std::string const& asyncExecution = request->header(StaticStrings::Async, found); + // TODO(fc) // the responsibility for the request has been moved to the handler - // TODO(fc) _request should - //_request = nullptr; + // so we give up ownage here by setting _request = nullptr + httpNullRequest(); // http specific - should be removed FIXME // async execution bool ok = false; @@ -215,13 +226,15 @@ void GeneralCommTask::executeRequest(GeneralRequest* request, } if (!ok) { - handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR, 0); + handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR, + request->messageId()); } } void GeneralCommTask::processResponse(GeneralResponse* response) { if (response == nullptr) { - handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR, 0); + handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR, + response->messageId()); } else { addResponse(response, false); } diff --git a/arangod/GeneralServer/GeneralCommTask.h b/arangod/GeneralServer/GeneralCommTask.h index 01754df75d..e768bb58ab 100644 --- a/arangod/GeneralServer/GeneralCommTask.h +++ b/arangod/GeneralServer/GeneralCommTask.h @@ -22,8 +22,8 @@ /// @author Achim Brandt //////////////////////////////////////////////////////////////////////////////// -#ifndef ARANGOD_HTTP_SERVER_HTTP_COMM_TASK_H -#define ARANGOD_HTTP_SERVER_HTTP_COMM_TASK_H 1 +#ifndef ARANGOD_GENERAL_SERVER_GENERAL_COMM_TASK_H +#define ARANGOD_GENERAL_SERVER_GENERAL_COMM_TASK_H 1 #include "Scheduler/SocketTask.h" @@ -149,6 +149,7 @@ class GeneralCommTask : public SocketTask, public RequestStatisticsAgent { protected: virtual void httpClearRequest(){}; // should be removed + virtual void httpNullRequest(){}; // should be removed void executeRequest(GeneralRequest*, GeneralResponse*); // TODO(fc) move to SocketTask @@ -161,7 +162,8 @@ class GeneralCommTask : public SocketTask, public RequestStatisticsAgent { virtual void handleSimpleError(GeneralResponse::ResponseCode, uint64_t messagid) = 0; virtual void handleSimpleError(GeneralResponse::ResponseCode, int code, - std::string const& errorMessage) = 0; + std::string const& errorMessage, + uint64_t messageId) = 0; void fillWriteBuffer(); // fills SocketTasks _writeBuffer // _writeBufferStatistics from // _writeBuffers/_writeBuffersStats diff --git a/arangod/GeneralServer/HttpCommTask.cpp b/arangod/GeneralServer/HttpCommTask.cpp index 658b0e920a..0e198cbd98 100644 --- a/arangod/GeneralServer/HttpCommTask.cpp +++ b/arangod/GeneralServer/HttpCommTask.cpp @@ -230,7 +230,7 @@ bool HttpCommTask::processRead() { // header is too large handleSimpleError( GeneralResponse::ResponseCode::REQUEST_HEADER_FIELDS_TOO_LARGE, - 0); // FIXME messageid not required + 1); // ID does not matter for http (http default is 1) return false; } @@ -260,7 +260,7 @@ bool HttpCommTask::processRead() { _protocolVersion != GeneralRequest::ProtocolVersion::HTTP_1_1) { handleSimpleError( GeneralResponse::ResponseCode::HTTP_VERSION_NOT_SUPPORTED, - 0); // FIXME + 1); // FIXME return false; } @@ -270,7 +270,7 @@ bool HttpCommTask::processRead() { if (_fullUrl.size() > 16384) { handleSimpleError(GeneralResponse::ResponseCode::REQUEST_URI_TOO_LONG, - 0); // FIXME + 1); // FIXME return false; } @@ -373,7 +373,7 @@ bool HttpCommTask::processRead() { // bad request, method not allowed handleSimpleError(GeneralResponse::ResponseCode::METHOD_NOT_ALLOWED, - 0); // FIXME + 1); return false; } } @@ -495,12 +495,12 @@ bool HttpCommTask::processRead() { // not found else if (authResult == GeneralResponse::ResponseCode::NOT_FOUND) { handleSimpleError(authResult, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, - TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND)); + TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND), 1); } // forbidden else if (authResult == GeneralResponse::ResponseCode::FORBIDDEN) { handleSimpleError(authResult, TRI_ERROR_USER_CHANGE_PASSWORD, - "change password"); + "change password", 1); } // not authenticated else { @@ -600,8 +600,7 @@ bool HttpCommTask::checkContentLength(bool expectContentLength) { if (bodyLength < 0) { // bad request, body length is < 0. this is a client error - handleSimpleError(GeneralResponse::ResponseCode::LENGTH_REQUIRED, - 0); // FIXME + handleSimpleError(GeneralResponse::ResponseCode::LENGTH_REQUIRED); return false; } @@ -792,7 +791,9 @@ HttpRequest* HttpCommTask::requestAsHttp() { void HttpCommTask::handleSimpleError(GeneralResponse::ResponseCode responseCode, int errorNum, - std::string const& errorMessage) { + std::string const& errorMessage, + uint64_t messageId) { + (void)messageId; HttpResponse response(responseCode); VPackBuilder builder; @@ -812,8 +813,8 @@ void HttpCommTask::handleSimpleError(GeneralResponse::ResponseCode responseCode, } void HttpCommTask::clearRequest() { - if (_request != nullptr) { + if (_request) { delete _request; + _request = nullptr; } - _request = nullptr; } diff --git a/arangod/GeneralServer/HttpCommTask.h b/arangod/GeneralServer/HttpCommTask.h index 0bb7d50cf8..9b8e079ca9 100644 --- a/arangod/GeneralServer/HttpCommTask.h +++ b/arangod/GeneralServer/HttpCommTask.h @@ -31,18 +31,21 @@ class HttpCommTask : public GeneralCommTask { }; protected: + ~HttpCommTask() { clearRequest(); } + void handleChunk(char const*, size_t) override final; void completedWriteBuffer() override final; - // clears the request object, TODO(fc) see below + // clears the request object, REVIEW/TODO(fc) void clearRequest(); - void httpClearRequest() override { clearRequest(); } + void httpNullRequest() override { _request = nullptr; } void handleSimpleError(GeneralResponse::ResponseCode code, - uint64_t id) override; + uint64_t id = 1) override final; void handleSimpleError(GeneralResponse::ResponseCode, int code, - std::string const& errorMessage) override; + std::string const& errorMessage, + uint64_t messageId = 1) override final; private: void processRequest(); @@ -64,7 +67,7 @@ class HttpCommTask : public GeneralCommTask { private: // the request with possible incomplete body - // TODO(fc) needs to be removed, depends on the underlying protocol + // REVIEW(fc) GeneralRequest* _request = nullptr; size_t _readPosition; // current read position @@ -94,7 +97,7 @@ class HttpCommTask : public GeneralCommTask { // true if request is complete but not handled bool _requestPending = false; }; -} // rest -} // arangodb +} +} #endif diff --git a/arangod/GeneralServer/VppCommTask.cpp b/arangod/GeneralServer/VppCommTask.cpp index 24bdb89f2c..361a3cab7f 100644 --- a/arangod/GeneralServer/VppCommTask.cpp +++ b/arangod/GeneralServer/VppCommTask.cpp @@ -368,14 +368,16 @@ bool VppCommTask::processRead() { << "got request:" << message._header.toJson(); // the handler will take ownersip of this pointer - VppRequest* request = new VppRequest(_connectionInfo, std::move(message)); + VppRequest* request = new VppRequest(_connectionInfo, std::move(message), + chunkHeader._messageID); GeneralServerFeature::HANDLER_FACTORY->setRequestContext(request); // make sure we have a dabase if (request->requestContext() == nullptr) { handleSimpleError(GeneralResponse::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, - TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND)); + TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND), + chunkHeader._messageID); } else { request->setClientTaskId(_taskId); _protocolVersion = request->protocolVersion(); @@ -426,19 +428,11 @@ void VppCommTask::resetState(bool close) { // return context->authenticate(); // } -// convert internal GeneralRequest to VppRequest -// VppRequest* VppCommTask::requestAsVpp() { -// VppRequest* request = dynamic_cast(_request); -// if (request == nullptr) { -// THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); -// } -// return request; -// }; - void VppCommTask::handleSimpleError(GeneralResponse::ResponseCode responseCode, int errorNum, - std::string const& errorMessage) { - VppResponse response(responseCode, 0); // FIXME!!!!!! + std::string const& errorMessage, + uint64_t messageId) { + VppResponse response(responseCode, messageId); VPackBuilder builder; builder.openObject(); diff --git a/arangod/GeneralServer/VppCommTask.h b/arangod/GeneralServer/VppCommTask.h index 4d89fa1b54..6e2cc9990c 100644 --- a/arangod/GeneralServer/VppCommTask.h +++ b/arangod/GeneralServer/VppCommTask.h @@ -68,7 +68,8 @@ class VppCommTask : public GeneralCommTask { addResponse(&response, true); } void handleSimpleError(GeneralResponse::ResponseCode, int code, - std::string const& errorMessage) override; + std::string const& errorMessage, + uint64_t messageId) override; private: // resets the internal state this method can be called to clean up when the diff --git a/lib/Rest/GeneralRequest.h b/lib/Rest/GeneralRequest.h index 42deb7822b..d9f783b07b 100644 --- a/lib/Rest/GeneralRequest.h +++ b/lib/Rest/GeneralRequest.h @@ -32,6 +32,7 @@ #include #include #include +#include namespace arangodb { namespace velocypack { @@ -165,6 +166,11 @@ class GeneralRequest { std::vector const& suffix() const { return _suffix; } void addSuffix(std::string&& part); + // VIRTUAL ////////////////////////////////////////////// + // return 0 for protocols that + // do not care about message ids + virtual uint64_t messageId() { return 1; } + virtual arangodb::Endpoint::TransportType transportType() = 0; virtual int64_t contentLength() const = 0; // get value from headers map. The key must be lowercase. diff --git a/lib/Rest/GeneralResponse.h b/lib/Rest/GeneralResponse.h index 9baddc5652..ebf0d712fe 100644 --- a/lib/Rest/GeneralResponse.h +++ b/lib/Rest/GeneralResponse.h @@ -183,6 +183,8 @@ class GeneralResponse { } public: + virtual uint64_t messageId() { return 1; } + virtual void reset(ResponseCode) = 0; // generates the response body, sets the content type; this might diff --git a/lib/Rest/VppRequest.cpp b/lib/Rest/VppRequest.cpp index d6f4cdcd11..1c07fc0707 100644 --- a/lib/Rest/VppRequest.cpp +++ b/lib/Rest/VppRequest.cpp @@ -58,10 +58,11 @@ std::string const& lookupStringInMap( } VppRequest::VppRequest(ConnectionInfo const& connectionInfo, - VPackMessage&& message) + VPackMessage&& message, uint64_t messageId) : GeneralRequest(connectionInfo), _message(std::move(message)), - _headers(nullptr) { + _headers(nullptr), + _messageId(messageId) { _protocol = "vpp"; _contentType = ContentType::VPACK; _contentTypeResponse = ContentType::VPACK; diff --git a/lib/Rest/VppRequest.h b/lib/Rest/VppRequest.h index 3842f706cc..f16d8edeae 100644 --- a/lib/Rest/VppRequest.h +++ b/lib/Rest/VppRequest.h @@ -59,12 +59,14 @@ class VppRequest : public GeneralRequest { friend class RestBatchHandler; // TODO must be removed private: - VppRequest(ConnectionInfo const& connectionInfo, VPackMessage&& message); + VppRequest(ConnectionInfo const& connectionInfo, VPackMessage&& message, + uint64_t messageId); public: ~VppRequest() {} public: + virtual uint64_t messageId() { return _messageId; } VPackSlice payload(arangodb::velocypack::Options const*) override; int64_t contentLength() const override { @@ -98,6 +100,7 @@ class VppRequest : public GeneralRequest { // values are query parameters std::unordered_map _values; std::unordered_map> _arrayValues; + uint64_t _messageId; const std::unordered_map _cookies; // TODO remove void parseHeaderInformation(); diff --git a/lib/Rest/VppResponse.cpp b/lib/Rest/VppResponse.cpp index 48f6f18a03..7ac9065a05 100644 --- a/lib/Rest/VppResponse.cpp +++ b/lib/Rest/VppResponse.cpp @@ -44,7 +44,7 @@ using namespace arangodb::basics; bool VppResponse::HIDE_PRODUCT_HEADER = false; VppResponse::VppResponse(ResponseCode code, uint64_t id) - : GeneralResponse(code), _header(nullptr), _payload(), _messageID(id) { + : GeneralResponse(code), _header(nullptr), _payload(), _messageId(id) { _contentType = ContentType::VPACK; _connectionType = CONNECTION_KEEP_ALIVE; } @@ -77,7 +77,7 @@ VPackMessageNoOwnBuffer VppResponse::prepareForNetwork() { builder.close(); _header = builder.steal(); return VPackMessageNoOwnBuffer(VPackSlice(_header->data()), - VPackSlice(_payload.data()), _messageID, + VPackSlice(_payload.data()), _messageId, _generateBody); } // void VppResponse::writeHeader(basics::StringBuffer*) {} diff --git a/lib/Rest/VppResponse.h b/lib/Rest/VppResponse.h index 7925f59e7c..733a595723 100644 --- a/lib/Rest/VppResponse.h +++ b/lib/Rest/VppResponse.h @@ -49,6 +49,7 @@ class VppResponse : public GeneralResponse { static bool HIDE_PRODUCT_HEADER; // required by base + virtual uint64_t messageId() override { return _messageId; } void reset(ResponseCode code) final; void setPayload(arangodb::velocypack::Slice const&, bool generateBody, arangodb::velocypack::Options const&) final; @@ -64,7 +65,7 @@ class VppResponse : public GeneralResponse { std::shared_ptr> _header; // generated form _headers when prepared for network VPackBuffer _payload; - uint64_t _messageID; + uint64_t _messageId; bool _generateBody; // this must be true if payload should be send }; }