mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel
This commit is contained in:
commit
e08a2d0258
|
@ -40,6 +40,7 @@
|
|||
#include <velocypack/Validator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <stdexcept>
|
||||
|
@ -85,14 +86,12 @@ void VppCommTask::addResponse(VppResponse* response) {
|
|||
}
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "response -- end";
|
||||
|
||||
// FIXME (obi)
|
||||
// If the message is big we will create many small chunks in a loop.
|
||||
// For the first tests we just send single Messages
|
||||
|
||||
// adds chunk header infromation and creates SingBuffer* that can be
|
||||
// used with _writeBuffers
|
||||
auto buffers = createChunkForNetwork(slices, id,
|
||||
std::numeric_limits<std::size_t>::max());
|
||||
auto buffers =
|
||||
createChunkForNetwork(slices, id, std::numeric_limits<std::size_t>::max(),
|
||||
false); // set some sensible maxchunk
|
||||
// size and compression
|
||||
|
||||
double const totalTime = getAgent(id)->elapsedSinceReadStart();
|
||||
|
||||
|
@ -185,143 +184,17 @@ bool VppCommTask::processRead() {
|
|||
bool read_maybe_only_part_of_buffer = false;
|
||||
VppInputMessage message; // filled in CASE 1 or CASE 2b
|
||||
|
||||
// CASE 1: message is in one chunk
|
||||
if (chunkHeader._isFirst && chunkHeader._chunk == 1) {
|
||||
_agents.emplace(
|
||||
std::make_pair(chunkHeader._messageID, RequestStatisticsAgent(true)));
|
||||
|
||||
auto agent = getAgent(chunkHeader._messageID);
|
||||
agent->acquire();
|
||||
agent->requestStatisticsAgentSetReadStart();
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "chunk contains single message";
|
||||
std::size_t payloads = 0;
|
||||
|
||||
try {
|
||||
payloads = validateAndCount(vpackBegin, chunkEnd);
|
||||
} catch (std::exception const& e) {
|
||||
handleSimpleError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, e.what(),
|
||||
chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "VPack Validation failed!"
|
||||
<< e.what();
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
} catch (...) {
|
||||
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "VPack Validation failed!";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
// CASE 1: message is in one chunk
|
||||
if (auto rv = getMessageFromSingleChunk(chunkHeader, message, doExecute,
|
||||
vpackBegin, chunkEnd)) {
|
||||
return *rv;
|
||||
}
|
||||
|
||||
VPackBuffer<uint8_t> buffer;
|
||||
buffer.append(vpackBegin, std::distance(vpackBegin, chunkEnd));
|
||||
message.set(chunkHeader._messageID, std::move(buffer), payloads); // fixme
|
||||
|
||||
// message._header = VPackSlice(message._buffer.data());
|
||||
// if (payloadOffset) {
|
||||
// message._payload = VPackSlice(message._buffer.data() + payloadOffset);
|
||||
// }
|
||||
|
||||
doExecute = true;
|
||||
getAgent(chunkHeader._messageID)->requestStatisticsAgentSetReadEnd();
|
||||
}
|
||||
// CASE 2: message is in multiple chunks
|
||||
auto incompleteMessageItr = _incompleteMessages.find(chunkHeader._messageID);
|
||||
|
||||
// CASE 2a: chunk starts new message
|
||||
if (chunkHeader._isFirst) { // first chunk of multi chunk message
|
||||
_agents.emplace(
|
||||
std::make_pair(chunkHeader._messageID, RequestStatisticsAgent(true)));
|
||||
|
||||
auto agent = getAgent(chunkHeader._messageID);
|
||||
agent->acquire();
|
||||
agent->requestStatisticsAgentSetReadStart();
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "chunk starts a new message";
|
||||
if (incompleteMessageItr != _incompleteMessages.end()) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "Message should be first but is already in the Map of incomplete "
|
||||
"messages";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
} else {
|
||||
if (auto rv = getMessageFromMultiChunks(chunkHeader, message, doExecute,
|
||||
vpackBegin, chunkEnd)) {
|
||||
return *rv;
|
||||
}
|
||||
|
||||
// TODO: is a 32bit value sufficient for the messageLength here?
|
||||
IncompleteVPackMessage message(
|
||||
static_cast<uint32_t>(chunkHeader._messageLength),
|
||||
chunkHeader._chunk /*number of chunks*/);
|
||||
message._buffer.append(vpackBegin, std::distance(vpackBegin, chunkEnd));
|
||||
auto insertPair = _incompleteMessages.emplace(
|
||||
std::make_pair(chunkHeader._messageID, std::move(message)));
|
||||
if (!insertPair.second) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "insert failed";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
}
|
||||
|
||||
// CASE 2b: chunk continues a message
|
||||
} else { // followup chunk of some mesage
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "chunk continues a message";
|
||||
if (incompleteMessageItr == _incompleteMessages.end()) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "found message without previous part";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
}
|
||||
auto& im = incompleteMessageItr->second; // incomplete Message
|
||||
im._currentChunk++;
|
||||
assert(im._currentChunk == chunkHeader._chunk);
|
||||
im._buffer.append(vpackBegin, std::distance(vpackBegin, chunkEnd));
|
||||
// check buffer longer than length
|
||||
|
||||
// MESSAGE COMPLETE
|
||||
if (im._currentChunk == im._numberOfChunks - 1 /* zero based counting */) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "chunk completes a message";
|
||||
std::size_t payloads = 0;
|
||||
|
||||
try {
|
||||
payloads =
|
||||
validateAndCount(reinterpret_cast<char const*>(im._buffer.data()),
|
||||
reinterpret_cast<char const*>(
|
||||
im._buffer.data() + im._buffer.byteSize()));
|
||||
|
||||
} catch (std::exception const& e) {
|
||||
handleSimpleError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, e.what(),
|
||||
chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "VPack Validation failed!"
|
||||
<< e.what();
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
} catch (...) {
|
||||
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "VPack Validation failed!";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
}
|
||||
|
||||
message.set(chunkHeader._messageID, std::move(im._buffer), payloads);
|
||||
_incompleteMessages.erase(incompleteMessageItr);
|
||||
// check length
|
||||
|
||||
doExecute = true;
|
||||
getAgent(chunkHeader._messageID)->requestStatisticsAgentSetReadEnd();
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "chunk does not complete a message";
|
||||
}
|
||||
|
||||
read_maybe_only_part_of_buffer = true;
|
||||
|
@ -455,3 +328,146 @@ void VppCommTask::handleSimpleError(rest::ResponseCode responseCode,
|
|||
_clientClosed = true;
|
||||
}
|
||||
}
|
||||
|
||||
boost::optional<bool> VppCommTask::getMessageFromSingleChunk(
|
||||
ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute,
|
||||
char const* vpackBegin, char const* chunkEnd) {
|
||||
_agents.emplace(
|
||||
std::make_pair(chunkHeader._messageID, RequestStatisticsAgent(true)));
|
||||
|
||||
auto agent = getAgent(chunkHeader._messageID);
|
||||
agent->acquire();
|
||||
agent->requestStatisticsAgentSetReadStart();
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "chunk contains single message";
|
||||
std::size_t payloads = 0;
|
||||
|
||||
try {
|
||||
payloads = validateAndCount(vpackBegin, chunkEnd);
|
||||
} catch (std::exception const& e) {
|
||||
handleSimpleError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, e.what(),
|
||||
chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "VPack Validation failed!"
|
||||
<< e.what();
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
} catch (...) {
|
||||
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "VPack Validation failed!";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackBuffer<uint8_t> buffer;
|
||||
buffer.append(vpackBegin, std::distance(vpackBegin, chunkEnd));
|
||||
message.set(chunkHeader._messageID, std::move(buffer), payloads); // fixme
|
||||
|
||||
doExecute = true;
|
||||
getAgent(chunkHeader._messageID)->requestStatisticsAgentSetReadEnd();
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
boost::optional<bool> VppCommTask::getMessageFromMultiChunks(
|
||||
ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute,
|
||||
char const* vpackBegin, char const* chunkEnd) {
|
||||
// CASE 2: message is in multiple chunks
|
||||
auto incompleteMessageItr = _incompleteMessages.find(chunkHeader._messageID);
|
||||
|
||||
// CASE 2a: chunk starts new message
|
||||
if (chunkHeader._isFirst) { // first chunk of multi chunk message
|
||||
_agents.emplace(
|
||||
std::make_pair(chunkHeader._messageID, RequestStatisticsAgent(true)));
|
||||
|
||||
auto agent = getAgent(chunkHeader._messageID);
|
||||
agent->acquire();
|
||||
agent->requestStatisticsAgentSetReadStart();
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "chunk starts a new message";
|
||||
if (incompleteMessageItr != _incompleteMessages.end()) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "Message should be first but is already in the Map of "
|
||||
"incomplete "
|
||||
"messages";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: is a 32bit value sufficient for the messageLength here?
|
||||
IncompleteVPackMessage message(
|
||||
static_cast<uint32_t>(chunkHeader._messageLength),
|
||||
chunkHeader._chunk /*number of chunks*/);
|
||||
message._buffer.append(vpackBegin, std::distance(vpackBegin, chunkEnd));
|
||||
auto insertPair = _incompleteMessages.emplace(
|
||||
std::make_pair(chunkHeader._messageID, std::move(message)));
|
||||
if (!insertPair.second) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "insert failed";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
}
|
||||
|
||||
// CASE 2b: chunk continues a message
|
||||
} else { // followup chunk of some mesage
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "chunk continues a message";
|
||||
if (incompleteMessageItr == _incompleteMessages.end()) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "found message without previous part";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
}
|
||||
auto& im = incompleteMessageItr->second; // incomplete Message
|
||||
im._currentChunk++;
|
||||
assert(im._currentChunk == chunkHeader._chunk);
|
||||
im._buffer.append(vpackBegin, std::distance(vpackBegin, chunkEnd));
|
||||
// check buffer longer than length
|
||||
|
||||
// MESSAGE COMPLETE
|
||||
if (im._currentChunk == im._numberOfChunks - 1 /* zero based counting */) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "chunk completes a message";
|
||||
std::size_t payloads = 0;
|
||||
|
||||
try {
|
||||
payloads =
|
||||
validateAndCount(reinterpret_cast<char const*>(im._buffer.data()),
|
||||
reinterpret_cast<char const*>(
|
||||
im._buffer.data() + im._buffer.byteSize()));
|
||||
|
||||
} catch (std::exception const& e) {
|
||||
handleSimpleError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, e.what(),
|
||||
chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "VPack Validation failed!"
|
||||
<< e.what();
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
} catch (...) {
|
||||
handleSimpleError(rest::ResponseCode::BAD, chunkHeader._messageID);
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: "
|
||||
<< "VPack Validation failed!";
|
||||
closeTask(rest::ResponseCode::BAD);
|
||||
return false;
|
||||
}
|
||||
|
||||
message.set(chunkHeader._messageID, std::move(im._buffer), payloads);
|
||||
_incompleteMessages.erase(incompleteMessageItr);
|
||||
// check length
|
||||
|
||||
doExecute = true;
|
||||
getAgent(chunkHeader._messageID)->requestStatisticsAgentSetReadEnd();
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "VppCommTask: "
|
||||
<< "chunk does not complete a message";
|
||||
}
|
||||
return boost::none;
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "lib/Rest/VppRequest.h"
|
||||
#include "lib/Rest/VppResponse.h"
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
#include <stdexcept>
|
||||
|
||||
namespace arangodb {
|
||||
|
@ -122,6 +123,14 @@ class VppCommTask : public GeneralCommTask {
|
|||
ChunkHeader readChunkHeader(); // sub-function of processRead
|
||||
void replyToIncompleteMessages();
|
||||
|
||||
boost::optional<bool> getMessageFromSingleChunk(
|
||||
ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute,
|
||||
char const* vpackBegin, char const* chunkEnd);
|
||||
|
||||
boost::optional<bool> getMessageFromMultiChunks(
|
||||
ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute,
|
||||
char const* vpackBegin, char const* chunkEnd);
|
||||
|
||||
// user
|
||||
// authenticated or not
|
||||
// database aus url
|
||||
|
|
|
@ -70,7 +70,7 @@ struct VppInputMessage {
|
|||
if (!_payload.empty()) {
|
||||
return _payload.front();
|
||||
}
|
||||
return VPackSlice{};
|
||||
return VPackSlice::noneSlice();
|
||||
}
|
||||
|
||||
std::vector<VPackSlice> const& payloads() const { return _payload; }
|
||||
|
@ -111,7 +111,7 @@ struct VPackMessageNoOwnBuffer {
|
|||
if (_payloads.size() && _generateBody) {
|
||||
return _payloads.front();
|
||||
}
|
||||
return arangodb::basics::VelocyPackHelper::NullValue();
|
||||
return VPackSlice::noneSlice();
|
||||
}
|
||||
|
||||
std::vector<VPackSlice> payloads() { return _payloads; }
|
||||
|
|
Loading…
Reference in New Issue