From 61d49924be9119c5d39f503e2c1db9598f811e6c Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 20 Sep 2012 13:11:33 +0200 Subject: [PATCH] more binary handling --- arangod/RestHandler/RestBatchHandler.cpp | 10 ++-- arangosh/V8Client/BenchmarkThread.h | 20 ++++++- lib/BinaryServer/BinaryCommTask.h | 20 +++++-- lib/BinaryServer/BinaryMessage.h | 8 ++- lib/SimpleHttpClient/SimpleBinaryClient.cpp | 62 +++++++++++++++------ lib/SimpleHttpClient/SimpleBinaryClient.h | 6 ++ 6 files changed, 96 insertions(+), 30 deletions(-) diff --git a/arangod/RestHandler/RestBatchHandler.cpp b/arangod/RestHandler/RestBatchHandler.cpp index 16bd8250cb..a76660314c 100644 --- a/arangod/RestHandler/RestBatchHandler.cpp +++ b/arangod/RestHandler/RestBatchHandler.cpp @@ -171,6 +171,7 @@ Handler::status_e RestBatchHandler::execute() { if (status == Handler::HANDLER_DONE) { + // capture output of handler PB_ArangoBatchMessage* batch = _outputMessages->mutable_messages(i); handler->getResponse()->write(batch); } @@ -183,13 +184,12 @@ Handler::status_e RestBatchHandler::execute() { } } - size_t messageSize = _outputMessages->ByteSize(); - // allocate output buffer + uint32_t messageSize = _outputMessages->ByteSize(); char* output = (char*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(char) * messageSize, false); if (output == NULL) { generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY, "out of memory"); - return Handler::HANDLER_DONE; //FAILED; + return Handler::HANDLER_DONE; } _response = new HttpResponse(HttpResponse::OK); @@ -201,10 +201,10 @@ Handler::status_e RestBatchHandler::execute() { delete _response; generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY, "out of memory"); - return Handler::HANDLER_DONE; //FAILED; + return Handler::HANDLER_DONE; } - _response->body().appendText(output, messageSize); + _response->body().appendText(output, (size_t) messageSize); TRI_Free(TRI_UNKNOWN_MEM_ZONE, output); // success diff --git a/arangosh/V8Client/BenchmarkThread.h b/arangosh/V8Client/BenchmarkThread.h index 51efc0f05f..a66f66803b 100644 --- a/arangosh/V8Client/BenchmarkThread.h +++ b/arangosh/V8Client/BenchmarkThread.h @@ -247,9 +247,25 @@ namespace triagens { _operationsCounter->incFailures(); return; } + + if (_endpoint->isBinary()) { + PB_ArangoMessage returnMessage; - if (result->getHttpReturnCode() >= 400) { - _operationsCounter->incFailures(); + if (! returnMessage.ParseFromArray(result->getBody().str().c_str(), result->getContentLength())) { + _operationsCounter->incFailures(); + } + else { + for (int i = 0; i < returnMessage.messages_size(); ++i) { + if (returnMessage.messages(i).blobresponse().status() >= 400) { + _operationsCounter->incFailures(); + } + } + } + } + else { + if (result->getHttpReturnCode() >= 400) { + _operationsCounter->incFailures(); + } } delete result; } diff --git a/lib/BinaryServer/BinaryCommTask.h b/lib/BinaryServer/BinaryCommTask.h index 4d535c6e4b..b452c56d22 100644 --- a/lib/BinaryServer/BinaryCommTask.h +++ b/lib/BinaryServer/BinaryCommTask.h @@ -256,13 +256,25 @@ namespace triagens { void addResponse (HttpResponse* response) { triagens::basics::StringBuffer * buffer; - response->setHeader("connection", strlen("connection"), "Close"); - + if (this->_closeRequested) { + response->setHeader("connection", strlen("connection"), "Close"); + } + else { + // keep-alive is the default + response->setHeader("connection", strlen("connection"), "Keep-Alive"); + } + // save header buffer = new triagens::basics::StringBuffer(TRI_UNKNOWN_MEM_ZONE); - response->writeHeader(buffer); + buffer->appendText("\0\0\0\0\0\0\0\0", 8); + uint8_t* outPointer = (uint8_t*) buffer->c_str(); + + BinaryMessage::writeHeader((uint32_t) response->body().length(), outPointer); + buffer->appendText(response->body()); + // LOGGER_TRACE << "binary body length: " << response->body().length() << ", hash: " << TRI_FnvHashPointer(response->body().c_str(), response->body().length()); + this->_writeBuffers.push_back(buffer); #ifdef TRI_ENABLE_FIGURES @@ -271,8 +283,6 @@ namespace triagens { #endif - LOGGER_TRACE << "HTTP WRITE FOR " << static_cast(this) << ":\n" << buffer->c_str(); - // clear body response->body().clear(); diff --git a/lib/BinaryServer/BinaryMessage.h b/lib/BinaryServer/BinaryMessage.h index a36a933d3d..656dd96a77 100644 --- a/lib/BinaryServer/BinaryMessage.h +++ b/lib/BinaryServer/BinaryMessage.h @@ -120,7 +120,13 @@ namespace triagens { /// @brief encode the size_t length in a char[4] //////////////////////////////////////////////////////////////////////////////// - static void encodeLength (const uint32_t length, uint8_t* out) { + static void writeHeader (const uint32_t length, uint8_t* out) { + static const char* signature = getSignature(); + + for (size_t i = 0; i < 4; ++i) { + *(out++) = (uint8_t) signature[i]; + } + *(out++) = ((uint32_t) length >> 24); *(out++) = ((uint32_t) length >> 16) & 0xff; *(out++) = ((uint32_t) length >> 8) & 0xff; diff --git a/lib/SimpleHttpClient/SimpleBinaryClient.cpp b/lib/SimpleHttpClient/SimpleBinaryClient.cpp index d1e11c6e63..203d7cf546 100644 --- a/lib/SimpleHttpClient/SimpleBinaryClient.cpp +++ b/lib/SimpleHttpClient/SimpleBinaryClient.cpp @@ -102,15 +102,39 @@ namespace triagens { break; } - case (IN_READ_HEADER): { + case (IN_READ_HEADER): + case (IN_READ_BODY): { if (_connection->handleRead(remainingTime, _readBuffer)) { switch (_state) { - case (IN_READ_HEADER): - // _result->setData(_readBuffer.c_str(), _readBuffer.length()); - // _result->getBody().write(_readBuffer.c_str(), _result->getContentLength()); - _result->setResultType(SimpleHttpResult::COMPLETE); - _state = FINISHED; + case (IN_READ_HEADER): { + const size_t headerLength = BinaryMessage::getHeaderLength(); + if (_readBuffer.length() >= headerLength) { + const size_t foundLength = BinaryMessage::decodeLength((uint8_t*) (_readBuffer.c_str() + 4)); + _result->setContentLength(foundLength); + + if (foundLength + headerLength == _readBuffer.length()) { + _result->getBody().write(_readBuffer.c_str() + headerLength, foundLength); + _readBuffer.erase_front(_readBuffer.length()); + _result->setResultType(SimpleHttpResult::COMPLETE); + _state = FINISHED; + } + else { + _readBuffer.erase_front(headerLength); + _state = IN_READ_BODY; + readBody(); + } + } + else { + setErrorMessage("return message truncated", errno); + close(); + } break; + } + + case (IN_READ_BODY): + readBody(); + break; + default: break; } @@ -151,6 +175,17 @@ namespace triagens { _result->clear(); } } + + bool SimpleBinaryClient::readBody () { + if (_readBuffer.length() >= _result->getContentLength()) { + _result->getBody().write(_readBuffer.c_str(), _readBuffer.length()); + _readBuffer.erase_front(_readBuffer.length()); + _result->setResultType(SimpleHttpResult::COMPLETE); + _state = FINISHED; + } + + return true; + } SimpleHttpResult* SimpleBinaryClient::getResult () { switch (_state) { @@ -193,18 +228,11 @@ namespace triagens { ///////////////////// fill the write buffer ////////////////////////////// _writeBuffer.clear(); - // write signature - const char* signature = BinaryMessage::getSignature(); - for (int i = 0; i < 4; ++i) { - _writeBuffer.appendChar(signature[i]); - } + // write some dummy output (will be overwritten later) + _writeBuffer.appendText("00000000"); - // write length - uint8_t length[4]; - BinaryMessage::encodeLength(bodyLength, &length[0]); - for (int i = 0; i < 4; ++i) { - _writeBuffer.appendChar((char) length[i]); - } + uint8_t* outPtr = (uint8_t*) _writeBuffer.c_str(); + BinaryMessage::writeHeader(bodyLength, outPtr); _writeBuffer.appendText(body, bodyLength); diff --git a/lib/SimpleHttpClient/SimpleBinaryClient.h b/lib/SimpleHttpClient/SimpleBinaryClient.h index 1ad08a1edf..e3fef7ae7d 100644 --- a/lib/SimpleHttpClient/SimpleBinaryClient.h +++ b/lib/SimpleHttpClient/SimpleBinaryClient.h @@ -96,6 +96,12 @@ namespace triagens { private: + //////////////////////////////////////////////////////////////////////////////// + /// @brief read remainder of body until done + //////////////////////////////////////////////////////////////////////////////// + + bool readBody (); + //////////////////////////////////////////////////////////////////////////////// /// @brief get the result /// the caller has to delete the result object