diff --git a/arangod/GeneralServer/GeneralListenTask.cpp b/arangod/GeneralServer/GeneralListenTask.cpp index e49530e2a6..45c58b27d1 100644 --- a/arangod/GeneralServer/GeneralListenTask.cpp +++ b/arangod/GeneralServer/GeneralListenTask.cpp @@ -26,6 +26,7 @@ #include "GeneralServer/GeneralServer.h" #include "GeneralServer/GeneralServerFeature.h" +#include "GeneralServer/VppCommTask.h" #include "Scheduler/Scheduler.h" #include "Scheduler/SchedulerFeature.h" #include "Ssl/SslServerFeature.h" @@ -64,11 +65,11 @@ bool GeneralListenTask::handleConnected(TRI_socket_t socket, switch (_connectionType) { case ConnectionType::VPPS: commTask = - new HttpCommTask(_server, socket, std::move(info), _keepAliveTimeout); + new VppCommTask(_server, socket, std::move(info), _keepAliveTimeout); break; case ConnectionType::VPP: commTask = - new HttpCommTask(_server, socket, std::move(info), _keepAliveTimeout); + new VppCommTask(_server, socket, std::move(info), _keepAliveTimeout); break; case ConnectionType::HTTPS: commTask = new HttpsCommTask(_server, socket, std::move(info), diff --git a/arangod/GeneralServer/VppCommTask.cpp b/arangod/GeneralServer/VppCommTask.cpp index 07bf2131fe..296617aeeb 100644 --- a/arangod/GeneralServer/VppCommTask.cpp +++ b/arangod/GeneralServer/VppCommTask.cpp @@ -118,9 +118,17 @@ std::unique_ptr createChunkForNetworkMultiFollow( } } +VppCommTask::VppCommTask(GeneralServer* server, TRI_socket_t sock, + ConnectionInfo&& info, double timeout) + : Task("VppCommTask"), + GeneralCommTask(server, sock, std::move(info), timeout) { + _protocol = "vpp"; + // connectionStatisticsAgentSetVpp(); +} + void VppCommTask::addResponse(VppResponse* response, bool isError) { if (isError) { - // FIXME + // FIXME (obi) // what do we need to do? // clean read buffer? reset process read cursor } @@ -130,16 +138,17 @@ void VppCommTask::addResponse(VppResponse* response, bool isError) { std::vector slices; slices.push_back(response_message._header); + // if payload != Slice() slices.push_back(response_message._payload); uint32_t message_length = 0; for (auto const& slice : slices) { - message_length = slice.byteSize(); + message_length += slice.byteSize(); } - // FIXME + // FIXME (obi) // If the message is big we will create many small chunks in a loop. // For the first tests we just send single Messages StringBuffer tmp(TRI_UNKNOWN_MEM_ZONE, message_length, false); @@ -177,7 +186,7 @@ VppCommTask::ChunkHeader VppCommTask::readChunkHeader() { cursor += sizeof(header._messageID); // extract total len of message - if (header._isFirst && header._chunk == 1) { + if (header._isFirst && header._chunk > 1) { std::memcpy(&header._messageLength, cursor, sizeof(header._messageLength)); cursor += sizeof(header._messageLength); } else { @@ -233,13 +242,15 @@ bool VppCommTask::processRead() { // CASE 1: message is in one chunk if (chunkHeader._isFirst && chunkHeader._chunk == 1) { std::size_t payloadOffset = findAndValidateVPacks(vpackBegin, chunkEnd); - VPackMessage message; message._id = chunkHeader._messageID; message._buffer.append(vpackBegin, std::distance(vpackBegin, chunkEnd)); message._header = VPackSlice(message._buffer.data()); if (payloadOffset) { message._payload = VPackSlice(message._buffer.data() + payloadOffset); } + VPackValidator val; + val.validate(message._header.begin(), message._header.byteSize()); + do_execute = true; } // CASE 2: message is in multiple chunks @@ -308,6 +319,7 @@ bool VppCommTask::processRead() { // for now we can handle only one request at a time // lock _request???? REVIEW (fc) + LOG(ERR) << message._header.toJson(); _request = new VppRequest(_connectionInfo, std::move(message)); GeneralServerFeature::HANDLER_FACTORY->setRequestContext(_request); _request->setClientTaskId(_taskId); diff --git a/arangod/GeneralServer/VppCommTask.h b/arangod/GeneralServer/VppCommTask.h index 2152a45fb6..dc71a075b7 100644 --- a/arangod/GeneralServer/VppCommTask.h +++ b/arangod/GeneralServer/VppCommTask.h @@ -56,6 +56,7 @@ class VppCommTask : public GeneralCommTask { protected: void completedWriteBuffer() override final; + virtual void handleChunk(char const*, size_t) {} private: // resets the internal state this method can be called to clean up when the diff --git a/lib/Endpoint/Endpoint.cpp b/lib/Endpoint/Endpoint.cpp index 6ffa06e1b0..c675ed4ade 100644 --- a/lib/Endpoint/Endpoint.cpp +++ b/lib/Endpoint/Endpoint.cpp @@ -51,7 +51,7 @@ Endpoint::Endpoint(DomainType domainType, EndpointType type, TRI_invalidatesocket(&_socket); } -std::string Endpoint::uriForm (std::string const& endpoint) { +std::string Endpoint::uriForm(std::string const& endpoint) { static std::string illegal; if (StringUtils::isPrefix(endpoint, "http+tcp://")) { @@ -86,7 +86,8 @@ std::string Endpoint::unifiedForm(std::string const& specification) { } // read protocol from string - if (StringUtils::isPrefix(copy, "http+") || StringUtils::isPrefix(copy, "http@")) { + if (StringUtils::isPrefix(copy, "http+") || + StringUtils::isPrefix(copy, "http@")) { protocol = TransportType::HTTP; prefix = "http+"; copy = copy.substr(5); @@ -94,7 +95,7 @@ std::string Endpoint::unifiedForm(std::string const& specification) { if (StringUtils::isPrefix(copy, "vpp+")) { protocol = TransportType::VPP; - prefix = "vsp+"; + prefix = "vpp+"; copy = copy.substr(4); } @@ -159,10 +160,10 @@ std::string Endpoint::unifiedForm(std::string const& specification) { // hostname only if (protocol == TransportType::HTTP) { - return prefix + copy + ":" + StringUtils::itoa(EndpointIp::_defaultPortHttp); - } else { return prefix + copy + ":" + - StringUtils::itoa(EndpointIp::_defaultPortVpp); + StringUtils::itoa(EndpointIp::_defaultPortHttp); + } else { + return prefix + copy + ":" + StringUtils::itoa(EndpointIp::_defaultPortVpp); } } @@ -323,11 +324,12 @@ std::string const Endpoint::defaultEndpoint(TransportType type) { StringUtils::itoa(EndpointIp::_defaultPortVpp); default: { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid transport type"); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + "invalid transport type"); } } - return ""; // silence GCC + return ""; // silence GCC } //////////////////////////////////////////////////////////////////////////////// @@ -379,7 +381,8 @@ bool Endpoint::setSocketFlags(TRI_socket_t s) { return true; } -std::ostream& operator<<(std::ostream& stream, arangodb::Endpoint::TransportType type) { +std::ostream& operator<<(std::ostream& stream, + arangodb::Endpoint::TransportType type) { switch (type) { case arangodb::Endpoint::TransportType::HTTP: stream << "http"; @@ -391,7 +394,8 @@ std::ostream& operator<<(std::ostream& stream, arangodb::Endpoint::TransportType return stream; } -std::ostream& operator<<(std::ostream& stream, arangodb::Endpoint::EndpointType type) { +std::ostream& operator<<(std::ostream& stream, + arangodb::Endpoint::EndpointType type) { switch (type) { case arangodb::Endpoint::EndpointType::SERVER: stream << "server"; @@ -403,7 +407,8 @@ std::ostream& operator<<(std::ostream& stream, arangodb::Endpoint::EndpointType return stream; } -std::ostream& operator<<(std::ostream& stream, arangodb::Endpoint::EncryptionType type) { +std::ostream& operator<<(std::ostream& stream, + arangodb::Endpoint::EncryptionType type) { switch (type) { case arangodb::Endpoint::EncryptionType::NONE: stream << "none"; @@ -415,7 +420,8 @@ std::ostream& operator<<(std::ostream& stream, arangodb::Endpoint::EncryptionTyp return stream; } -std::ostream& operator<<(std::ostream& stream, arangodb::Endpoint::DomainType type) { +std::ostream& operator<<(std::ostream& stream, + arangodb::Endpoint::DomainType type) { switch (type) { case arangodb::Endpoint::DomainType::UNIX: stream << "unix"; diff --git a/lib/Rest/GeneralRequest.h b/lib/Rest/GeneralRequest.h index 97b9624275..46542d2aa6 100644 --- a/lib/Rest/GeneralRequest.h +++ b/lib/Rest/GeneralRequest.h @@ -63,11 +63,11 @@ class GeneralRequest { enum class RequestType { DELETE_REQ = 0, // windows redefines DELETE GET, - HEAD, - OPTIONS, POST, PUT, + HEAD, PATCH, + OPTIONS, VSTREAM_CRED, VSTREAM_REGISTER, VSTREAM_STATUS, diff --git a/lib/Rest/GeneralResponse.h b/lib/Rest/GeneralResponse.h index 56f1b0de45..470cbba647 100644 --- a/lib/Rest/GeneralResponse.h +++ b/lib/Rest/GeneralResponse.h @@ -182,6 +182,19 @@ class GeneralResponse { arangodb::velocypack::Options const& = arangodb:: velocypack::Options::Defaults) = 0; + virtual void addPayload(VPackSlice const& slice) { + _vpackPayloads.emplace_back(slice.byteSize()); + std::memcpy(&_vpackPayloads.back(), slice.start(), slice.byteSize()); + }; + + virtual void addPayload(VPackBuffer&& buffer) { + _vpackPayloads.push_back(std::move(buffer)); + }; + + virtual void addHeaderInformation(std::string s /* any or variant */){ + + }; + // virtual void setPayload(ContentType contentType, // VPackBuffer&& sliceBuffer, // bool generateBody = true, @@ -192,6 +205,8 @@ class GeneralResponse { ResponseCode _responseCode; // http response code std::unordered_map _headers; // headers/metadata map + + std::vector> _vpackPayloads; }; } diff --git a/lib/Rest/VppMessage.h b/lib/Rest/VppMessage.h index 630efaf02d..46e22f52f4 100644 --- a/lib/Rest/VppMessage.h +++ b/lib/Rest/VppMessage.h @@ -38,7 +38,7 @@ struct VPackMessage { VPackMessage(VPackBuffer&& buff, VPackSlice head, VPackSlice pay, uint64_t id) : _buffer(std::move(buff)), _header(head), _payload(pay), _id(id) {} - VPackMessage(VPackMessage&&) = default; // not necessary just to make sure! + VPackMessage(VPackMessage&& other) = default; VPackBuffer _buffer; VPackSlice _header; diff --git a/lib/Rest/VppRequest.cpp b/lib/Rest/VppRequest.cpp index 96af2ef558..25a60e12a6 100644 --- a/lib/Rest/VppRequest.cpp +++ b/lib/Rest/VppRequest.cpp @@ -31,12 +31,17 @@ #include #include -#include "Basics/conversions.h" #include "Basics/StaticStrings.h" +#include "Basics/StringRef.h" #include "Basics/StringUtils.h" +#include "Basics/conversions.h" #include "Basics/tri-strings.h" +#include "Meta/conversion.h" #include "Logger/Logger.h" +// TODO (obi) +// - REMOVE TRI_ASSERT + using namespace arangodb; using namespace arangodb::basics; @@ -60,16 +65,15 @@ VppRequest::VppRequest(ConnectionInfo const& connectionInfo, : GeneralRequest(connectionInfo), _message(std::move(message)), _headers(nullptr) { - if (message._payload != VPackSlice::noneSlice()) { - _contentType = ContentType::VPACK; - _contentTypeResponse = ContentType::VPACK; - _protocol = "vpp"; - parseHeaderInformation(); - } + _protocol = "vpp"; + _contentType = ContentType::VPACK; + _contentTypeResponse = ContentType::VPACK; + parseHeaderInformation(); + _user = "root"; } VPackSlice VppRequest::payload(VPackOptions const* options) { - // TODO - handle options?? + // TODO (obi)- handle options?? return _message._payload; } @@ -78,8 +82,12 @@ std::unordered_map const& VppRequest::headers() if (!_headers) { using namespace std; _headers = make_unique>(); + LOG(ERR) << _message._header.toJson(); TRI_ASSERT(_message._header.isObject()); - for (auto const& it : VPackObjectIterator(_message._header)) { + VPackSlice meta = _message._header.get("meta"); + // TRI_ASSERT(meta.isObject()); + for (auto const& it : VPackObjectIterator(meta)) { + TRI_ASSERT(it.key.isString()); _headers->emplace(it.key.copyString(), it.value.copyString()); } } @@ -99,7 +107,13 @@ std::string const& VppRequest::header(std::string const& key) const { void VppRequest::parseHeaderInformation() { using namespace std; - TRI_ASSERT(_message._header.isObject()); + auto& vHeader = _message._header; + + TRI_ASSERT(vHeader.isObject()); + _databaseName = vHeader.get("database").copyString(); + _requestPath = vHeader.get("request").copyString(); + _type = meta::toEnum(vHeader.get("requestType").getInt()); + VPackSlice params = _message._header.get("parameter"); TRI_ASSERT(params.isObject()); for (auto const& it : VPackObjectIterator(params)) { diff --git a/lib/Rest/VppResponse.cpp b/lib/Rest/VppResponse.cpp index 72bdf24526..ed9ff80c1a 100644 --- a/lib/Rest/VppResponse.cpp +++ b/lib/Rest/VppResponse.cpp @@ -1,4 +1,4 @@ -//////////////////////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany @@ -61,6 +61,12 @@ void VppResponse::setPayload(ContentType contentType, arangodb::velocypack::Slice const& slice, bool generateBody, VPackOptions const& options) { if (generateBody) { + // addPayload(slice); + if (_payload.empty()) { + throw std::logic_error("payload should be empty!!"); + } + _payload.append(slice.startAs(), + std::distance(slice.begin(), slice.end())); } };