mirror of https://gitee.com/bigwinds/arangodb
more binary handling
This commit is contained in:
parent
dba9e6e29c
commit
61d49924be
|
@ -171,6 +171,7 @@ Handler::status_e RestBatchHandler::execute() {
|
|||
|
||||
|
||||
if (status == Handler::HANDLER_DONE) {
|
||||
// capture output of handler
|
||||
PB_ArangoBatchMessage* batch = _outputMessages->mutable_messages(i);
|
||||
handler->getResponse()->write(batch);
|
||||
}
|
||||
|
@ -183,13 +184,12 @@ Handler::status_e RestBatchHandler::execute() {
|
|||
}
|
||||
}
|
||||
|
||||
size_t messageSize = _outputMessages->ByteSize();
|
||||
|
||||
// allocate output buffer
|
||||
uint32_t messageSize = _outputMessages->ByteSize();
|
||||
char* output = (char*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(char) * messageSize, false);
|
||||
if (output == NULL) {
|
||||
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY, "out of memory");
|
||||
return Handler::HANDLER_DONE; //FAILED;
|
||||
return Handler::HANDLER_DONE;
|
||||
}
|
||||
|
||||
_response = new HttpResponse(HttpResponse::OK);
|
||||
|
@ -201,10 +201,10 @@ Handler::status_e RestBatchHandler::execute() {
|
|||
delete _response;
|
||||
|
||||
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY, "out of memory");
|
||||
return Handler::HANDLER_DONE; //FAILED;
|
||||
return Handler::HANDLER_DONE;
|
||||
}
|
||||
|
||||
_response->body().appendText(output, messageSize);
|
||||
_response->body().appendText(output, (size_t) messageSize);
|
||||
TRI_Free(TRI_UNKNOWN_MEM_ZONE, output);
|
||||
|
||||
// success
|
||||
|
|
|
@ -247,9 +247,25 @@ namespace triagens {
|
|||
_operationsCounter->incFailures();
|
||||
return;
|
||||
}
|
||||
|
||||
if (_endpoint->isBinary()) {
|
||||
PB_ArangoMessage returnMessage;
|
||||
|
||||
if (result->getHttpReturnCode() >= 400) {
|
||||
_operationsCounter->incFailures();
|
||||
if (! returnMessage.ParseFromArray(result->getBody().str().c_str(), result->getContentLength())) {
|
||||
_operationsCounter->incFailures();
|
||||
}
|
||||
else {
|
||||
for (int i = 0; i < returnMessage.messages_size(); ++i) {
|
||||
if (returnMessage.messages(i).blobresponse().status() >= 400) {
|
||||
_operationsCounter->incFailures();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (result->getHttpReturnCode() >= 400) {
|
||||
_operationsCounter->incFailures();
|
||||
}
|
||||
}
|
||||
delete result;
|
||||
}
|
||||
|
|
|
@ -256,13 +256,25 @@ namespace triagens {
|
|||
void addResponse (HttpResponse* response) {
|
||||
triagens::basics::StringBuffer * buffer;
|
||||
|
||||
response->setHeader("connection", strlen("connection"), "Close");
|
||||
|
||||
if (this->_closeRequested) {
|
||||
response->setHeader("connection", strlen("connection"), "Close");
|
||||
}
|
||||
else {
|
||||
// keep-alive is the default
|
||||
response->setHeader("connection", strlen("connection"), "Keep-Alive");
|
||||
}
|
||||
|
||||
// save header
|
||||
buffer = new triagens::basics::StringBuffer(TRI_UNKNOWN_MEM_ZONE);
|
||||
response->writeHeader(buffer);
|
||||
buffer->appendText("\0\0\0\0\0\0\0\0", 8);
|
||||
uint8_t* outPointer = (uint8_t*) buffer->c_str();
|
||||
|
||||
BinaryMessage::writeHeader((uint32_t) response->body().length(), outPointer);
|
||||
|
||||
buffer->appendText(response->body());
|
||||
|
||||
// LOGGER_TRACE << "binary body length: " << response->body().length() << ", hash: " << TRI_FnvHashPointer(response->body().c_str(), response->body().length());
|
||||
|
||||
this->_writeBuffers.push_back(buffer);
|
||||
|
||||
#ifdef TRI_ENABLE_FIGURES
|
||||
|
@ -271,8 +283,6 @@ namespace triagens {
|
|||
|
||||
#endif
|
||||
|
||||
LOGGER_TRACE << "HTTP WRITE FOR " << static_cast<Task*>(this) << ":\n" << buffer->c_str();
|
||||
|
||||
// clear body
|
||||
response->body().clear();
|
||||
|
||||
|
|
|
@ -120,7 +120,13 @@ namespace triagens {
|
|||
/// @brief encode the size_t length in a char[4]
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static void encodeLength (const uint32_t length, uint8_t* out) {
|
||||
static void writeHeader (const uint32_t length, uint8_t* out) {
|
||||
static const char* signature = getSignature();
|
||||
|
||||
for (size_t i = 0; i < 4; ++i) {
|
||||
*(out++) = (uint8_t) signature[i];
|
||||
}
|
||||
|
||||
*(out++) = ((uint32_t) length >> 24);
|
||||
*(out++) = ((uint32_t) length >> 16) & 0xff;
|
||||
*(out++) = ((uint32_t) length >> 8) & 0xff;
|
||||
|
|
|
@ -102,15 +102,39 @@ namespace triagens {
|
|||
break;
|
||||
}
|
||||
|
||||
case (IN_READ_HEADER): {
|
||||
case (IN_READ_HEADER):
|
||||
case (IN_READ_BODY): {
|
||||
if (_connection->handleRead(remainingTime, _readBuffer)) {
|
||||
switch (_state) {
|
||||
case (IN_READ_HEADER):
|
||||
// _result->setData(_readBuffer.c_str(), _readBuffer.length());
|
||||
// _result->getBody().write(_readBuffer.c_str(), _result->getContentLength());
|
||||
_result->setResultType(SimpleHttpResult::COMPLETE);
|
||||
_state = FINISHED;
|
||||
case (IN_READ_HEADER): {
|
||||
const size_t headerLength = BinaryMessage::getHeaderLength();
|
||||
if (_readBuffer.length() >= headerLength) {
|
||||
const size_t foundLength = BinaryMessage::decodeLength((uint8_t*) (_readBuffer.c_str() + 4));
|
||||
_result->setContentLength(foundLength);
|
||||
|
||||
if (foundLength + headerLength == _readBuffer.length()) {
|
||||
_result->getBody().write(_readBuffer.c_str() + headerLength, foundLength);
|
||||
_readBuffer.erase_front(_readBuffer.length());
|
||||
_result->setResultType(SimpleHttpResult::COMPLETE);
|
||||
_state = FINISHED;
|
||||
}
|
||||
else {
|
||||
_readBuffer.erase_front(headerLength);
|
||||
_state = IN_READ_BODY;
|
||||
readBody();
|
||||
}
|
||||
}
|
||||
else {
|
||||
setErrorMessage("return message truncated", errno);
|
||||
close();
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case (IN_READ_BODY):
|
||||
readBody();
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -151,6 +175,17 @@ namespace triagens {
|
|||
_result->clear();
|
||||
}
|
||||
}
|
||||
|
||||
bool SimpleBinaryClient::readBody () {
|
||||
if (_readBuffer.length() >= _result->getContentLength()) {
|
||||
_result->getBody().write(_readBuffer.c_str(), _readBuffer.length());
|
||||
_readBuffer.erase_front(_readBuffer.length());
|
||||
_result->setResultType(SimpleHttpResult::COMPLETE);
|
||||
_state = FINISHED;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
SimpleHttpResult* SimpleBinaryClient::getResult () {
|
||||
switch (_state) {
|
||||
|
@ -193,18 +228,11 @@ namespace triagens {
|
|||
///////////////////// fill the write buffer //////////////////////////////
|
||||
_writeBuffer.clear();
|
||||
|
||||
// write signature
|
||||
const char* signature = BinaryMessage::getSignature();
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
_writeBuffer.appendChar(signature[i]);
|
||||
}
|
||||
// write some dummy output (will be overwritten later)
|
||||
_writeBuffer.appendText("00000000");
|
||||
|
||||
// write length
|
||||
uint8_t length[4];
|
||||
BinaryMessage::encodeLength(bodyLength, &length[0]);
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
_writeBuffer.appendChar((char) length[i]);
|
||||
}
|
||||
uint8_t* outPtr = (uint8_t*) _writeBuffer.c_str();
|
||||
BinaryMessage::writeHeader(bodyLength, outPtr);
|
||||
|
||||
_writeBuffer.appendText(body, bodyLength);
|
||||
|
||||
|
|
|
@ -96,6 +96,12 @@ namespace triagens {
|
|||
|
||||
private:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief read remainder of body until done
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool readBody ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get the result
|
||||
/// the caller has to delete the result object
|
||||
|
|
Loading…
Reference in New Issue