mirror of https://gitee.com/bigwinds/arangodb
Squashed commit of the following:
commit 19a7210119cc284af64251b202f690ab62bf615c Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 23:20:10 2017 +0200 Allow access to /_api/user/ without database check in VST as in HTTP. This is a try to fix #2465 commit f973c1335652540174bf6b78df42290bded357da Merge: 30bdc4c365de45
Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 21:32:15 2017 +0200 Merge remote-tracking branch 'origin/devel' into vst-cleanup commit 30bdc4ca38e028d58cebea8268683d9f94e87ad7 Merge: 14acaea086f6a2
Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 14:52:23 2017 +0200 Merge remote-tracking branch 'origin/devel' into vst-cleanup commit 14acaea8cb5fff2c1c8c27eee32294bfd612f629 Merge: 5bd0e79fb088d3
Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 12:40:29 2017 +0200 Merge remote-tracking branch 'origin/devel' into vst-cleanup commit 5bd0e79ab053447f94f2fd959a33b1e15302b6a8 Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 12:36:23 2017 +0200 Fix bug in authentication in VST commit 9a999ce5e629aee5816a4f61b194b086d11fd169 Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 12:24:35 2017 +0200 Open up endpoints /_open/* in VST without being authenticated commit 32f62db42af6d5bfc24214ebb4ca7c1fb473b55e Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 12:11:31 2017 +0200 Simplify logic by not using boost::optional. commit 925ce2f7b890c6f14205fd3e98553939d89bec61 Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 12:11:20 2017 +0200 Add JWT case for authentication for VST commit f1d7f67a9fc20c91ca000e9a4dad91e0f50f1652 Merge: 040ace37a9ccc9
Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 08:58:51 2017 +0200 Merge remote-tracking branch 'origin/devel' into vst-cleanup commit 040ace3e9dce47ddea5f51d29f0153a70d257c9e Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 08:23:16 2017 +0200 Write out chunk buffer using little endian uints commit 66ad4c0e8d3bc94091664505986b00e7cff39f2f Author: Max Neunhoeffer <max@arangodb.com> Date: Fri May 12 08:12:38 2017 +0200 Move maxChunkSize query to constructor, remove dead code commit f7b4c26cc09c6d512362340c6bf6061bbaae61a9 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 17:57:14 2017 +0200 Do not take protocolVersion from request for VST. commit c76d6685f9507701715abf882f0ac0e192ead59f Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 13:20:58 2017 +0200 Remove dead code for compression which did never work well. commit b823765b7c041dc5d143f7ab2e13d92f5f42d3eb Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 13:09:53 2017 +0200 Fix renaming to make compiler happy. commit b327830012d146f4f4e2d0f0467dab351fbd3bd0 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 13:07:36 2017 +0200 Rename _defaultPortVpp to _defaultPortVst. commit 5eb98a9e64d702476e8ec098b7506b1a63145d64 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 13:06:28 2017 +0200 Rename header guards VPP -> VST. commit a3e96be26d49d6151898d8d757ede6225ac6abd2 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 13:05:55 2017 +0200 Rename VppRequest and VppResponse files to Vst. commit 1d5f6f196490c3cf9312be5927b967b4abc03b67 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 13:04:19 2017 +0200 Change names of header guards VPP -> VST. commit f004b25fbef75050b34bcd4eae4eb17b8b467230 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 13:03:19 2017 +0200 Rename files VppNetwork.h and VstMessage.h commit c04d3aec19e8f0a8a3b90cd48e7736bff037359d Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 12:59:15 2017 +0200 Implement correct chunk header for sending out VelocyStream 1.1 messages. commit 73aeedfbbef24d5929c44bd26c16343480514270 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 01:11:23 2017 +0200 A lot more renaming VPP -> VST. commit 2acedc77f16a82251bc4f9e39526184e2a6f0dec Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 00:49:20 2017 +0200 Rename header guard. commit 2ade43946aa5ea0048dab81eed5ffd931ce35ea8 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 00:48:01 2017 +0200 Rename files VppCommTask.* to VstCommTask.* commit 0ffcda02a51cbad35e5ec06117d397aac37323cd Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 00:46:31 2017 +0200 Rename class VppCommTask to VstCommTask. commit 9478c6c56c995b014b7a14408544b76fa676a317 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 00:30:47 2017 +0200 Rename protocol versions to VST in ProtocolVersion enum. commit 0b69b7add6d7b5155ac71b8257def282ab4a1637 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 00:29:29 2017 +0200 Recognize incoming chunk headers in VST/1.1 format. commit f40173c4fc6dc48613ae67ac3fd810d002c9b6cd Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 00:11:54 2017 +0200 Recognise and remember VST/1.0 and VST/1.1 protocol versions. commit 29c798dc1af2d650c41eb9ed06f635786faf10f6 Author: Max Neunhoeffer <max@arangodb.com> Date: Thu May 11 00:11:23 2017 +0200 Create protocol version VST/1.1 and disable vst+ prefix for endpoints.
This commit is contained in:
parent
235a82a7dc
commit
f614439416
|
@ -217,7 +217,7 @@ SET(ARANGOD_SOURCES
|
|||
GeneralServer/RestHandler.cpp
|
||||
GeneralServer/RestHandlerFactory.cpp
|
||||
GeneralServer/RestStatus.cpp
|
||||
GeneralServer/VppCommTask.cpp
|
||||
GeneralServer/VstCommTask.cpp
|
||||
Graph/AttributeWeightShortestPathFinder.cpp
|
||||
Graph/BaseOptions.cpp
|
||||
Graph/BreadthFirstEnumerator.cpp
|
||||
|
|
|
@ -1623,7 +1623,7 @@ static void Return_PrepareClusterCommResultForJS(
|
|||
r->Set(ErrorMessageKey,
|
||||
TRI_V8_ASCII_STRING("required backend was not available"));
|
||||
} else if (res.status == CL_COMM_RECEIVED) { // Everything is OK
|
||||
// FIXME HANDLE VPP
|
||||
// FIXME HANDLE VST
|
||||
auto httpRequest = std::dynamic_pointer_cast<HttpRequest>(res.answer);
|
||||
if (httpRequest == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid request type");
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
#include "GeneralServer/RestHandlerFactory.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "Meta/conversion.h"
|
||||
#include "Rest/VppResponse.h"
|
||||
#include "Rest/VstResponse.h"
|
||||
#include "Scheduler/Job.h"
|
||||
#include "Scheduler/JobQueue.h"
|
||||
#include "Scheduler/Scheduler.h"
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
namespace arangodb {
|
||||
namespace rest {
|
||||
enum class ProtocolType { HTTP, HTTPS, VPP, VPPS };
|
||||
enum class ProtocolType { HTTP, HTTPS, VST, VSTS };
|
||||
} // rest
|
||||
} // arangodb
|
||||
#endif
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
#include "GeneralServer/GeneralServer.h"
|
||||
#include "GeneralServer/GeneralServerFeature.h"
|
||||
#include "GeneralServer/HttpCommTask.h"
|
||||
#include "GeneralServer/VppCommTask.h"
|
||||
#include "GeneralServer/VstCommTask.h"
|
||||
#include "Scheduler/Scheduler.h"
|
||||
#include "Scheduler/SchedulerFeature.h"
|
||||
|
||||
|
@ -53,13 +53,6 @@ void GeneralListenTask::handleConnected(std::unique_ptr<Socket> socket,
|
|||
std::shared_ptr<GeneralCommTask> commTask;
|
||||
|
||||
switch (_connectionType) {
|
||||
case ProtocolType::VPPS:
|
||||
case ProtocolType::VPP:
|
||||
commTask =
|
||||
std::make_shared<VppCommTask>(_loop, _server, std::move(socket),
|
||||
std::move(info), _keepAliveTimeout);
|
||||
break;
|
||||
|
||||
case ProtocolType::HTTPS:
|
||||
case ProtocolType::HTTP:
|
||||
commTask =
|
||||
|
|
|
@ -86,18 +86,10 @@ void GeneralServer::stopListening() {
|
|||
bool GeneralServer::openEndpoint(Endpoint* endpoint) {
|
||||
ProtocolType protocolType;
|
||||
|
||||
if (endpoint->transport() == Endpoint::TransportType::HTTP) {
|
||||
if (endpoint->encryption() == Endpoint::EncryptionType::SSL) {
|
||||
protocolType = ProtocolType::HTTPS;
|
||||
} else {
|
||||
protocolType = ProtocolType::HTTP;
|
||||
}
|
||||
if (endpoint->encryption() == Endpoint::EncryptionType::SSL) {
|
||||
protocolType = ProtocolType::HTTPS;
|
||||
} else {
|
||||
if (endpoint->encryption() == Endpoint::EncryptionType::SSL) {
|
||||
protocolType = ProtocolType::VPPS;
|
||||
} else {
|
||||
protocolType = ProtocolType::VPP;
|
||||
}
|
||||
protocolType = ProtocolType::HTTP;
|
||||
}
|
||||
|
||||
std::unique_ptr<ListenTask> task(new GeneralListenTask(
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "GeneralServer/GeneralServerFeature.h"
|
||||
#include "GeneralServer/RestHandler.h"
|
||||
#include "GeneralServer/RestHandlerFactory.h"
|
||||
#include "GeneralServer/VppCommTask.h"
|
||||
#include "GeneralServer/VstCommTask.h"
|
||||
#include "Meta/conversion.h"
|
||||
#include "Rest/HttpRequest.h"
|
||||
#include "Statistics/ConnectionStatistics.h"
|
||||
|
@ -270,14 +270,18 @@ bool HttpCommTask::processRead(double startTime) {
|
|||
}
|
||||
|
||||
if (_readBuffer.length() >= 11 &&
|
||||
std::memcmp(_readBuffer.c_str(), "VST/1.0\r\n\r\n", 11) == 0) {
|
||||
(std::memcmp(_readBuffer.c_str(), "VST/1.0\r\n\r\n", 11) == 0 ||
|
||||
std::memcmp(_readBuffer.c_str(), "VST/1.1\r\n\r\n", 11) == 0)) {
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "switching from HTTP to VST";
|
||||
ProtocolVersion protocolVersion = _readBuffer.c_str()[6] == '0'
|
||||
? ProtocolVersion::VST_1_0 : ProtocolVersion::VST_1_1;
|
||||
_abandoned = true;
|
||||
cancelKeepAlive();
|
||||
std::shared_ptr<GeneralCommTask> commTask;
|
||||
commTask = std::make_shared<VppCommTask>(
|
||||
commTask = std::make_shared<VstCommTask>(
|
||||
_loop, _server, std::move(_peer), std::move(_connectionInfo),
|
||||
GeneralServerFeature::keepAliveTimeout(), /*skipSocketInit*/ true);
|
||||
GeneralServerFeature::keepAliveTimeout(),
|
||||
protocolVersion, /*skipSocketInit*/ true);
|
||||
commTask->addToReadBuffer(_readBuffer.c_str() + 11,
|
||||
_readBuffer.length() - 11);
|
||||
commTask->processRead(startTime);
|
||||
|
|
|
@ -1,324 +0,0 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_VPP_NETWORK_H
|
||||
#define ARANGOD_VPP_NETWORK_H 1
|
||||
|
||||
#include "Basics/StringBuffer.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Logger/LoggerFeature.h"
|
||||
|
||||
#include <velocypack/Options.h>
|
||||
#include <velocypack/Slice.h>
|
||||
#include <velocypack/Validator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
template <typename T>
|
||||
std::size_t appendToBuffer(basics::StringBuffer* buffer, T& value) {
|
||||
constexpr std::size_t len = sizeof(T);
|
||||
char charArray[len];
|
||||
char const* charPtr = charArray;
|
||||
std::memcpy(&charArray, &value, len);
|
||||
buffer->appendText(charPtr, len);
|
||||
return len;
|
||||
}
|
||||
|
||||
inline constexpr std::size_t chunkHeaderLength(bool firstOfMany) {
|
||||
// chunkLength uint32 , chunkX uint32 , id uint64 , messageLength unit64
|
||||
return sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) +
|
||||
(firstOfMany ? sizeof(uint64_t) : 0);
|
||||
}
|
||||
|
||||
// Send Message Created from Slices
|
||||
|
||||
// working version of single chunk message creation
|
||||
inline std::unique_ptr<basics::StringBuffer> createChunkForNetworkDetail(
|
||||
std::vector<VPackSlice> const& slices, bool isFirstChunk, uint32_t chunk,
|
||||
uint64_t id, uint64_t totalMessageLength = 0) {
|
||||
using basics::StringBuffer;
|
||||
bool firstOfMany = false;
|
||||
|
||||
// if we have more than one chunk and the chunk is the first
|
||||
// then we are sending the first in a series. if this condition
|
||||
// is true we also send extra 8 bytes for the messageLength
|
||||
// (length of all VPackData)
|
||||
if (isFirstChunk && chunk > 1) {
|
||||
firstOfMany = true;
|
||||
}
|
||||
|
||||
// build chunkX -- see VelocyStream Documentaion
|
||||
chunk <<= 1;
|
||||
chunk |= isFirstChunk ? 0x1 : 0x0;
|
||||
|
||||
// get the lenght of VPack data
|
||||
uint32_t dataLength = 0;
|
||||
for (auto& slice : slices) {
|
||||
// TODO: is a 32bit value sufficient for all Slices here?
|
||||
dataLength += static_cast<uint32_t>(slice.byteSize());
|
||||
}
|
||||
|
||||
// calculate length of current chunk
|
||||
uint32_t chunkLength =
|
||||
dataLength + static_cast<uint32_t>(chunkHeaderLength(firstOfMany));
|
||||
|
||||
auto buffer =
|
||||
std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE, chunkLength, false);
|
||||
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "chunkLength: " << chunkLength;
|
||||
appendToBuffer(buffer.get(), chunkLength);
|
||||
appendToBuffer(buffer.get(), chunk);
|
||||
appendToBuffer(buffer.get(), id);
|
||||
|
||||
if (firstOfMany) {
|
||||
appendToBuffer(buffer.get(), totalMessageLength);
|
||||
}
|
||||
|
||||
// append data in slices
|
||||
for (auto const& slice : slices) {
|
||||
try{
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << slice.toJson() << " , " << slice.byteSize();
|
||||
} catch(...){}
|
||||
buffer->appendText(slice.startAs<char>(), slice.byteSize());
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
// slices, isFirstChunk, chunk, id, totalMessageLength
|
||||
inline std::unique_ptr<basics::StringBuffer> createChunkForNetworkSingle(
|
||||
std::vector<VPackSlice> const& slices, uint64_t id) {
|
||||
return createChunkForNetworkDetail(slices, true, 1, id, 0 /*unused*/);
|
||||
}
|
||||
|
||||
// This method does not respect the max chunksize instead it avoids copying
|
||||
// by moving slices into the createion functions - This is not acceptable for
|
||||
// big slices
|
||||
//
|
||||
// inline std::unique_ptr<basics::StringBuffer> createChunkForNetworkMultiFirst(
|
||||
// std::vector<VPackSlice> const& slices, uint64_t id, uint32_t
|
||||
// numberOfChunks,
|
||||
// uint32_t totalMessageLength) {
|
||||
// return createChunkForNetworkDetail(slices, true, numberOfChunks, id,
|
||||
// totalMessageLength);
|
||||
//}
|
||||
//
|
||||
// inline std::unique_ptr<basics::StringBuffer>
|
||||
// createChunkForNetworkMultiFollow(
|
||||
// std::vector<VPackSlice> const& slices, uint64_t id, uint32_t chunkNumber,
|
||||
// uint32_t totalMessageLength) {
|
||||
// return createChunkForNetworkDetail(slices, false, chunkNumber, id, 0);
|
||||
//}
|
||||
|
||||
// helper functions for sending chunks when given a string buffer as input
|
||||
// working version of single chunk message creation
|
||||
inline std::unique_ptr<basics::StringBuffer> createChunkForNetworkDetail(
|
||||
char const* data, std::size_t begin, std::size_t end, bool isFirstChunk,
|
||||
uint32_t chunk, uint64_t id, uint64_t totalMessageLength = 0) {
|
||||
using basics::StringBuffer;
|
||||
bool firstOfMany = false;
|
||||
|
||||
// if we have more than one chunk and the chunk is the first
|
||||
// then we are sending the first in a series. if this condition
|
||||
// is true we also send extra 8 bytes for the messageLength
|
||||
// (length of all VPackData)
|
||||
if (isFirstChunk && chunk > 1) {
|
||||
firstOfMany = true;
|
||||
}
|
||||
|
||||
// build chunkX -- see VelocyStream Documentaion
|
||||
chunk <<= 1;
|
||||
chunk |= isFirstChunk ? 0x1 : 0x0;
|
||||
|
||||
// get the lenght of VPack data
|
||||
uint32_t dataLength = static_cast<uint32_t>(end - begin);
|
||||
|
||||
// calculate length of current chunk
|
||||
uint32_t chunkLength =
|
||||
dataLength + static_cast<uint32_t>(chunkHeaderLength(firstOfMany));
|
||||
|
||||
auto buffer =
|
||||
std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE, chunkLength, false);
|
||||
|
||||
appendToBuffer(buffer.get(), chunkLength);
|
||||
appendToBuffer(buffer.get(), chunk);
|
||||
appendToBuffer(buffer.get(), id);
|
||||
|
||||
if (firstOfMany) {
|
||||
appendToBuffer(buffer.get(), totalMessageLength);
|
||||
}
|
||||
|
||||
buffer->appendText(data + begin, dataLength);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
inline std::unique_ptr<basics::StringBuffer> createChunkForNetworkMultiFirst(
|
||||
char const* data, std::size_t begin, std::size_t end, uint64_t id,
|
||||
uint32_t numberOfChunks, uint64_t totalMessageLength) {
|
||||
return createChunkForNetworkDetail(data, begin, end, true, numberOfChunks, id,
|
||||
totalMessageLength);
|
||||
}
|
||||
|
||||
inline std::unique_ptr<basics::StringBuffer> createChunkForNetworkMultiFollow(
|
||||
char const* data, std::size_t begin, std::size_t end, uint64_t id,
|
||||
uint32_t chunkNumber) {
|
||||
return createChunkForNetworkDetail(data, begin, end, false, chunkNumber, id,
|
||||
0);
|
||||
}
|
||||
|
||||
// this function will be called when we send multiple compressed
|
||||
// or uncompressed chunks
|
||||
inline void send_many(
|
||||
std::vector<std::unique_ptr<basics::StringBuffer>>& resultVecRef,
|
||||
uint64_t id, std::size_t maxChunkBytes,
|
||||
std::unique_ptr<basics::StringBuffer> completeMessage,
|
||||
std::size_t uncompressedCompleteMessageLength) {
|
||||
uint64_t totalLen = completeMessage->length();
|
||||
std::size_t offsetBegin = 0;
|
||||
std::size_t offsetEnd = maxChunkBytes - chunkHeaderLength(true);
|
||||
// maximum number of bytes for follow up chunks
|
||||
std::size_t maxBytes = maxChunkBytes - chunkHeaderLength(false);
|
||||
|
||||
uint32_t numberOfChunks = 1;
|
||||
{ // calcuate the number of chunks taht will be send
|
||||
std::size_t bytesToSend = totalLen - maxChunkBytes +
|
||||
chunkHeaderLength(true); // data for first chunk
|
||||
while (bytesToSend >= maxBytes) {
|
||||
bytesToSend -= maxBytes;
|
||||
++numberOfChunks;
|
||||
}
|
||||
if (bytesToSend) {
|
||||
++numberOfChunks;
|
||||
}
|
||||
}
|
||||
|
||||
// send first
|
||||
resultVecRef.push_back(
|
||||
createChunkForNetworkMultiFirst(completeMessage->c_str(), offsetBegin,
|
||||
offsetEnd, id, numberOfChunks, totalLen));
|
||||
|
||||
std::uint32_t chunkNumber = 0;
|
||||
while (offsetEnd + maxBytes <= totalLen) {
|
||||
// send middle
|
||||
offsetBegin = offsetEnd;
|
||||
offsetEnd += maxBytes;
|
||||
chunkNumber++;
|
||||
resultVecRef.push_back(createChunkForNetworkMultiFollow(
|
||||
completeMessage->c_str(), offsetBegin, offsetEnd, id, chunkNumber));
|
||||
}
|
||||
|
||||
if (offsetEnd < totalLen) {
|
||||
resultVecRef.push_back(createChunkForNetworkMultiFollow(
|
||||
completeMessage->c_str(), offsetEnd, totalLen, id, ++chunkNumber));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// this function will be called by client code
|
||||
inline std::vector<std::unique_ptr<basics::StringBuffer>> createChunkForNetwork(
|
||||
std::vector<VPackSlice> const& slices, uint64_t id,
|
||||
std::size_t maxChunkBytes, bool compress = false) {
|
||||
/// variables used in this function
|
||||
std::size_t uncompressedPayloadLength = 0;
|
||||
// worst case len in case of compression
|
||||
std::size_t preliminaryPayloadLength = 0;
|
||||
// std::size_t compressedPayloadLength = 0;
|
||||
std::size_t payloadLength = 0; // compressed or uncompressed
|
||||
|
||||
std::vector<std::unique_ptr<basics::StringBuffer>> rv;
|
||||
|
||||
// find out the uncompressed payload length
|
||||
for (auto const& slice : slices) {
|
||||
uncompressedPayloadLength += slice.byteSize();
|
||||
}
|
||||
|
||||
if (compress) {
|
||||
// use some function to calculate the worst case lenght
|
||||
preliminaryPayloadLength = uncompressedPayloadLength;
|
||||
} else {
|
||||
payloadLength = uncompressedPayloadLength;
|
||||
}
|
||||
|
||||
if (!compress &&
|
||||
uncompressedPayloadLength < maxChunkBytes - chunkHeaderLength(false)) {
|
||||
// one chunk uncompressed
|
||||
rv.push_back(createChunkForNetworkSingle(slices, id));
|
||||
return rv;
|
||||
} else if (compress &&
|
||||
preliminaryPayloadLength <
|
||||
maxChunkBytes - chunkHeaderLength(false)) {
|
||||
throw std::logic_error("no implemented");
|
||||
// one chunk compressed
|
||||
} else {
|
||||
//// here we enter the domain of multichunck
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: sending multichunk message";
|
||||
|
||||
// test if we have smaller slices that fit into chunks when there is
|
||||
// no compression - optimization
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: there are slices that do not fit into a single "
|
||||
"totalMessageLength or compression is enabled";
|
||||
// we have big slices that do not fit into single chunks
|
||||
// now we will build one big buffer ans split it into pieces
|
||||
|
||||
// reseve buffer
|
||||
auto vppPayload = std::make_unique<basics::StringBuffer>(
|
||||
TRI_UNKNOWN_MEM_ZONE, uncompressedPayloadLength, false);
|
||||
|
||||
// fill buffer
|
||||
for (auto const& slice : slices) {
|
||||
try{
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << slice.toJson() << " , " << slice.byteSize();
|
||||
} catch(...){}
|
||||
vppPayload->appendText(slice.startAs<char>(), slice.byteSize());
|
||||
}
|
||||
|
||||
if (compress) {
|
||||
// compress uncompressedVppPayload -> vppPayload
|
||||
auto uncommpressedVppPayload = std::move(vppPayload);
|
||||
vppPayload = std::make_unique<basics::StringBuffer>(
|
||||
TRI_UNKNOWN_MEM_ZONE, preliminaryPayloadLength, false);
|
||||
// do compression
|
||||
throw std::logic_error("no implemented");
|
||||
// payloadLength = compressedPayloadLength;
|
||||
}
|
||||
|
||||
// create chunks
|
||||
(void)payloadLength;
|
||||
send_many(rv, id, maxChunkBytes, std::move(vppPayload),
|
||||
uncompressedPayloadLength);
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
|
@ -21,7 +21,7 @@
|
|||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "VppCommTask.h"
|
||||
#include "VstCommTask.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
|
@ -37,7 +37,7 @@
|
|||
#include "GeneralServer/GeneralServerFeature.h"
|
||||
#include "GeneralServer/RestHandler.h"
|
||||
#include "GeneralServer/RestHandlerFactory.h"
|
||||
#include "GeneralServer/VppNetwork.h"
|
||||
#include "GeneralServer/VstNetwork.h"
|
||||
#include "Logger/LoggerFeature.h"
|
||||
#include "Meta/conversion.h"
|
||||
#include "RestServer/ServerFeature.h"
|
||||
|
@ -79,14 +79,16 @@ inline std::size_t validateAndCount(char const* vpStart,
|
|||
}
|
||||
|
||||
|
||||
VppCommTask::VppCommTask(EventLoop loop, GeneralServer* server,
|
||||
VstCommTask::VstCommTask(EventLoop loop, GeneralServer* server,
|
||||
std::unique_ptr<Socket> socket, ConnectionInfo&& info,
|
||||
double timeout, bool skipInit)
|
||||
: Task(loop, "VppCommTask"),
|
||||
double timeout, ProtocolVersion protocolVersion,
|
||||
bool skipInit)
|
||||
: Task(loop, "VstCommTask"),
|
||||
GeneralCommTask(loop, server, std::move(socket), std::move(info), timeout,
|
||||
skipInit),
|
||||
_authenticatedUser(),
|
||||
_authentication(nullptr) {
|
||||
_authentication(nullptr),
|
||||
_protocolVersion(protocolVersion) {
|
||||
_authentication = application_features::ApplicationServer::getFeature<
|
||||
AuthenticationFeature>("Authentication");
|
||||
TRI_ASSERT(_authentication != nullptr);
|
||||
|
@ -95,9 +97,13 @@ VppCommTask::VppCommTask(EventLoop loop, GeneralServer* server,
|
|||
|
||||
// ATTENTION <- this is required so we do not lose information during a resize
|
||||
_readBuffer.reserve(_bufferLength);
|
||||
|
||||
_maxChunkSize = arangodb::application_features::ApplicationServer::getFeature<
|
||||
ServerFeature>("Server")
|
||||
->vstMaxSize();
|
||||
}
|
||||
|
||||
void VppCommTask::addResponse(VppResponse* response, RequestStatistics* stat) {
|
||||
void VstCommTask::addResponse(VstResponse* response, RequestStatistics* stat) {
|
||||
VPackMessageNoOwnBuffer response_message = response->prepareForNetwork();
|
||||
uint64_t const id = response_message._id;
|
||||
|
||||
|
@ -114,36 +120,16 @@ void VppCommTask::addResponse(VppResponse* response, RequestStatistics* stat) {
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
// don't print by default because at this place the toJson() may
|
||||
// invoke the custom type handler, which is not present here
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "created response:";
|
||||
for (auto const& slice : slices) {
|
||||
try {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << slice.toJson();
|
||||
} catch (arangodb::velocypack::Exception const& e) {
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "--";
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "response -- end";
|
||||
#endif
|
||||
|
||||
static uint32_t const chunkSize =
|
||||
arangodb::application_features::ApplicationServer::getFeature<
|
||||
ServerFeature>("Server")
|
||||
->vppMaxSize();
|
||||
|
||||
// set some sensible maxchunk size and compression
|
||||
auto buffers = createChunkForNetwork(slices, id, chunkSize, false);
|
||||
auto buffers = createChunkForNetwork(slices, id, _maxChunkSize,
|
||||
_protocolVersion);
|
||||
double const totalTime = RequestStatistics::ELAPSED_SINCE_READ_START(stat);
|
||||
|
||||
if (stat != nullptr && arangodb::Logger::isEnabled(arangodb::LogLevel::TRACE,
|
||||
Logger::REQUESTS)) {
|
||||
LOG_TOPIC(TRACE, Logger::REQUESTS)
|
||||
<< "\"vst-request-statistics\",\"" << (void*)this << "\",\""
|
||||
<< VppRequest::translateVersion(_protocolVersion) << "\","
|
||||
<< VstRequest::translateVersion(_protocolVersion) << "\","
|
||||
<< static_cast<int>(response->responseCode()) << ","
|
||||
<< _connectionInfo.clientAddress << "\"," << stat->timingsCsv();
|
||||
}
|
||||
|
@ -173,23 +159,40 @@ void VppCommTask::addResponse(VppResponse* response, RequestStatistics* stat) {
|
|||
LOG_TOPIC(INFO, Logger::REQUESTS)
|
||||
<< "\"vst-request-end\",\"" << (void*)this << "/" << id << "\",\""
|
||||
<< _connectionInfo.clientAddress << "\",\""
|
||||
<< VppRequest::translateVersion(_protocolVersion) << "\","
|
||||
<< VstRequest::translateVersion(_protocolVersion) << "\","
|
||||
<< static_cast<int>(response->responseCode()) << ","
|
||||
<< "\"," << Logger::FIXED(totalTime, 6);
|
||||
}
|
||||
|
||||
VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
|
||||
VppCommTask::ChunkHeader header;
|
||||
static uint32_t readLittleEndian32bit(char const* p) {
|
||||
return (static_cast<uint32_t>(static_cast<uint8_t>(p[3])) << 24) |
|
||||
(static_cast<uint32_t>(static_cast<uint8_t>(p[2])) << 16) |
|
||||
(static_cast<uint32_t>(static_cast<uint8_t>(p[1])) << 8) |
|
||||
(static_cast<uint32_t>(static_cast<uint8_t>(p[0])));
|
||||
}
|
||||
|
||||
static uint64_t readLittleEndian64bit(char const* p) {
|
||||
return (static_cast<uint64_t>(static_cast<uint8_t>(p[7])) << 56) |
|
||||
(static_cast<uint64_t>(static_cast<uint8_t>(p[6])) << 48) |
|
||||
(static_cast<uint64_t>(static_cast<uint8_t>(p[5])) << 40) |
|
||||
(static_cast<uint64_t>(static_cast<uint8_t>(p[4])) << 32) |
|
||||
(static_cast<uint64_t>(static_cast<uint8_t>(p[3])) << 24) |
|
||||
(static_cast<uint64_t>(static_cast<uint8_t>(p[2])) << 16) |
|
||||
(static_cast<uint64_t>(static_cast<uint8_t>(p[1])) << 8) |
|
||||
(static_cast<uint64_t>(static_cast<uint8_t>(p[0])));
|
||||
}
|
||||
|
||||
VstCommTask::ChunkHeader VstCommTask::readChunkHeader() {
|
||||
VstCommTask::ChunkHeader header;
|
||||
|
||||
auto cursor = _readBuffer.begin() + _processReadVariables._readBufferOffset;
|
||||
|
||||
std::memcpy(&header._chunkLength, cursor, sizeof(header._chunkLength));
|
||||
header._chunkLength = readLittleEndian32bit(cursor);
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "chunkLength: "
|
||||
<< header._chunkLength;
|
||||
cursor += sizeof(header._chunkLength);
|
||||
|
||||
uint32_t chunkX;
|
||||
std::memcpy(&chunkX, cursor, sizeof(chunkX));
|
||||
uint32_t chunkX = readLittleEndian32bit(cursor);
|
||||
cursor += sizeof(chunkX);
|
||||
|
||||
header._isFirst = chunkX & 0x1;
|
||||
|
@ -197,17 +200,18 @@ VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
|
|||
header._chunk = chunkX >> 1;
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "chunk: " << header._chunk;
|
||||
|
||||
std::memcpy(&header._messageID, cursor, sizeof(header._messageID));
|
||||
header._messageID = readLittleEndian64bit(cursor);
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "message id: "
|
||||
<< header._messageID;
|
||||
cursor += sizeof(header._messageID);
|
||||
|
||||
// extract total len of message
|
||||
if (header._isFirst && header._chunk > 1) {
|
||||
std::memcpy(&header._messageLength, cursor, sizeof(header._messageLength));
|
||||
if (_protocolVersion == ProtocolVersion::VST_1_1 ||
|
||||
(header._isFirst && header._chunk > 1)) {
|
||||
header._messageLength = readLittleEndian64bit(cursor);
|
||||
cursor += sizeof(header._messageLength);
|
||||
} else {
|
||||
header._messageLength = 0; // not needed
|
||||
header._messageLength = 0; // not needed for old protocol
|
||||
}
|
||||
|
||||
header._headerLength = std::distance(
|
||||
|
@ -216,7 +220,7 @@ VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
|
|||
return header;
|
||||
}
|
||||
|
||||
bool VppCommTask::isChunkComplete(char* start) {
|
||||
bool VstCommTask::isChunkComplete(char* start) {
|
||||
std::size_t length = std::distance(start, _readBuffer.end());
|
||||
auto& prv = _processReadVariables;
|
||||
|
||||
|
@ -235,23 +239,32 @@ bool VppCommTask::isChunkComplete(char* start) {
|
|||
return true;
|
||||
}
|
||||
|
||||
void VppCommTask::handleAuthentication(VPackSlice const& header,
|
||||
void VstCommTask::handleAuthentication(VPackSlice const& header,
|
||||
uint64_t messageId) {
|
||||
// std::string encryption = header.at(2).copyString();
|
||||
std::string user = header.at(3).copyString();
|
||||
std::string pass = header.at(4).copyString();
|
||||
|
||||
bool authOk = false;
|
||||
if (!_authentication->isEnabled()) {
|
||||
authOk = true;
|
||||
} else {
|
||||
auto auth = basics::StringUtils::encodeBase64(user + ":" + pass);
|
||||
std::string auth;
|
||||
AuthInfo::AuthType authType;
|
||||
std::string user;
|
||||
|
||||
std::string encryption = header.at(2).copyString();
|
||||
if (encryption != "jwt") {
|
||||
user = header.at(3).copyString();
|
||||
std::string pass = header.at(4).copyString();
|
||||
auth = basics::StringUtils::encodeBase64(user + ":" + pass);
|
||||
authType = AuthInfo::AuthType::BASIC;
|
||||
} else { // doing JWT
|
||||
auth = header.at(3).copyString();
|
||||
authType = AuthInfo::AuthType::JWT;
|
||||
}
|
||||
AuthResult result = _authentication->authInfo()->checkAuthentication(
|
||||
AuthInfo::AuthType::BASIC, auth);
|
||||
authType, auth);
|
||||
|
||||
authOk = result._authorized;
|
||||
if (authOk) {
|
||||
_authenticatedUser = std::move(user);
|
||||
_authenticatedUser = std::move(result._username);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -269,7 +282,7 @@ void VppCommTask::handleAuthentication(VPackSlice const& header,
|
|||
}
|
||||
|
||||
// reads data from the socket
|
||||
bool VppCommTask::processRead(double startTime) {
|
||||
bool VstCommTask::processRead(double startTime) {
|
||||
auto& prv = _processReadVariables;
|
||||
|
||||
auto chunkBegin = _readBuffer.begin() + prv._readBufferOffset;
|
||||
|
@ -282,7 +295,7 @@ bool VppCommTask::processRead(double startTime) {
|
|||
auto vpackBegin = chunkBegin + chunkHeader._headerLength;
|
||||
bool doExecute = false;
|
||||
bool read_maybe_only_part_of_buffer = false;
|
||||
VppInputMessage message; // filled in CASE 1 or CASE 2b
|
||||
VstInputMessage message; // filled in CASE 1 or CASE 2b
|
||||
|
||||
if (chunkHeader._isFirst) {
|
||||
// create agent for new messages
|
||||
|
@ -292,15 +305,14 @@ bool VppCommTask::processRead(double startTime) {
|
|||
|
||||
if (chunkHeader._isFirst && chunkHeader._chunk == 1) {
|
||||
// CASE 1: message is in one chunk
|
||||
if (auto rv = getMessageFromSingleChunk(chunkHeader, message, doExecute,
|
||||
vpackBegin, chunkEnd)) {
|
||||
return *rv; // the optional will only contain false or boost::none
|
||||
// so the execution will contine if a message is complete
|
||||
if (!getMessageFromSingleChunk(chunkHeader, message, doExecute,
|
||||
vpackBegin, chunkEnd)) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (auto rv = getMessageFromMultiChunks(chunkHeader, message, doExecute,
|
||||
vpackBegin, chunkEnd)) {
|
||||
return *rv;
|
||||
if (!getMessageFromMultiChunks(chunkHeader, message, doExecute,
|
||||
vpackBegin, chunkEnd)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -334,7 +346,7 @@ bool VppCommTask::processRead(double startTime) {
|
|||
} catch (std::exception const& e) {
|
||||
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "VstCommTask: "
|
||||
<< "VPack Validation failed: " << e.what();
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
|
@ -344,8 +356,8 @@ bool VppCommTask::processRead(double startTime) {
|
|||
if (type == 1000) {
|
||||
handleAuthentication(header, chunkHeader._messageID);
|
||||
} else {
|
||||
// the handler will take ownersip of this pointer
|
||||
std::unique_ptr<VppRequest> request(new VppRequest(
|
||||
// the handler will take ownership of this pointer
|
||||
std::unique_ptr<VstRequest> request(new VstRequest(
|
||||
_connectionInfo, std::move(message), chunkHeader._messageID));
|
||||
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(request.get());
|
||||
request->setUser(_authenticatedUser);
|
||||
|
@ -354,9 +366,13 @@ bool VppCommTask::processRead(double startTime) {
|
|||
AuthLevel level = AuthLevel::RW;
|
||||
if (_authentication->isEnabled()) { // only check authorization if
|
||||
// authentication is enabled
|
||||
std::string const& dbname = request->databaseName();
|
||||
if (!(_authenticatedUser.empty() && dbname.empty())) {
|
||||
level = _authentication->canUseDatabase(_authenticatedUser, dbname);
|
||||
std::string const& path = request->requestPath();
|
||||
if (!StringUtils::isPrefix(path, "/_open/") &&
|
||||
!StringUtils::isPrefix(path, "/_api/user/")) {
|
||||
std::string const& dbname = request->databaseName();
|
||||
if (!(_authenticatedUser.empty() && dbname.empty())) {
|
||||
level = _authentication->canUseDatabase(_authenticatedUser, dbname);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -376,9 +392,8 @@ bool VppCommTask::processRead(double startTime) {
|
|||
chunkHeader._messageID);
|
||||
} else {
|
||||
request->setClientTaskId(_taskId);
|
||||
_protocolVersion = request->protocolVersion();
|
||||
|
||||
std::unique_ptr<VppResponse> response(new VppResponse(
|
||||
std::unique_ptr<VstResponse> response(new VstResponse(
|
||||
rest::ResponseCode::SERVER_ERROR, chunkHeader._messageID));
|
||||
response->setContentTypeRequested(request->contentTypeResponse());
|
||||
executeRequest(std::move(request), std::move(response));
|
||||
|
@ -396,7 +411,7 @@ bool VppCommTask::processRead(double startTime) {
|
|||
return doExecute;
|
||||
}
|
||||
|
||||
void VppCommTask::closeTask(rest::ResponseCode code) {
|
||||
void VstCommTask::closeTask(rest::ResponseCode code) {
|
||||
_processReadVariables._readBufferOffset = 0;
|
||||
_processReadVariables._currentChunkLength = 0;
|
||||
_readBuffer.clear(); // check is this changes the reserved size
|
||||
|
@ -413,7 +428,7 @@ void VppCommTask::closeTask(rest::ResponseCode code) {
|
|||
closeStream();
|
||||
}
|
||||
|
||||
rest::ResponseCode VppCommTask::authenticateRequest(GeneralRequest* request) {
|
||||
rest::ResponseCode VstCommTask::authenticateRequest(GeneralRequest* request) {
|
||||
auto context = (request == nullptr) ? nullptr : request->requestContext();
|
||||
|
||||
if (context == nullptr && request != nullptr) {
|
||||
|
@ -432,17 +447,17 @@ rest::ResponseCode VppCommTask::authenticateRequest(GeneralRequest* request) {
|
|||
return context->authenticate();
|
||||
}
|
||||
|
||||
std::unique_ptr<GeneralResponse> VppCommTask::createResponse(
|
||||
std::unique_ptr<GeneralResponse> VstCommTask::createResponse(
|
||||
rest::ResponseCode responseCode, uint64_t messageId) {
|
||||
return std::unique_ptr<GeneralResponse>(
|
||||
new VppResponse(responseCode, messageId));
|
||||
new VstResponse(responseCode, messageId));
|
||||
}
|
||||
|
||||
void VppCommTask::handleSimpleError(rest::ResponseCode responseCode,
|
||||
void VstCommTask::handleSimpleError(rest::ResponseCode responseCode,
|
||||
int errorNum,
|
||||
std::string const& errorMessage,
|
||||
uint64_t messageId) {
|
||||
VppResponse response(responseCode, messageId);
|
||||
VstResponse response(responseCode, messageId);
|
||||
|
||||
VPackBuilder builder;
|
||||
builder.openObject();
|
||||
|
@ -460,12 +475,14 @@ void VppCommTask::handleSimpleError(rest::ResponseCode responseCode,
|
|||
}
|
||||
}
|
||||
|
||||
boost::optional<bool> VppCommTask::getMessageFromSingleChunk(
|
||||
ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute,
|
||||
// Returns true if and only if there was no error, if false is returned,
|
||||
// the connection is closed
|
||||
bool VstCommTask::getMessageFromSingleChunk(
|
||||
ChunkHeader const& chunkHeader, VstInputMessage& message, bool& doExecute,
|
||||
char const* vpackBegin, char const* chunkEnd) {
|
||||
// add agent for this new message
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "chunk contains single message";
|
||||
std::size_t payloads = 0;
|
||||
|
||||
|
@ -476,13 +493,13 @@ boost::optional<bool> VppCommTask::getMessageFromSingleChunk(
|
|||
TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, e.what(),
|
||||
chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "VstCommTask: "
|
||||
<< "VPack Validation failed: " << e.what();
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
} catch (...) {
|
||||
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "VPack Validation failed";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
|
@ -493,11 +510,13 @@ boost::optional<bool> VppCommTask::getMessageFromSingleChunk(
|
|||
message.set(chunkHeader._messageID, std::move(buffer), payloads); // fixme
|
||||
|
||||
doExecute = true;
|
||||
return boost::none;
|
||||
return true;
|
||||
}
|
||||
|
||||
boost::optional<bool> VppCommTask::getMessageFromMultiChunks(
|
||||
ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute,
|
||||
// Returns true if and only if there was no error, if false is returned,
|
||||
// the connection is closed
|
||||
bool VstCommTask::getMessageFromMultiChunks(
|
||||
ChunkHeader const& chunkHeader, VstInputMessage& message, bool& doExecute,
|
||||
char const* vpackBegin, char const* chunkEnd) {
|
||||
// CASE 2: message is in multiple chunks
|
||||
auto incompleteMessageItr = _incompleteMessages.find(chunkHeader._messageID);
|
||||
|
@ -505,11 +524,11 @@ boost::optional<bool> VppCommTask::getMessageFromMultiChunks(
|
|||
// CASE 2a: chunk starts new message
|
||||
if (chunkHeader._isFirst) { // first chunk of multi chunk message
|
||||
// add agent for this new message
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "chunk starts a new message";
|
||||
if (incompleteMessageItr != _incompleteMessages.end()) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "VstCommTask: "
|
||||
<< "Message should be first but is already in the Map of "
|
||||
"incomplete "
|
||||
"messages";
|
||||
|
@ -525,7 +544,7 @@ boost::optional<bool> VppCommTask::getMessageFromMultiChunks(
|
|||
auto insertPair = _incompleteMessages.emplace(
|
||||
std::make_pair(chunkHeader._messageID, std::move(message)));
|
||||
if (!insertPair.second) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "insert failed";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
|
@ -533,11 +552,11 @@ boost::optional<bool> VppCommTask::getMessageFromMultiChunks(
|
|||
|
||||
// CASE 2b: chunk continues a message
|
||||
} else { // followup chunk of some mesage
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "chunk continues a message";
|
||||
if (incompleteMessageItr == _incompleteMessages.end()) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "VstCommTask: "
|
||||
<< "found message without previous part";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
|
@ -550,7 +569,7 @@ boost::optional<bool> VppCommTask::getMessageFromMultiChunks(
|
|||
|
||||
// MESSAGE COMPLETE
|
||||
if (im._currentChunk == im._numberOfChunks - 1 /* zero based counting */) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "chunk completes a message";
|
||||
std::size_t payloads = 0;
|
||||
|
||||
|
@ -565,13 +584,13 @@ boost::optional<bool> VppCommTask::getMessageFromMultiChunks(
|
|||
TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, e.what(),
|
||||
chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "VstCommTask: "
|
||||
<< "VPack Validation failed: " << e.what();
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
} catch (...) {
|
||||
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VstCommTask: "
|
||||
<< "VPack Validation failed!";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
|
@ -584,8 +603,8 @@ boost::optional<bool> VppCommTask::getMessageFromMultiChunks(
|
|||
doExecute = true;
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "VstCommTask: "
|
||||
<< "chunk does not complete a message";
|
||||
}
|
||||
return boost::none;
|
||||
return true;
|
||||
}
|
|
@ -21,17 +21,16 @@
|
|||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_GENERAL_SERVER_VPP_COMM_TASK_H
|
||||
#define ARANGOD_GENERAL_SERVER_VPP_COMM_TASK_H 1
|
||||
#ifndef ARANGOD_GENERAL_SERVER_VST_COMM_TASK_H
|
||||
#define ARANGOD_GENERAL_SERVER_VST_COMM_TASK_H 1
|
||||
|
||||
#include "GeneralServer/GeneralCommTask.h"
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
#include <stdexcept>
|
||||
|
||||
#include "lib/Rest/VppMessage.h"
|
||||
#include "lib/Rest/VppRequest.h"
|
||||
#include "lib/Rest/VppResponse.h"
|
||||
#include "lib/Rest/VstMessage.h"
|
||||
#include "lib/Rest/VstRequest.h"
|
||||
#include "lib/Rest/VstResponse.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
|
@ -39,25 +38,26 @@ class AuthenticationFeature;
|
|||
|
||||
namespace rest {
|
||||
|
||||
class VppCommTask : public GeneralCommTask {
|
||||
class VstCommTask : public GeneralCommTask {
|
||||
public:
|
||||
VppCommTask(EventLoop, GeneralServer*, std::unique_ptr<Socket> socket,
|
||||
ConnectionInfo&&, double timeout, bool skipSocketInit = false);
|
||||
VstCommTask(EventLoop, GeneralServer*, std::unique_ptr<Socket> socket,
|
||||
ConnectionInfo&&, double timeout, ProtocolVersion protocolVersion,
|
||||
bool skipSocketInit = false);
|
||||
|
||||
// convert from GeneralResponse to vppResponse ad dispatch request to class
|
||||
// convert from GeneralResponse to VstResponse ad dispatch request to class
|
||||
// internal addResponse
|
||||
void addResponse(GeneralResponse* response, RequestStatistics* stat) override {
|
||||
VppResponse* vppResponse = dynamic_cast<VppResponse*>(response);
|
||||
VstResponse* vstResponse = dynamic_cast<VstResponse*>(response);
|
||||
|
||||
if (vppResponse == nullptr) {
|
||||
if (vstResponse == nullptr) {
|
||||
throw std::logic_error("invalid response or response Type");
|
||||
}
|
||||
|
||||
addResponse(vppResponse, stat);
|
||||
addResponse(vstResponse, stat);
|
||||
};
|
||||
|
||||
arangodb::Endpoint::TransportType transportType() override {
|
||||
return arangodb::Endpoint::TransportType::VPP;
|
||||
return arangodb::Endpoint::TransportType::VST;
|
||||
};
|
||||
|
||||
protected:
|
||||
|
@ -71,7 +71,7 @@ class VppCommTask : public GeneralCommTask {
|
|||
void handleAuthentication(VPackSlice const& header, uint64_t messageId);
|
||||
|
||||
void handleSimpleError(rest::ResponseCode code, uint64_t id) override {
|
||||
VppResponse response(code, id);
|
||||
VstResponse response(code, id);
|
||||
addResponse(&response, nullptr);
|
||||
}
|
||||
|
||||
|
@ -86,7 +86,7 @@ class VppCommTask : public GeneralCommTask {
|
|||
// request handling aborts prematurely
|
||||
void closeTask(rest::ResponseCode code = rest::ResponseCode::SERVER_ERROR);
|
||||
|
||||
void addResponse(VppResponse*, RequestStatistics* stat);
|
||||
void addResponse(VstResponse*, RequestStatistics* stat);
|
||||
rest::ResponseCode authenticateRequest(GeneralRequest* request);
|
||||
|
||||
private:
|
||||
|
@ -133,16 +133,22 @@ class VppCommTask : public GeneralCommTask {
|
|||
ChunkHeader readChunkHeader(); // sub-function of processRead
|
||||
void replyToIncompleteMessages();
|
||||
|
||||
boost::optional<bool> getMessageFromSingleChunk(
|
||||
ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute,
|
||||
// Returns true if and only if there was no error, if false is returned,
|
||||
// the connection is closed
|
||||
bool getMessageFromSingleChunk(
|
||||
ChunkHeader const& chunkHeader, VstInputMessage& message, bool& doExecute,
|
||||
char const* vpackBegin, char const* chunkEnd);
|
||||
|
||||
boost::optional<bool> getMessageFromMultiChunks(
|
||||
ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute,
|
||||
// Returns true if and only if there was no error, if false is returned,
|
||||
// the connection is closed
|
||||
bool getMessageFromMultiChunks(
|
||||
ChunkHeader const& chunkHeader, VstInputMessage& message, bool& doExecute,
|
||||
char const* vpackBegin, char const* chunkEnd);
|
||||
|
||||
std::string _authenticatedUser;
|
||||
AuthenticationFeature* _authentication;
|
||||
ProtocolVersion _protocolVersion;
|
||||
uint32_t _maxChunkSize;
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,262 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_VST_NETWORK_H
|
||||
#define ARANGOD_VST_NETWORK_H 1
|
||||
|
||||
#include "Basics/StringBuffer.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Logger/LoggerFeature.h"
|
||||
|
||||
#include <velocypack/Options.h>
|
||||
#include <velocypack/Slice.h>
|
||||
#include <velocypack/Validator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
template <typename T>
|
||||
void appendLittleEndian(basics::StringBuffer* buffer, T v) {
|
||||
for (size_t i = 0; i < sizeof(T); ++i) {
|
||||
buffer->appendChar(static_cast<char>(v & 0xffu));
|
||||
v >>= 8;
|
||||
}
|
||||
}
|
||||
|
||||
inline constexpr std::size_t chunkHeaderLength(bool sendTotalLen) {
|
||||
// chunkLength uint32 , chunkX uint32 , id uint64 , messageLength unit64
|
||||
return sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) +
|
||||
(sendTotalLen ? sizeof(uint64_t) : 0);
|
||||
}
|
||||
|
||||
// Send Message Created from Slices
|
||||
|
||||
// working version of single chunk message creation
|
||||
inline std::unique_ptr<basics::StringBuffer> createChunkForNetworkDetail(
|
||||
std::vector<VPackSlice> const& slices, bool isFirstChunk, uint32_t chunk,
|
||||
uint64_t id, ProtocolVersion protocolVersion,
|
||||
uint64_t totalMessageLength) {
|
||||
using basics::StringBuffer;
|
||||
|
||||
bool sendTotalLen = protocolVersion != ProtocolVersion::VST_1_0 ||
|
||||
(isFirstChunk && chunk > 1);
|
||||
// if we speak VST_1_0 and have more than one chunk and the chunk
|
||||
// is the first then we are sending the first in a series. if this
|
||||
// condition is true we also send extra 8 bytes for the messageLength
|
||||
// (length of all VPackData). In later versions we always send the
|
||||
// total length.
|
||||
|
||||
// build chunkX -- see VelocyStream Documentaion
|
||||
chunk <<= 1;
|
||||
chunk |= isFirstChunk ? 0x1 : 0x0;
|
||||
|
||||
// get the length of VPack data
|
||||
uint32_t dataLength = 0;
|
||||
for (auto& slice : slices) {
|
||||
// TODO: is a 32bit value sufficient for all Slices here?
|
||||
dataLength += static_cast<uint32_t>(slice.byteSize());
|
||||
}
|
||||
|
||||
// calculate length of current chunk
|
||||
uint32_t chunkLength =
|
||||
dataLength + static_cast<uint32_t>(chunkHeaderLength(sendTotalLen));
|
||||
|
||||
auto buffer =
|
||||
std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE, chunkLength, false);
|
||||
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "chunkLength: " << chunkLength;
|
||||
appendLittleEndian(buffer.get(), chunkLength);
|
||||
appendLittleEndian(buffer.get(), chunk);
|
||||
appendLittleEndian(buffer.get(), id);
|
||||
|
||||
if (sendTotalLen) {
|
||||
appendLittleEndian(buffer.get(), totalMessageLength);
|
||||
}
|
||||
|
||||
// append data in slices
|
||||
for (auto const& slice : slices) {
|
||||
try{
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << slice.toJson() << " , " << slice.byteSize();
|
||||
} catch(...){}
|
||||
buffer->appendText(slice.startAs<char>(), slice.byteSize());
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
// helper functions for sending chunks when given a string buffer as input
|
||||
// working version of single chunk message creation
|
||||
inline std::unique_ptr<basics::StringBuffer> createChunkForNetworkDetail(
|
||||
char const* data, std::size_t begin, std::size_t end, bool isFirstChunk,
|
||||
uint32_t chunk, uint64_t id, ProtocolVersion protocolVersion,
|
||||
uint64_t totalMessageLength) {
|
||||
using basics::StringBuffer;
|
||||
|
||||
bool sendTotalLen = protocolVersion != ProtocolVersion::VST_1_0 ||
|
||||
(isFirstChunk && chunk > 1);
|
||||
// if we speak VST_1_0 and have more than one chunk and the chunk
|
||||
// is the first then we are sending the first in a series. if this
|
||||
// condition is true we also send extra 8 bytes for the messageLength
|
||||
// (length of all VPackData). In later versions we always send the
|
||||
// total length.
|
||||
|
||||
// build chunkX -- see VelocyStream Documentaion
|
||||
chunk <<= 1;
|
||||
chunk |= isFirstChunk ? 0x1 : 0x0;
|
||||
|
||||
// get the lenght of VPack data
|
||||
uint32_t dataLength = static_cast<uint32_t>(end - begin);
|
||||
|
||||
// calculate length of current chunk
|
||||
uint32_t chunkLength =
|
||||
dataLength + static_cast<uint32_t>(chunkHeaderLength(sendTotalLen));
|
||||
|
||||
auto buffer =
|
||||
std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE, chunkLength, false);
|
||||
|
||||
appendLittleEndian(buffer.get(), chunkLength);
|
||||
appendLittleEndian(buffer.get(), chunk);
|
||||
appendLittleEndian(buffer.get(), id);
|
||||
|
||||
if (sendTotalLen) {
|
||||
appendLittleEndian(buffer.get(), totalMessageLength);
|
||||
}
|
||||
|
||||
buffer->appendText(data + begin, dataLength);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
// this function will be called when we send multiple compressed
|
||||
// or uncompressed chunks
|
||||
inline void send_many(
|
||||
std::vector<std::unique_ptr<basics::StringBuffer>>& resultVecRef,
|
||||
uint64_t id, std::size_t maxChunkBytes,
|
||||
std::unique_ptr<basics::StringBuffer> completeMessage,
|
||||
std::size_t uncompressedCompleteMessageLength,
|
||||
ProtocolVersion protocolVersion) {
|
||||
uint64_t totalLen = completeMessage->length();
|
||||
std::size_t offsetBegin = 0;
|
||||
std::size_t offsetEnd = maxChunkBytes - chunkHeaderLength(true);
|
||||
// maximum number of bytes for follow up chunks
|
||||
std::size_t maxBytes = maxChunkBytes - chunkHeaderLength(false);
|
||||
|
||||
uint32_t numberOfChunks = 1;
|
||||
{ // calculate the number of chunks that will be send
|
||||
std::size_t bytesToSend = totalLen - maxChunkBytes +
|
||||
chunkHeaderLength(true); // data for first chunk
|
||||
while (bytesToSend >= maxBytes) {
|
||||
bytesToSend -= maxBytes;
|
||||
++numberOfChunks;
|
||||
}
|
||||
if (bytesToSend) {
|
||||
++numberOfChunks;
|
||||
}
|
||||
}
|
||||
|
||||
// send first
|
||||
resultVecRef.push_back(
|
||||
createChunkForNetworkDetail(completeMessage->c_str(), offsetBegin,
|
||||
offsetEnd, true, numberOfChunks, id,
|
||||
protocolVersion, totalLen));
|
||||
|
||||
std::uint32_t chunkNumber = 0;
|
||||
while (offsetEnd + maxBytes <= totalLen) {
|
||||
// send middle
|
||||
offsetBegin = offsetEnd;
|
||||
offsetEnd += maxBytes;
|
||||
chunkNumber++;
|
||||
resultVecRef.push_back(createChunkForNetworkDetail(
|
||||
completeMessage->c_str(), offsetBegin, offsetEnd, false, chunkNumber,
|
||||
id, protocolVersion, totalLen));
|
||||
}
|
||||
|
||||
if (offsetEnd < totalLen) {
|
||||
resultVecRef.push_back(createChunkForNetworkDetail(
|
||||
completeMessage->c_str(), offsetEnd, totalLen, false, ++chunkNumber,
|
||||
id, protocolVersion, totalLen));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// this function will be called by client code
|
||||
inline std::vector<std::unique_ptr<basics::StringBuffer>> createChunkForNetwork(
|
||||
std::vector<VPackSlice> const& slices, uint64_t id,
|
||||
std::size_t maxChunkBytes, ProtocolVersion protocolVersion) {
|
||||
/// variables used in this function
|
||||
std::size_t payloadLength = 0;
|
||||
|
||||
std::vector<std::unique_ptr<basics::StringBuffer>> rv;
|
||||
|
||||
// find out the uncompressed payload length
|
||||
for (auto const& slice : slices) {
|
||||
payloadLength += slice.byteSize();
|
||||
}
|
||||
|
||||
bool sendTotalLen = protocolVersion != ProtocolVersion::VST_1_0;
|
||||
size_t chl = chunkHeaderLength(sendTotalLen);
|
||||
|
||||
if (payloadLength < maxChunkBytes - chl) {
|
||||
// one chunk uncompressed
|
||||
rv.push_back(createChunkForNetworkDetail(slices, true, 1, id,
|
||||
protocolVersion, chl + payloadLength));
|
||||
return rv;
|
||||
} else {
|
||||
// here we enter the domain of multichunck
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VstCommTask: sending multichunk message";
|
||||
|
||||
// TODO: test if we have smaller slices that fit into chunks
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VstCommTask: there are slices that do not fit into a single "
|
||||
"totalMessageLength or compression is enabled";
|
||||
// we have big slices that do not fit into single chunks
|
||||
// now we will build one big buffer and split it into pieces
|
||||
|
||||
// reserve buffer
|
||||
auto vstPayload = std::make_unique<basics::StringBuffer>(
|
||||
TRI_UNKNOWN_MEM_ZONE, payloadLength, false);
|
||||
|
||||
// fill buffer
|
||||
for (auto const& slice : slices) {
|
||||
try{
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << slice.toJson() << " , " << slice.byteSize();
|
||||
} catch(...){}
|
||||
vstPayload->appendText(slice.startAs<char>(), slice.byteSize());
|
||||
}
|
||||
|
||||
// create chunks
|
||||
send_many(rv, id, maxChunkBytes, std::move(vstPayload),
|
||||
payloadLength, protocolVersion);
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
|
@ -734,9 +734,9 @@ void MMFilesRestReplicationHandler::handleCommandBarrier() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void MMFilesRestReplicationHandler::handleTrampolineCoordinator() {
|
||||
bool useVpp = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VPP) {
|
||||
useVpp = true;
|
||||
bool useVst = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VST) {
|
||||
useVst = true;
|
||||
}
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid request");
|
||||
|
@ -781,7 +781,7 @@ void MMFilesRestReplicationHandler::handleTrampolineCoordinator() {
|
|||
}
|
||||
|
||||
std::unique_ptr<ClusterCommResult> res;
|
||||
if (!useVpp) {
|
||||
if (!useVst) {
|
||||
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request.get());
|
||||
if (httpRequest == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
|
@ -832,7 +832,7 @@ void MMFilesRestReplicationHandler::handleTrampolineCoordinator() {
|
|||
_response->setContentType(
|
||||
res->result->getHeaderField(StaticStrings::ContentTypeHeader, dummy));
|
||||
|
||||
if (!useVpp) {
|
||||
if (!useVst) {
|
||||
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
|
||||
if (_response == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
|
@ -856,9 +856,9 @@ void MMFilesRestReplicationHandler::handleTrampolineCoordinator() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void MMFilesRestReplicationHandler::handleCommandLoggerFollow() {
|
||||
bool useVpp = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VPP) {
|
||||
useVpp = true;
|
||||
bool useVst = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VST) {
|
||||
useVst = true;
|
||||
}
|
||||
|
||||
// determine start and end tick
|
||||
|
@ -969,7 +969,7 @@ void MMFilesRestReplicationHandler::handleCommandLoggerFollow() {
|
|||
// initialize the dump container
|
||||
MMFilesReplicationDumpContext dump(transactionContext,
|
||||
static_cast<size_t>(determineChunkSize()),
|
||||
includeSystem, cid, useVpp);
|
||||
includeSystem, cid, useVst);
|
||||
|
||||
// and dump
|
||||
int res = MMFilesDumpLogReplication(&dump, transactionIds, firstRegularTick,
|
||||
|
@ -983,7 +983,7 @@ void MMFilesRestReplicationHandler::handleCommandLoggerFollow() {
|
|||
|
||||
// generate the result
|
||||
size_t length = 0;
|
||||
if (useVpp) {
|
||||
if (useVst) {
|
||||
length = dump._slices.size();
|
||||
} else {
|
||||
length = TRI_LengthStringBuffer(dump._buffer);
|
||||
|
@ -1010,7 +1010,7 @@ void MMFilesRestReplicationHandler::handleCommandLoggerFollow() {
|
|||
dump._fromTickIncluded ? "true" : "false");
|
||||
|
||||
if (length > 0) {
|
||||
if (useVpp) {
|
||||
if (useVst) {
|
||||
for (auto message : dump._slices) {
|
||||
_response->addPayload(std::move(message), &dump._vpackOptions, true);
|
||||
}
|
||||
|
|
|
@ -510,7 +510,7 @@ static int DumpCollection(MMFilesReplicationDumpContext* dump,
|
|||
LogicalCollection* collection,
|
||||
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
|
||||
TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
|
||||
bool withTicks, bool useVpp = false) {
|
||||
bool withTicks, bool useVst = false) {
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "dumping collection " << collection->cid() << ", tick range "
|
||||
<< dataMin << " - " << dataMax;
|
||||
|
||||
|
@ -521,14 +521,14 @@ static int DumpCollection(MMFilesReplicationDumpContext* dump,
|
|||
bool bufferFull = false;
|
||||
|
||||
auto callback = [&dump, &lastFoundTick, &databaseId, &collectionId,
|
||||
&withTicks, &isEdgeCollection, &bufferFull, &useVpp,
|
||||
&withTicks, &isEdgeCollection, &bufferFull, &useVst,
|
||||
&collection](
|
||||
TRI_voc_tick_t foundTick, MMFilesMarker const* marker) {
|
||||
// note the last tick we processed
|
||||
lastFoundTick = foundTick;
|
||||
|
||||
int res;
|
||||
if (useVpp) {
|
||||
if (useVst) {
|
||||
res = SliceifyMarker(dump, databaseId, collectionId, marker, true,
|
||||
withTicks, isEdgeCollection);
|
||||
} else {
|
||||
|
@ -541,7 +541,7 @@ static int DumpCollection(MMFilesReplicationDumpContext* dump,
|
|||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
|
||||
// TODO if vppcase find out slice lenght of _slices.back()
|
||||
// TODO if vstcase find out slice lenght of _slices.back()
|
||||
if (static_cast<uint64_t>(TRI_LengthStringBuffer(dump->_buffer)) >
|
||||
dump->_chunkSize) {
|
||||
// abort the iteration
|
||||
|
@ -752,7 +752,7 @@ int MMFilesDumpLogReplication(
|
|||
}
|
||||
}
|
||||
|
||||
if (dump->_useVpp) {
|
||||
if (dump->_useVst) {
|
||||
res = SliceifyMarker(dump, databaseId, collectionId, marker, false,
|
||||
true, false);
|
||||
} else {
|
||||
|
@ -817,7 +817,7 @@ int MMFilesDumpLogReplication(
|
|||
int MMFilesDetermineOpenTransactionsReplication(MMFilesReplicationDumpContext* dump,
|
||||
TRI_voc_tick_t tickMin,
|
||||
TRI_voc_tick_t tickMax,
|
||||
bool useVpp) {
|
||||
bool useVst) {
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "determining transactions, tick range " << tickMin << " - "
|
||||
<< tickMax;
|
||||
|
||||
|
@ -906,7 +906,7 @@ int MMFilesDetermineOpenTransactionsReplication(MMFilesReplicationDumpContext* d
|
|||
|
||||
VPackBuffer<uint8_t> buffer;
|
||||
VPackBuilder builder(buffer);
|
||||
if (useVpp) {
|
||||
if (useVst) {
|
||||
if (transactions.empty()) {
|
||||
builder.add(VPackSlice::emptyArraySlice());
|
||||
} else {
|
||||
|
|
|
@ -45,7 +45,7 @@ struct MMFilesReplicationDumpContext {
|
|||
MMFilesReplicationDumpContext(std::shared_ptr<arangodb::transaction::StandaloneContext>
|
||||
transactionContext,
|
||||
size_t chunkSize, bool includeSystem,
|
||||
TRI_voc_cid_t restrictCollection, bool useVpp = false)
|
||||
TRI_voc_cid_t restrictCollection, bool useVst = false)
|
||||
: _transactionContext(transactionContext),
|
||||
_vocbase(transactionContext->vocbase()),
|
||||
_buffer(nullptr),
|
||||
|
@ -61,13 +61,13 @@ struct MMFilesReplicationDumpContext {
|
|||
_fromTickIncluded(false),
|
||||
_compat28(false),
|
||||
_slices(),
|
||||
_useVpp(useVpp) {
|
||||
_useVst(useVst) {
|
||||
if (_chunkSize == 0) {
|
||||
// default chunk size
|
||||
_chunkSize = 128 * 1024;
|
||||
}
|
||||
|
||||
if (!useVpp) {
|
||||
if (!useVst) {
|
||||
_buffer = TRI_CreateSizedStringBuffer(TRI_UNKNOWN_MEM_ZONE, _chunkSize);
|
||||
|
||||
if (_buffer == nullptr) {
|
||||
|
@ -98,7 +98,7 @@ struct MMFilesReplicationDumpContext {
|
|||
bool _fromTickIncluded;
|
||||
bool _compat28;
|
||||
std::vector<VPackBuffer<uint8_t>> _slices;
|
||||
bool _useVpp;
|
||||
bool _useVst;
|
||||
};
|
||||
|
||||
/// @brief dump data from a single collection
|
||||
|
@ -115,6 +115,6 @@ int MMFilesDumpLogReplication(MMFilesReplicationDumpContext*,
|
|||
/// @brief determine the transactions that were open at a given point in time
|
||||
int MMFilesDetermineOpenTransactionsReplication(MMFilesReplicationDumpContext*,
|
||||
TRI_voc_tick_t, TRI_voc_tick_t,
|
||||
bool useVpp = false);
|
||||
bool useVst = false);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -50,8 +50,8 @@ RestStatus RestBatchHandler::execute() {
|
|||
case Endpoint::TransportType::HTTP: {
|
||||
return executeHttp();
|
||||
}
|
||||
case Endpoint::TransportType::VPP: {
|
||||
return executeVpp();
|
||||
case Endpoint::TransportType::VST: {
|
||||
return executeVst();
|
||||
}
|
||||
}
|
||||
// should never get here
|
||||
|
@ -59,7 +59,7 @@ RestStatus RestBatchHandler::execute() {
|
|||
return RestStatus::FAIL;
|
||||
}
|
||||
|
||||
RestStatus RestBatchHandler::executeVpp() {
|
||||
RestStatus RestBatchHandler::executeVst() {
|
||||
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_NO_ERROR,
|
||||
"The RestBatchHandler is not supported for this protocol!");
|
||||
return RestStatus::DONE;
|
||||
|
|
|
@ -66,7 +66,7 @@ class RestBatchHandler : public RestVocbaseBaseHandler {
|
|||
|
||||
private:
|
||||
RestStatus executeHttp();
|
||||
RestStatus executeVpp();
|
||||
RestStatus executeVst();
|
||||
// extract the boundary from the body of a multipart message
|
||||
bool getBoundaryBody(std::string*);
|
||||
|
||||
|
|
|
@ -104,7 +104,7 @@ RestStatus RestImportHandler::execute() {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case Endpoint::TransportType::VPP: {
|
||||
case Endpoint::TransportType::VST: {
|
||||
if (found &&
|
||||
(documentType == "documents" || documentType == "array" ||
|
||||
documentType == "list" || documentType == "auto")) {
|
||||
|
|
|
@ -47,7 +47,7 @@ using namespace arangodb::rest;
|
|||
ServerFeature::ServerFeature(application_features::ApplicationServer* server,
|
||||
int* res)
|
||||
: ApplicationFeature(server, "Server"),
|
||||
_vppMaxSize(1024 * 30),
|
||||
_vstMaxSize(1024 * 30),
|
||||
_result(res),
|
||||
_operationMode(OperationMode::MODE_SERVER) {
|
||||
setOptional(true);
|
||||
|
@ -87,7 +87,7 @@ void ServerFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
|
||||
options->addOption("--vst.maxsize",
|
||||
"maximal size (in bytes) for a VelocyPack chunk",
|
||||
new UInt32Parameter(&_vppMaxSize));
|
||||
new UInt32Parameter(&_vstMaxSize));
|
||||
}
|
||||
|
||||
void ServerFeature::validateOptions(std::shared_ptr<ProgramOptions>) {
|
||||
|
|
|
@ -58,7 +58,7 @@ class ServerFeature final : public application_features::ApplicationFeature {
|
|||
|
||||
std::vector<std::string> const& scripts() const { return _scripts; }
|
||||
std::vector<std::string> const& unitTests() const { return _unitTests; }
|
||||
uint32_t const& vppMaxSize() const { return _vppMaxSize; }
|
||||
uint32_t const& vstMaxSize() const { return _vstMaxSize; }
|
||||
|
||||
bool isConsoleMode() const {
|
||||
return (_operationMode == OperationMode::MODE_CONSOLE);
|
||||
|
@ -72,7 +72,7 @@ class ServerFeature final : public application_features::ApplicationFeature {
|
|||
bool _restServer = true;
|
||||
std::vector<std::string> _unitTests;
|
||||
std::vector<std::string> _scripts;
|
||||
uint32_t _vppMaxSize;
|
||||
uint32_t _vstMaxSize;
|
||||
int* _result;
|
||||
OperationMode _operationMode;
|
||||
bool _isStopping = false;
|
||||
|
|
|
@ -479,9 +479,9 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RocksDBRestReplicationHandler::handleTrampolineCoordinator() {
|
||||
bool useVpp = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VPP) {
|
||||
useVpp = true;
|
||||
bool useVst = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VST) {
|
||||
useVst = true;
|
||||
}
|
||||
if (_request == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid request");
|
||||
|
@ -526,7 +526,7 @@ void RocksDBRestReplicationHandler::handleTrampolineCoordinator() {
|
|||
}
|
||||
|
||||
std::unique_ptr<ClusterCommResult> res;
|
||||
if (!useVpp) {
|
||||
if (!useVst) {
|
||||
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request.get());
|
||||
if (httpRequest == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
|
@ -577,7 +577,7 @@ void RocksDBRestReplicationHandler::handleTrampolineCoordinator() {
|
|||
_response->setContentType(
|
||||
res->result->getHeaderField(StaticStrings::ContentTypeHeader, dummy));
|
||||
|
||||
if (!useVpp) {
|
||||
if (!useVst) {
|
||||
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
|
||||
if (_response == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
|
@ -601,9 +601,9 @@ void RocksDBRestReplicationHandler::handleTrampolineCoordinator() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
|
||||
bool useVpp = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VPP) {
|
||||
useVpp = true;
|
||||
bool useVst = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VST) {
|
||||
useVst = true;
|
||||
}
|
||||
|
||||
// determine start and end tick
|
||||
|
@ -703,7 +703,7 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
|
|||
result.fromTickIncluded() ? "true" : "false");
|
||||
|
||||
if (length > 0) {
|
||||
if (useVpp) {
|
||||
if (useVst) {
|
||||
for (auto message : arangodb::velocypack::ArrayIterator(data)) {
|
||||
_response->addPayload(VPackSlice(message),
|
||||
transactionContext->getVPackOptions(), true);
|
||||
|
@ -747,9 +747,9 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() {
|
||||
bool useVpp = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VPP) {
|
||||
useVpp = true;
|
||||
bool useVst = false;
|
||||
if (_request->transportType() == Endpoint::TransportType::VST) {
|
||||
useVst = true;
|
||||
}
|
||||
//_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK,
|
||||
// StringUtils::itoa(dump._lastFoundTick));
|
||||
|
@ -759,7 +759,7 @@ void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() {
|
|||
// dump._fromTickIncluded ? "true" : "false");
|
||||
_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, "true");
|
||||
VPackSlice slice = VelocyPackHelper::EmptyArrayValue();
|
||||
if (useVpp) {
|
||||
if (useVst) {
|
||||
_response->addPayload(slice, &VPackOptions::Defaults, false);
|
||||
} else {
|
||||
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
|
||||
|
|
|
@ -585,7 +585,7 @@ static void ResponseV8ToCpp(v8::Isolate* isolate, TRI_v8_global_t const* v8g,
|
|||
}
|
||||
break;
|
||||
|
||||
case Endpoint::TransportType::VPP:
|
||||
case Endpoint::TransportType::VST:
|
||||
response->setHeader(arangodb::StaticStrings::ContentTypeHeader,
|
||||
contentType);
|
||||
break;
|
||||
|
@ -673,7 +673,7 @@ static void ResponseV8ToCpp(v8::Isolate* isolate, TRI_v8_global_t const* v8g,
|
|||
}
|
||||
} break;
|
||||
|
||||
case Endpoint::TransportType::VPP: {
|
||||
case Endpoint::TransportType::VST: {
|
||||
VPackBuilder builder;
|
||||
|
||||
v8::Handle<v8::Value> v8Body = res->Get(BodyKey);
|
||||
|
@ -693,7 +693,7 @@ static void ResponseV8ToCpp(v8::Isolate* isolate, TRI_v8_global_t const* v8g,
|
|||
transformations->Get(v8::Integer::New(isolate, i));
|
||||
std::string name = TRI_ObjectToString(transformator);
|
||||
|
||||
// we do not decode in the vpp case
|
||||
// we do not decode in the vst case
|
||||
// check available transformations
|
||||
if (name == "base64decode") {
|
||||
out = StringUtils::decodeBase64(out);
|
||||
|
@ -776,7 +776,7 @@ static void ResponseV8ToCpp(v8::Isolate* isolate, TRI_v8_global_t const* v8g,
|
|||
TRI_FreeString(TRI_UNKNOWN_MEM_ZONE, content);
|
||||
} break;
|
||||
|
||||
case Endpoint::TransportType::VPP: {
|
||||
case Endpoint::TransportType::VST: {
|
||||
VPackBuilder builder;
|
||||
builder.add(
|
||||
VPackValuePair(reinterpret_cast<uint8_t const*>(content), length));
|
||||
|
@ -841,7 +841,7 @@ static void ResponseV8ToCpp(v8::Isolate* isolate, TRI_v8_global_t const* v8g,
|
|||
}
|
||||
} break;
|
||||
|
||||
case Endpoint::TransportType::VPP:
|
||||
case Endpoint::TransportType::VST:
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -1145,7 +1145,7 @@ static void JS_RawRequestBody(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
|||
}
|
||||
} break;
|
||||
|
||||
case Endpoint::TransportType::VPP: {
|
||||
case Endpoint::TransportType::VST: {
|
||||
if (request != nullptr) {
|
||||
auto slice = request->payload();
|
||||
V8Buffer* buffer = nullptr;
|
||||
|
|
|
@ -193,8 +193,8 @@ add_library(${LIB_ARANGO} STATIC
|
|||
Random/UniformCharacter.cpp
|
||||
Rest/GeneralRequest.cpp
|
||||
Rest/GeneralResponse.cpp
|
||||
Rest/VppRequest.cpp
|
||||
Rest/VppResponse.cpp
|
||||
Rest/VstRequest.cpp
|
||||
Rest/VstResponse.cpp
|
||||
Rest/HttpRequest.cpp
|
||||
Rest/HttpResponse.cpp
|
||||
Rest/InitializeRest.cpp
|
||||
|
|
|
@ -99,7 +99,7 @@ std::string Endpoint::unifiedForm(std::string const& specification) {
|
|||
}
|
||||
|
||||
if (StringUtils::isPrefix(copy, "vst+")) {
|
||||
protocol = TransportType::VPP;
|
||||
protocol = TransportType::VST;
|
||||
prefix = "vst+";
|
||||
copy = copy.substr(4);
|
||||
}
|
||||
|
@ -141,9 +141,9 @@ std::string Endpoint::unifiedForm(std::string const& specification) {
|
|||
found = temp.find("]", 1);
|
||||
if (found != std::string::npos && found > 2 && found + 1 == temp.size()) {
|
||||
// hostname only (e.g. [address])
|
||||
if (protocol == TransportType::VPP) {
|
||||
if (protocol == TransportType::VST) {
|
||||
return prefix + copy + ":" +
|
||||
StringUtils::itoa(EndpointIp::_defaultPortVpp);
|
||||
StringUtils::itoa(EndpointIp::_defaultPortVst);
|
||||
} else {
|
||||
return prefix + copy + ":" +
|
||||
StringUtils::itoa(EndpointIp::_defaultPortHttp);
|
||||
|
@ -172,7 +172,7 @@ std::string Endpoint::unifiedForm(std::string const& specification) {
|
|||
return prefix + copy + ":" +
|
||||
StringUtils::itoa(EndpointIp::_defaultPortHttp);
|
||||
} else {
|
||||
return prefix + copy + ":" + StringUtils::itoa(EndpointIp::_defaultPortVpp);
|
||||
return prefix + copy + ":" + StringUtils::itoa(EndpointIp::_defaultPortVst);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -221,9 +221,6 @@ Endpoint* Endpoint::factory(const Endpoint::EndpointType type,
|
|||
if (StringUtils::isPrefix(copy, "http+")) {
|
||||
protocol = TransportType::HTTP;
|
||||
copy = copy.substr(5);
|
||||
} else if (StringUtils::isPrefix(copy, "vst+")) {
|
||||
protocol = TransportType::VPP;
|
||||
copy = copy.substr(4);
|
||||
} else {
|
||||
// invalid protocol
|
||||
return nullptr;
|
||||
|
@ -271,7 +268,7 @@ Endpoint* Endpoint::factory(const Endpoint::EndpointType type,
|
|||
copy = copy.substr(6);
|
||||
uint16_t defaultPort = (protocol == TransportType::HTTP)
|
||||
? EndpointIp::_defaultPortHttp
|
||||
: EndpointIp::_defaultPortVpp;
|
||||
: EndpointIp::_defaultPortVst;
|
||||
|
||||
size_t found;
|
||||
|
||||
|
@ -340,9 +337,9 @@ std::string const Endpoint::defaultEndpoint(TransportType type) {
|
|||
return "http+tcp://" + std::string(EndpointIp::_defaultHost) + ":" +
|
||||
StringUtils::itoa(EndpointIp::_defaultPortHttp);
|
||||
|
||||
case TransportType::VPP:
|
||||
case TransportType::VST:
|
||||
return "vst+tcp://" + std::string(EndpointIp::_defaultHost) + ":" +
|
||||
StringUtils::itoa(EndpointIp::_defaultPortVpp);
|
||||
StringUtils::itoa(EndpointIp::_defaultPortVst);
|
||||
|
||||
default: {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
|
@ -406,8 +403,8 @@ std::ostream& operator<<(std::ostream& stream,
|
|||
case arangodb::Endpoint::TransportType::HTTP:
|
||||
stream << "http";
|
||||
break;
|
||||
case arangodb::Endpoint::TransportType::VPP:
|
||||
stream << "vsp";
|
||||
case arangodb::Endpoint::TransportType::VST:
|
||||
stream << "vst";
|
||||
break;
|
||||
}
|
||||
return stream;
|
||||
|
|
|
@ -40,7 +40,7 @@ namespace arangodb {
|
|||
|
||||
class Endpoint {
|
||||
public:
|
||||
enum class TransportType { HTTP, VPP };
|
||||
enum class TransportType { HTTP, VST };
|
||||
enum class EndpointType { SERVER, CLIENT };
|
||||
enum class EncryptionType { NONE = 0, SSL };
|
||||
enum class DomainType { UNKNOWN = 0, UNIX, IPV4, IPV6, SRV };
|
||||
|
|
|
@ -46,7 +46,7 @@ using namespace arangodb::basics;
|
|||
#endif
|
||||
|
||||
uint16_t const EndpointIp::_defaultPortHttp = 8529;
|
||||
uint16_t const EndpointIp::_defaultPortVpp = 8530;
|
||||
uint16_t const EndpointIp::_defaultPortVst = 8530;
|
||||
char const* EndpointIp::_defaultHost = "127.0.0.1";
|
||||
|
||||
static std::string buildSpecification(Endpoint::DomainType domainType,
|
||||
|
@ -60,7 +60,7 @@ static std::string buildSpecification(Endpoint::DomainType domainType,
|
|||
case Endpoint::TransportType::HTTP:
|
||||
specification = "http+";
|
||||
break;
|
||||
case Endpoint::TransportType::VPP:
|
||||
case Endpoint::TransportType::VST:
|
||||
specification = "vst+";
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ class EndpointIp : public Endpoint {
|
|||
|
||||
public:
|
||||
static uint16_t const _defaultPortHttp;
|
||||
static uint16_t const _defaultPortVpp;
|
||||
static uint16_t const _defaultPortVst;
|
||||
static char const* _defaultHost;
|
||||
|
||||
private:
|
||||
|
|
|
@ -134,7 +134,7 @@ std::vector<std::string> EndpointList::all(
|
|||
prefix = "http+";
|
||||
break;
|
||||
|
||||
case Endpoint::TransportType::VPP:
|
||||
case Endpoint::TransportType::VST:
|
||||
prefix = "vst+";
|
||||
break;
|
||||
}
|
||||
|
@ -164,7 +164,7 @@ std::vector<std::string> EndpointList::all(
|
|||
// prefix = "http+";
|
||||
// break;
|
||||
//
|
||||
// case Endpoint::TransportType::VPP:
|
||||
// case Endpoint::TransportType::VST:
|
||||
// prefix = "vpp+";
|
||||
// break;
|
||||
// }
|
||||
|
|
|
@ -59,7 +59,7 @@ enum class ContentType {
|
|||
UNSET
|
||||
};
|
||||
|
||||
enum class ProtocolVersion { HTTP_1_0, HTTP_1_1, VPP_1_0, UNKNOWN };
|
||||
enum class ProtocolVersion { HTTP_1_0, HTTP_1_1, VST_1_0, VST_1_1, UNKNOWN };
|
||||
|
||||
enum class ConnectionType {
|
||||
C_NONE,
|
||||
|
|
|
@ -34,8 +34,10 @@ using namespace arangodb::basics;
|
|||
|
||||
std::string GeneralRequest::translateVersion(ProtocolVersion version) {
|
||||
switch (version) {
|
||||
case ProtocolVersion::VPP_1_0:
|
||||
return "VPP/1.0";
|
||||
case ProtocolVersion::VST_1_1:
|
||||
return "VST/1.1";
|
||||
case ProtocolVersion::VST_1_0:
|
||||
return "VST/1.0";
|
||||
case ProtocolVersion::HTTP_1_1:
|
||||
return "HTTP/1.1";
|
||||
case ProtocolVersion::HTTP_1_0:
|
||||
|
|
|
@ -91,7 +91,7 @@ class GeneralRequest {
|
|||
|
||||
public:
|
||||
ProtocolVersion protocolVersion() const { return _version; }
|
||||
char const* protocol() const { return _protocol; } // http, https or vpp
|
||||
char const* protocol() const { return _protocol; } // http, https or vst
|
||||
void setProtocol(char const* protocol) { _protocol = protocol; }
|
||||
|
||||
ConnectionInfo const& connectionInfo() const { return _connectionInfo; }
|
||||
|
@ -192,7 +192,7 @@ class GeneralRequest {
|
|||
|
||||
protected:
|
||||
ProtocolVersion _version;
|
||||
char const* _protocol; // http, https or vpp
|
||||
char const* _protocol; // http, https or vst
|
||||
|
||||
// connection info
|
||||
ConnectionInfo _connectionInfo;
|
||||
|
|
|
@ -21,8 +21,8 @@
|
|||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGODB_REST_VPP_MESSAGE_H
|
||||
#define ARANGODB_REST_VPP_MESSAGE_H 1
|
||||
#ifndef ARANGODB_REST_VST_MESSAGE_H
|
||||
#define ARANGODB_REST_VST_MESSAGE_H 1
|
||||
|
||||
#include <velocypack/Buffer.h>
|
||||
#include <velocypack/Options.h>
|
||||
|
@ -36,21 +36,21 @@
|
|||
namespace arangodb {
|
||||
namespace rest {
|
||||
|
||||
struct VppInputMessage {
|
||||
VppInputMessage() : _buffer(), _id(0), _payloadAmount(0), _payload() {}
|
||||
struct VstInputMessage {
|
||||
VstInputMessage() : _buffer(), _id(0), _payloadAmount(0), _payload() {}
|
||||
|
||||
// cppcheck-suppress *
|
||||
VppInputMessage(uint64_t id, VPackBuffer<uint8_t>&& buff,
|
||||
VstInputMessage(uint64_t id, VPackBuffer<uint8_t>&& buff,
|
||||
std::size_t amount = 1)
|
||||
: _buffer(std::move(buff)), _id(id), _payloadAmount(amount) {
|
||||
init();
|
||||
}
|
||||
|
||||
// no copy
|
||||
VppInputMessage(VppInputMessage const& other) = delete;
|
||||
VstInputMessage(VstInputMessage const& other) = delete;
|
||||
|
||||
// just move
|
||||
VppInputMessage(VppInputMessage&& other)
|
||||
VstInputMessage(VstInputMessage&& other)
|
||||
: _buffer(std::move(other._buffer)), _id(other._id), _payloadAmount(other._payloadAmount) {
|
||||
init();
|
||||
}
|
|
@ -21,8 +21,8 @@
|
|||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "VppRequest.h"
|
||||
#include "VppMessage.h"
|
||||
#include "VstRequest.h"
|
||||
#include "VstMessage.h"
|
||||
|
||||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/Iterator.h>
|
||||
|
@ -59,8 +59,8 @@ std::string const& lookupStringInMap(
|
|||
}
|
||||
}
|
||||
|
||||
VppRequest::VppRequest(ConnectionInfo const& connectionInfo,
|
||||
VppInputMessage&& message, uint64_t messageId)
|
||||
VstRequest::VstRequest(ConnectionInfo const& connectionInfo,
|
||||
VstInputMessage&& message, uint64_t messageId)
|
||||
: GeneralRequest(connectionInfo),
|
||||
_message(std::move(message)),
|
||||
_headers(nullptr),
|
||||
|
@ -72,13 +72,13 @@ VppRequest::VppRequest(ConnectionInfo const& connectionInfo,
|
|||
_user = "root";
|
||||
}
|
||||
|
||||
VPackSlice VppRequest::payload(VPackOptions const* options) {
|
||||
VPackSlice VstRequest::payload(VPackOptions const* options) {
|
||||
// message does not need to be validated here, as it was already
|
||||
// validated before
|
||||
return _message.payload();
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, std::string> const& VppRequest::headers()
|
||||
std::unordered_map<std::string, std::string> const& VstRequest::headers()
|
||||
const {
|
||||
if (!_headers) {
|
||||
using namespace std;
|
||||
|
@ -93,18 +93,18 @@ std::unordered_map<std::string, std::string> const& VppRequest::headers()
|
|||
return *_headers;
|
||||
}
|
||||
|
||||
std::string const& VppRequest::header(std::string const& key,
|
||||
std::string const& VstRequest::header(std::string const& key,
|
||||
bool& found) const {
|
||||
headers();
|
||||
return lookupStringInMap(*_headers, key, found);
|
||||
}
|
||||
|
||||
std::string const& VppRequest::header(std::string const& key) const {
|
||||
std::string const& VstRequest::header(std::string const& key) const {
|
||||
bool unused = true;
|
||||
return header(key, unused);
|
||||
}
|
||||
|
||||
void VppRequest::parseHeaderInformation() {
|
||||
void VstRequest::parseHeaderInformation() {
|
||||
using namespace std;
|
||||
auto vHeader = _message.header();
|
||||
try {
|
||||
|
@ -128,7 +128,7 @@ void VppRequest::parseHeaderInformation() {
|
|||
}
|
||||
}
|
||||
|
||||
// fullUrl should not be necessary for Vpp
|
||||
// fullUrl should not be necessary for Vst
|
||||
_fullUrl = _requestPath + "?";
|
||||
for (auto const& param : _values) {
|
||||
_fullUrl.append(param.first + "=" +
|
||||
|
@ -145,16 +145,16 @@ void VppRequest::parseHeaderInformation() {
|
|||
|
||||
} catch (std::exception const& e) {
|
||||
throw std::runtime_error(
|
||||
std::string("Error during Parsing of VppHeader: ") + e.what());
|
||||
std::string("Error during Parsing of VstHeader: ") + e.what());
|
||||
}
|
||||
}
|
||||
|
||||
std::string const& VppRequest::value(std::string const& key,
|
||||
std::string const& VstRequest::value(std::string const& key,
|
||||
bool& found) const {
|
||||
return lookupStringInMap(_values, key, found);
|
||||
}
|
||||
|
||||
std::string const& VppRequest::value(std::string const& key) const {
|
||||
std::string const& VstRequest::value(std::string const& key) const {
|
||||
bool unused = true;
|
||||
return value(key, unused);
|
||||
}
|
|
@ -22,12 +22,12 @@
|
|||
/// @author Achim Brandt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGODB_REST_VPP_REQUEST_H
|
||||
#define ARANGODB_REST_VPP_REQUEST_H 1
|
||||
#ifndef ARANGODB_REST_VST_REQUEST_H
|
||||
#define ARANGODB_REST_VST_REQUEST_H 1
|
||||
|
||||
#include "Endpoint/ConnectionInfo.h"
|
||||
#include "Rest/GeneralRequest.h"
|
||||
#include "Rest/VppMessage.h"
|
||||
#include "Rest/VstMessage.h"
|
||||
|
||||
#include <velocypack/Buffer.h>
|
||||
#include <velocypack/Builder.h>
|
||||
|
@ -41,8 +41,8 @@ class RestBatchHandler;
|
|||
|
||||
namespace rest {
|
||||
class GeneralCommTask;
|
||||
class VppCommTask;
|
||||
// class VppsCommTask;
|
||||
class VstCommTask;
|
||||
// class VstsCommTask;
|
||||
}
|
||||
|
||||
namespace velocypack {
|
||||
|
@ -50,20 +50,20 @@ class Builder;
|
|||
struct Options;
|
||||
}
|
||||
|
||||
using rest::VppInputMessage;
|
||||
using rest::VstInputMessage;
|
||||
|
||||
class VppRequest final : public GeneralRequest {
|
||||
friend class rest::VppCommTask;
|
||||
// friend class rest::VppsCommTask;
|
||||
class VstRequest final : public GeneralRequest {
|
||||
friend class rest::VstCommTask;
|
||||
// friend class rest::VstsCommTask;
|
||||
friend class rest::GeneralCommTask;
|
||||
friend class RestBatchHandler; // TODO must be removed
|
||||
|
||||
private:
|
||||
VppRequest(ConnectionInfo const& connectionInfo, VppInputMessage&& message,
|
||||
VstRequest(ConnectionInfo const& connectionInfo, VstInputMessage&& message,
|
||||
uint64_t messageId);
|
||||
|
||||
public:
|
||||
~VppRequest() {}
|
||||
~VstRequest() {}
|
||||
|
||||
public:
|
||||
uint64_t messageId() const override { return _messageId; }
|
||||
|
@ -74,7 +74,7 @@ class VppRequest final : public GeneralRequest {
|
|||
}
|
||||
|
||||
virtual arangodb::Endpoint::TransportType transportType() override {
|
||||
return arangodb::Endpoint::TransportType::VPP;
|
||||
return arangodb::Endpoint::TransportType::VST;
|
||||
};
|
||||
|
||||
std::unordered_map<std::string, std::string> const& headers() const override;
|
||||
|
@ -94,7 +94,7 @@ class VppRequest final : public GeneralRequest {
|
|||
std::string const& value(std::string const& key, bool& found) const override;
|
||||
|
||||
private:
|
||||
VppInputMessage _message;
|
||||
VstInputMessage _message;
|
||||
mutable std::unique_ptr<std::unordered_map<std::string, std::string>>
|
||||
_headers;
|
||||
// values are query parameters
|
|
@ -22,7 +22,7 @@
|
|||
/// @author Achim Brandt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "VppResponse.h"
|
||||
#include "VstResponse.h"
|
||||
|
||||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/Dumper.h>
|
||||
|
@ -35,20 +35,20 @@
|
|||
#include "Basics/VPackStringBufferAdapter.h"
|
||||
#include "Basics/tri-strings.h"
|
||||
#include "Meta/conversion.h"
|
||||
#include "Rest/VppRequest.h"
|
||||
#include "Rest/VstRequest.h"
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::basics;
|
||||
|
||||
bool VppResponse::HIDE_PRODUCT_HEADER = false;
|
||||
bool VstResponse::HIDE_PRODUCT_HEADER = false;
|
||||
|
||||
VppResponse::VppResponse(ResponseCode code, uint64_t id)
|
||||
VstResponse::VstResponse(ResponseCode code, uint64_t id)
|
||||
: GeneralResponse(code), _header(nullptr), _messageId(id) {
|
||||
_contentType = ContentType::VPACK;
|
||||
_connectionType = rest::ConnectionType::C_KEEP_ALIVE;
|
||||
}
|
||||
|
||||
void VppResponse::reset(ResponseCode code) {
|
||||
void VstResponse::reset(ResponseCode code) {
|
||||
_responseCode = code;
|
||||
_headers.clear();
|
||||
_connectionType = rest::ConnectionType::C_KEEP_ALIVE;
|
||||
|
@ -56,7 +56,7 @@ void VppResponse::reset(ResponseCode code) {
|
|||
_generateBody = false; // payload has to be set
|
||||
}
|
||||
|
||||
VPackMessageNoOwnBuffer VppResponse::prepareForNetwork() {
|
||||
VPackMessageNoOwnBuffer VstResponse::prepareForNetwork() {
|
||||
// initalize builder with vpackbuffer. then we do not need to
|
||||
// steal the header and can avoid the shared pointer
|
||||
VPackBuilder builder;
|
|
@ -21,29 +21,29 @@
|
|||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGODB_REST_VPP_RESPONSE_H
|
||||
#define ARANGODB_REST_VPP_RESPONSE_H 1
|
||||
#ifndef ARANGODB_REST_VST_RESPONSE_H
|
||||
#define ARANGODB_REST_VST_RESPONSE_H 1
|
||||
|
||||
#include "Basics/StringBuffer.h"
|
||||
#include "Rest/GeneralResponse.h"
|
||||
#include "Rest/VppMessage.h"
|
||||
#include "Rest/VstMessage.h"
|
||||
|
||||
namespace arangodb {
|
||||
class RestBatchHandler;
|
||||
|
||||
namespace rest {
|
||||
class VppCommTask;
|
||||
class VstCommTask;
|
||||
class GeneralCommTask;
|
||||
}
|
||||
|
||||
using rest::VPackMessageNoOwnBuffer;
|
||||
|
||||
class VppResponse : public GeneralResponse {
|
||||
class VstResponse : public GeneralResponse {
|
||||
friend class rest::GeneralCommTask;
|
||||
friend class rest::VppCommTask;
|
||||
friend class rest::VstCommTask;
|
||||
friend class RestBatchHandler; // TODO must be removed
|
||||
|
||||
VppResponse(ResponseCode code, uint64_t id);
|
||||
VstResponse(ResponseCode code, uint64_t id);
|
||||
|
||||
public:
|
||||
static bool HIDE_PRODUCT_HEADER;
|
||||
|
@ -52,7 +52,7 @@ class VppResponse : public GeneralResponse {
|
|||
virtual uint64_t messageId() const override { return _messageId; }
|
||||
void reset(ResponseCode code) final;
|
||||
virtual arangodb::Endpoint::TransportType transportType() override {
|
||||
return arangodb::Endpoint::TransportType::VPP;
|
||||
return arangodb::Endpoint::TransportType::VST;
|
||||
};
|
||||
|
||||
VPackMessageNoOwnBuffer prepareForNetwork();
|
Loading…
Reference in New Issue