1
0
Fork 0

silence compiler warning

This commit is contained in:
Jan Christoph Uhde 2016-07-28 16:16:56 +02:00
parent fa0015e658
commit 4f00b0a48e
2 changed files with 87 additions and 62 deletions

View File

@ -42,21 +42,38 @@ using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::rest;
VppCommTask::VppCommTask(GeneralServer* server, TRI_socket_t sock,
ConnectionInfo&& info, double timeout)
: Task("VppCommTask"),
GeneralCommTask(server, sock, std::move(info), timeout),
_requestType(GeneralRequest::RequestType::ILLEGAL),
_fullUrl() {
_protocol = "vpp";
// connectionStatisticsAgentSetVpp();
}
namespace {
constexpr size_t getChunkHeaderLength(bool oneChunk) {
return (oneChunk ? 2 * sizeof(uint32_t) + 1 * sizeof(uint64_t)
: 2 * sizeof(uint32_t) + 2 * sizeof(uint64_t));
}
VPackOffsets findAndValidateVPacks(char const* chunkBegin, char const* chunkEnd,
bool messageInOneChunk) {
auto chunkHeaderLength = getChunkHeaderLength(messageInOneChunk);
VPackValidator validator;
// validate Header VelocyPack
auto vpHeaderStart = chunkBegin + chunkHeaderLength;
// check for slice start to the end of Chunk
// isSubPart allows the slice to be shorter than the checked buffer.
validator.validate(vpHeaderStart, std::distance(vpHeaderStart, chunkEnd),
/*isSubPart =*/true);
// check if there is payload and locate the start
VPackSlice vpHeader(vpHeaderStart);
auto vpHeaderLen = vpHeader.byteSize();
auto vpPayloadStart = vpHeaderStart + vpHeaderLen;
if (vpPayloadStart == chunkEnd) {
// possible compare with request type in header if payload is required
return VPackOffsets(vpHeaderStart, chunkEnd); // no payload
}
// validate Payplad VelocyPack
validator.validate(vpPayloadStart, std::distance(vpPayloadStart, chunkEnd),
/*isSubPart =*/false);
return VPackOffsets(vpHeaderStart, vpPayloadStart, chunkEnd);
}
}
void VppCommTask::addResponse(VppResponse* response, bool isError) {
@ -159,7 +176,6 @@ VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
auto cursor = _readBuffer->begin();
uint32_t vpChunkLength;
std::memcpy(&header._length, cursor, sizeof(header._length));
cursor += sizeof(header._length);
@ -175,7 +191,7 @@ VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
return header;
}
bool VppCommTask::chunkComplete() {
bool VppCommTask::isChunkComplete() {
auto start = _readBuffer->begin();
std::size_t length = std::distance(start, _readBuffer->end());
auto& prv = _processReadVariables;
@ -195,48 +211,38 @@ bool VppCommTask::chunkComplete() {
return true;
}
std::pair<std::size_t, std::size_t> VppCommTask::validateChunkOnBuffer(
std::size_t chunkLength) {
auto readBufferStart = _readBuffer->begin();
auto chunkHeaderLength = getChunkHeaderLength(true);
auto sliceStart = readBufferStart + chunkHeaderLength;
auto sliceLength = chunkLength - chunkHeaderLength;
VPackValidator validator;
// check for slice start to the end of Chunk
// isSubPart allows the slice to be shorter than the checked buffer.
validator.validate(sliceStart, sliceLength, /*isSubPart =*/true);
VPackSlice vppHeader(sliceStart);
auto vppHeaderLen = vppHeader.byteSize();
sliceStart += vppHeaderLen;
sliceLength += vppHeaderLen;
validator.validate(sliceStart, sliceLength, /*isSubPart =*/true);
VPackSlice vppPayload(sliceStart);
auto vppPayloadLen = vppPayload.byteSize();
return std::pair<std::size_t, std::size_t>(vppHeaderLen,
vppHeaderLen + vppPayloadLen);
}
// reads data from the socket
bool VppCommTask::processRead() {
auto readBufferStart = _readBuffer->begin();
if (readBufferStart == nullptr || !chunkComplete()) {
auto chunkBegin = _readBuffer->begin();
if (chunkBegin == nullptr || !isChunkComplete()) {
return true; // no data or incomplete
}
auto header = readChunkHeader();
auto chunkHeader = readChunkHeader();
auto chunkEnd = chunkBegin + chunkHeader._length;
if (header._isFirst && header._chunk == 1) { // one chunk one message
validateChunkOnBuffer(header._length);
// CASE 1: message is in one chunk
if (chunkHeader._isFirst && chunkHeader._chunk == 1) {
VPackOffsets offsets = findAndValidateVPacks(chunkBegin, chunkEnd, true);
VPackMessage message;
message._buffer.append(offsets._begin,
std::distance(offsets._begin, offsets._end));
message._header = VPackSlice(message._buffer.data());
if (offsets._payloadBegin) {
message._payload =
VPackSlice(message._buffer.data() +
std::distance(offsets._begin, offsets._payloadBegin));
}
// free buffer form chunkBegin to chunkBegin + chunkHeader._lenght
// execute
}
auto incompleteMessageItr = _incompleteMessages.find(header._messageId);
if (header._isFirst) { // first chunk of multi chunk message
// CASE 2: message is in multiple chunks
auto incompleteMessageItr = _incompleteMessages.find(chunkHeader._messageId);
// CASE 2a: chunk starts new message
if (chunkHeader._isFirst) { // first chunk of multi chunk message
if (incompleteMessageItr != _incompleteMessages.end()) {
throw std::logic_error(
"Message should be first but is already in the Map of incomplete "
@ -245,20 +251,24 @@ bool VppCommTask::processRead() {
// append to VPackBuffer in incomplete Message
auto numberOfChunks = header._chunk;
auto numberOfChunks = chunkHeader._chunk;
auto chunkNumber = 1;
// set message length
// CASE 2b: chunk continues a message
} else { // followup chunk of some mesage
if (incompleteMessageItr == _incompleteMessages.end()) {
throw std::logic_error("found message without previous part");
}
auto& im = incompleteMessageItr->second; // incomplete Message
auto chunkNumber = header._chunk;
auto chunkNumber = chunkHeader._chunk;
// append to VPackBuffer in incomplete Message
// MESSAGE COMPLETE
if (chunkNumber == im._numberOfChunks) {
// VPackMessage message;
// free buffer form chunkBegin to chunkBegin + chunkHeader._lenght
// execute
}
}

View File

@ -4,16 +4,36 @@
#include "GeneralServer/GeneralCommTask.h"
#include "lib/Rest/VppResponse.h"
#include "lib/Rest/VppRequest.h"
#include <stdexcept>
namespace arangodb {
namespace rest {
struct VPackMessage {
uint32_t _length; // lenght of total message in bytes
VPackMessage() : _buffer(), _header(), _payload() {}
VPackBuffer<uint8_t> _buffer;
VPackSlice _header;
VPackSlice _payload;
};
struct VPackOffsets {
VPackOffsets(char const* begin, char const* end,
char const* payloadBegin = nullptr)
: _begin(begin),
_end(end),
_headBegin(begin),
_headEnd(payloadBegin ? payloadBegin : end),
_payloadBegin(payloadBegin),
_payloadEnd(payloadBegin ? end : nullptr) {}
char const* _begin;
char const* _end;
char const* _headBegin;
char const* _headEnd;
char const* _payloadBegin;
char const* _payloadEnd;
};
class VppCommTask : public GeneralCommTask {
public:
VppCommTask(GeneralServer*, TRI_socket_t, ConnectionInfo&&, double timeout);
@ -26,10 +46,10 @@ class VppCommTask : public GeneralCommTask {
// internal addResponse
void addResponse(GeneralResponse* response, bool isError) override {
VppResponse* vppResponse = dynamic_cast<VppResponse*>(response);
if (vppResponse != nullptr) {
addResponse(vppResponse, isError);
if (vppResponse == nullptr) {
throw std::logic_error("invalid response or response Type");
}
// else throw? do nothing?!!??!!
addResponse(vppResponse, isError);
};
protected:
@ -46,20 +66,22 @@ class VppCommTask : public GeneralCommTask {
private:
using MessageID = uint64_t;
struct IncompleteVPackMessage {
uint32_t _length; // lenght of total message in bytes
std::size_t _numberOfChunks;
VPackBuffer<uint8_t> _chunks;
std::vector<std::pair<std::size_t, std::size_t>> _chunkOffesesAndLengths;
std::vector<std::size_t> _vpackOffsets; // offset to slice in buffer
// std::vector<std::pair<std::size_t, std::size_t>> _chunkOffesesAndLengths;
// std::vector<std::size_t> _vpackOffsets; // offset to slice in buffer
};
std::unordered_map<MessageID, IncompleteVPackMessage> _incompleteMessages;
struct ProcessReadVariables {
bool _currentChunkLength; // size of chunk processed or 0 when expectiong
// new chunk
uint32_t
_currentChunkLength; // size of chunk processed or 0 when expectiong
// new chunk
};
ProcessReadVariables _processReadVariables;
struct ChunkHeader {
uint32_t _length;
@ -68,15 +90,8 @@ class VppCommTask : public GeneralCommTask {
bool _isFirst;
};
bool chunkComplete(); // subfunction of processRead
bool isChunkComplete(); // subfunction of processRead
ChunkHeader readChunkHeader(); // subfuncaion of processRead
// validates chunk on read _readBuffer and returns
// offsets to Payload VPack and The End of Message.
std::pair<std::size_t, std::size_t> validateChunkOnBuffer(std::size_t);
ProcessReadVariables _processReadVariables;
GeneralRequest::RequestType _requestType; // type of request (GET, POST, ...)
std::string _fullUrl; // value of requested URL
// user
// authenticated or not