1
0
Fork 0

added pipelining

This commit is contained in:
Frank Celler 2014-10-19 23:17:21 +02:00
parent 82b38b022d
commit ad50ad066e
2 changed files with 161 additions and 118 deletions

View File

@ -230,6 +230,11 @@ namespace triagens {
res = processRead();
}
}
else if (! closed) {
if (this->_readPosition == 0 && this->_readBuffer->c_str() != this->_readBuffer->end()) {
res = processRead();
}
}
if (closed) {
res = false;
@ -242,6 +247,22 @@ namespace triagens {
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleWrite (bool& closed) {
bool res = SocketTask::handleWrite(closed);
if (! closed) {
if (this->_readPosition == 0 && this->_readBuffer->c_str() != this->_readBuffer->end()) {
res = processRead();
}
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////

View File

@ -108,27 +108,54 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
/// @brief reset the internal state
///
/// this method can be called to clean up when the request handling aborts
/// prematurely
////////////////////////////////////////////////////////////////////////////////
void resetState () {
if (this->_request != 0) {
delete this->_request;
this->_request = 0;
void resetState (bool close) {
if (close) {
if (this->_request != 0) {
delete this->_request;
this->_request = nullptr;
}
this->_requestPending = false;
this->_closeRequested = true;
}
else {
this->_readBuffer->erase_front(this->_bodyPosition + this->_bodyLength);
this->_requestPending = true;
}
this->_readPosition = 0;
this->_bodyPosition = 0;
this->_bodyLength = 0;
this->_readRequestBody = false;
this->_requestPending = false;
this->_httpVersion = HttpRequest::HTTP_UNKNOWN;
this->_requestType = HttpRequest::HTTP_REQUEST_ILLEGAL;
this->_fullUrl = "";
this->_denyCredentials = false;
this->_acceptDeflate = false;
this->_readRequestBody = false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief decide whether or not we should send back a www-authenticate header
////////////////////////////////////////////////////////////////////////////////
bool sendWwwAuthenticateHeader () const {
bool found;
string const value = this->_request->header("x-omit-www-authenticate", found);
return ! found;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get request compatibility
////////////////////////////////////////////////////////////////////////////////
int32_t getCompatibility () const {
if (this->_request != nullptr) {
return this->_request->compatibility();
}
return HttpRequest::MinCompatibility;
}
// -----------------------------------------------------------------------------
@ -143,22 +170,28 @@ namespace triagens {
bool processRead () {
if (this->_requestPending || this->_readBuffer->c_str() == 0) {
return true;
}
bool handleRequest = false;
// still trying to read the header fields
if (! this->_readRequestBody) {
#ifdef TRI_ENABLE_FIGURES
// starting a new request
if (this->_readPosition == 0 && this->_readBuffer->c_str() != this->_readBuffer->end()) {
#ifdef TRI_ENABLE_FIGURES
RequestStatisticsAgent::acquire();
RequestStatisticsAgentSetReadStart(this);
#endif
this->_httpVersion = HttpRequest::HTTP_UNKNOWN;
this->_requestType = HttpRequest::HTTP_REQUEST_ILLEGAL;
this->_fullUrl = "";
this->_denyCredentials = false;
this->_acceptDeflate = false;
}
#endif
const char * ptr = this->_readBuffer->c_str() + this->_readPosition;
const char * end = this->_readBuffer->end() - 3;
@ -170,18 +203,24 @@ namespace triagens {
}
// check if header is too large
size_t headerLength = ptr - this->_readBuffer->c_str();
if (headerLength > this->_maximalHeaderSize) {
LOG_WARNING("maximal header size is %d, request header size is %d", (int) this->_maximalHeaderSize, (int) headerLength);
// header is too large
HttpResponse response(HttpResponse::REQUEST_HEADER_FIELDS_TOO_LARGE, getCompatibility());
// we need to close the connection, because there is no way we
// know what to remove and then continue
this->resetState(true);
this->handleResponse(&response);
return true;
}
// header is complete
if (ptr < end) {
this->_readPosition = ptr - this->_readBuffer->c_str() + 4;
@ -194,10 +233,14 @@ namespace triagens {
if (this->_request == nullptr) {
LOG_ERROR("cannot generate request");
// internal server error
HttpResponse response(HttpResponse::SERVER_ERROR, getCompatibility());
// we need to close the connection, because there is no way we
// know how to remove the body and then continue
this->resetState(true);
this->handleResponse(&response);
this->resetState();
return true;
}
@ -209,28 +252,34 @@ namespace triagens {
if (_httpVersion != HttpRequest::HTTP_1_0 &&
_httpVersion != HttpRequest::HTTP_1_1) {
HttpResponse response(HttpResponse::HTTP_VERSION_NOT_SUPPORTED, getCompatibility());
// we need to close the connection, because there is no way we
// know what to remove and then continue
this->resetState(true);
this->handleResponse(&response);
this->resetState();
return true;
}
// check max URL length
_fullUrl = this->_request->fullUrl();
if (_fullUrl.size() > 16384) {
HttpResponse response(HttpResponse::REQUEST_URI_TOO_LONG, getCompatibility());
// we need to close the connection, because there is no way we
// know what to remove and then continue
this->resetState(true);
this->handleResponse(&response);
this->resetState();
return true;
}
// update the connection information, i. e. client and server addresses and ports
//this->_request->setConnectionInfo(this->_connectionInfo);
this->_request->setProtocol(S::protocol());
LOG_TRACE("server port %d, client port %d",
(int) this->_connectionInfo.serverPort,
(int) this->_connectionInfo.clientPort);
@ -238,14 +287,16 @@ namespace triagens {
// set body start to current position
this->_bodyPosition = this->_readPosition;
// keep track of the original value of the "origin" request header (if any)
// we need this value to handle CORS requests
this->_origin = this->_request->header("origin");
if (! this->_origin.empty()) {
// check for Access-Control-Allow-Credentials header
bool found;
string const& allowCredentials = this->_request->header("access-control-allow-credentials", found);
if (found) {
this->_denyCredentials = ! triagens::basics::StringUtils::boolean(allowCredentials);
}
@ -256,7 +307,6 @@ namespace triagens {
this->_requestType = this->_request->requestType();
#ifdef TRI_ENABLE_FIGURES
RequestStatisticsAgentSetRequestType(this, this->_requestType);
#endif
@ -270,11 +320,11 @@ namespace triagens {
case HttpRequest::HTTP_REQUEST_PUT:
case HttpRequest::HTTP_REQUEST_PATCH: {
// technically, sending a body for an HTTP DELETE request is not forbidden, but it is not explicitly supported
const bool expectContentLength = (this->_requestType == HttpRequest::HTTP_REQUEST_POST ||
this->_requestType == HttpRequest::HTTP_REQUEST_PUT ||
this->_requestType == HttpRequest::HTTP_REQUEST_PATCH ||
this->_requestType == HttpRequest::HTTP_REQUEST_OPTIONS ||
this->_requestType == HttpRequest::HTTP_REQUEST_DELETE);
const bool expectContentLength = (this->_requestType == HttpRequest::HTTP_REQUEST_POST
|| this->_requestType == HttpRequest::HTTP_REQUEST_PUT
|| this->_requestType == HttpRequest::HTTP_REQUEST_PATCH
|| this->_requestType == HttpRequest::HTTP_REQUEST_OPTIONS
|| this->_requestType == HttpRequest::HTTP_REQUEST_DELETE);
if (! checkContentLength(expectContentLength)) {
return true;
@ -283,6 +333,7 @@ namespace triagens {
if (this->_bodyLength == 0) {
handleRequest = true;
}
break;
}
@ -291,8 +342,11 @@ namespace triagens {
string(this->_readBuffer->c_str(), (this->_readPosition < 6 ? this->_readPosition : 6)).c_str());
// bad request, method not allowed
HttpResponse response(HttpResponse::METHOD_NOT_ALLOWED, getCompatibility());
// we need to close the connection, because there is no way we
// know what to remove and then continue
this->resetState(true);
this->handleResponse(&response);
this->resetState();
return true;
}
@ -307,9 +361,13 @@ namespace triagens {
if (scheduler != nullptr && ! scheduler->isActive()) {
// server is inactive and will intentionally respond with HTTP 503
LOG_TRACE("cannot serve request - server is inactive");
HttpResponse response(HttpResponse::SERVICE_UNAVAILABLE, getCompatibility());
// we need to close the connection, because there is no way we
// know what to remove and then continue
this->resetState(true);
this->handleResponse(&response);
this->resetState();
return true;
}
@ -344,23 +402,10 @@ namespace triagens {
// readRequestBody might have changed, so cannot use else
if (this->_readRequestBody) {
if (this->_bodyLength > this->_maximalBodySize) {
// request entity too large
LOG_WARNING("maximal body size is %d, request body size is %d",
(int) this->_maximalBodySize,
(int) this->_bodyLength);
HttpResponse response(HttpResponse::REQUEST_ENTITY_TOO_LARGE, getCompatibility());
this->handleResponse(&response);
this->resetState();
return true;
}
if (this->_readBuffer->length() - this->_bodyPosition < this->_bodyLength) {
// still more data to be read
SocketTask* socketTask = dynamic_cast<SocketTask*>(this);
if (socketTask) {
// set read request time-out
LOG_TRACE("waiting for rest of body to be received. request timeout set to 60 s");
@ -387,32 +432,8 @@ namespace triagens {
RequestStatisticsAgentSetReadEnd(this);
RequestStatisticsAgentAddReceivedBytes(this, this->_bodyPosition + this->_bodyLength);
this->_readBuffer->erase_front(this->_bodyPosition + this->_bodyLength);
if (this->_readBuffer->length() > 0) {
// we removed the front of the read buffer, but it still contains data.
// this means that the content-length header of the request must have been wrong
// (value in content-length header smaller than actual body size)
// check if there is invalid stuff left in the readbuffer
// whitespace is allowed
const char* p = this->_readBuffer->begin();
const char* e = this->_readBuffer->end();
while (p < e) {
const char c = *(p++);
if (c != '\n' && c != '\r' && c != ' ' && c != '\t' && c != '\0') {
LOG_WARNING("read buffer is not empty. probably got a wrong Content-Length header?");
HttpResponse response(HttpResponse::BAD, getCompatibility());
this->handleResponse(&response);
this->resetState();
return true;
}
}
}
this->_requestPending = true;
bool isOptions = (this->_requestType == HttpRequest::HTTP_REQUEST_OPTIONS);
this->resetState(false);
// .............................................................................
// keep-alive handling
@ -438,24 +459,20 @@ namespace triagens {
}
// we keep the connection open in all other cases (HTTP 1.1 or Keep-Alive header sent)
this->_readPosition = 0;
this->_bodyPosition = 0;
this->_bodyLength = 0;
auto const compatibility = this->_request->compatibility();
// .............................................................................
// authenticate
// .............................................................................
auto const compatibility = this->_request->compatibility();
HttpResponse::HttpResponseCode authResult = this->_server->getHandlerFactory()->authenticateRequest(this->_request);
// authenticated
// or an HTTP OPTIONS request. OPTIONS requests currently go unauthenticated
if (authResult == HttpResponse::OK || this->_requestType == HttpRequest::HTTP_REQUEST_OPTIONS) {
if (authResult == HttpResponse::OK || isOptions) {
// handle HTTP OPTIONS requests directly
if (this->_requestType == HttpRequest::HTTP_REQUEST_OPTIONS) {
if (isOptions) {
const string allowedMethods = "DELETE, GET, HEAD, PATCH, POST, PUT";
HttpResponse response(HttpResponse::OK, compatibility);
@ -485,27 +502,34 @@ namespace triagens {
this->handleResponse(&response);
this->resetState();
if (this->_request != 0) {
delete this->_request;
this->_request = nullptr;
}
// we're done
return true;
return processRead();
}
// end HTTP OPTIONS handling
HttpHandler* handler = this->_server->getHandlerFactory()->createHandler(this->_request);
bool ok = false;
if (handler == nullptr) {
LOG_TRACE("no handler is known, giving up");
HttpResponse response(HttpResponse::NOT_FOUND, compatibility);
this->handleResponse(&response);
this->resetState();
if (this->_request != 0) {
delete this->_request;
this->_request = nullptr;
}
}
else {
bool found;
string const& acceptEncoding = this->_request->header("accept-encoding", found);
if (found) {
if (acceptEncoding.find("deflate") != string::npos) {
_acceptDeflate = true;
@ -518,9 +542,7 @@ namespace triagens {
if (found && (asyncExecution == "true" || asyncExecution == "store")) {
// we have an async request
#ifdef TRI_ENABLE_FIGURES
RequestStatisticsAgentSetAsync(this);
#endif
@ -529,6 +551,7 @@ namespace triagens {
this->_request = nullptr;
uint64_t jobId = 0;
if (asyncExecution == "store") {
// persist the responses
ok = this->_server->handleRequestAsync(handler, &jobId);
@ -565,6 +588,8 @@ namespace triagens {
this->handleResponse(&response);
}
}
return processRead();
}
// not found
@ -581,7 +606,13 @@ namespace triagens {
.appendText("}");
this->handleResponse(&response);
this->resetState();
if (this->_request != 0) {
delete this->_request;
this->_request = nullptr;
}
return processRead();
}
// forbidden
@ -596,7 +627,13 @@ namespace triagens {
.appendText("}");
this->handleResponse(&response);
this->resetState();
if (this->_request != 0) {
delete this->_request;
this->_request = nullptr;
}
return processRead();
}
// not authenticated
@ -609,7 +646,11 @@ namespace triagens {
}
this->handleResponse(&response);
this->resetState();
if (this->_request != 0) {
delete this->_request;
this->_request = nullptr;
}
}
return processRead();
@ -623,6 +664,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
void addResponse (HttpResponse* response) {
// CORS response handling
if (! this->_origin.empty()) {
// the request contained an Origin header. We have to send back the
@ -645,7 +687,7 @@ namespace triagens {
}
// CORS request handling EOF
// set "connection" header
if (this->_closeRequested) {
response->setHeader("connection", strlen("connection"), "Close");
}
@ -673,6 +715,7 @@ namespace triagens {
// reserve some outbuffer size
triagens::basics::StringBuffer* buffer = new triagens::basics::StringBuffer(TRI_UNKNOWN_MEM_ZONE, responseBodyLength + 128);
// write header
response->writeHeader(buffer);
@ -684,9 +727,7 @@ namespace triagens {
this->_writeBuffers.push_back(buffer);
#ifdef TRI_ENABLE_FIGURES
this->_writeBuffersStats.push_back(RequestStatisticsAgent::transfer());
#endif
LOG_TRACE("HTTP WRITE FOR %p: %s", (void*) this, buffer->c_str());
@ -716,8 +757,11 @@ namespace triagens {
const int64_t bodyLength = this->_request->contentLength();
if (bodyLength < 0) {
// bad request, body length is < 0. this is a client error
HttpResponse response(HttpResponse::LENGTH_REQUIRED, getCompatibility());
this->resetState(true);
this->handleResponse(&response);
return false;
@ -730,9 +774,12 @@ namespace triagens {
}
if ((size_t) bodyLength > this->_maximalBodySize) {
// request entity too large
LOG_WARNING("maximal body size is %d, request body size is %d", (int) this->_maximalBodySize, (int) bodyLength);
// request entity too large
HttpResponse response(HttpResponse::REQUEST_ENTITY_TOO_LARGE, getCompatibility());
this->resetState(true);
this->handleResponse(&response);
return false;
@ -740,6 +787,7 @@ namespace triagens {
// set instance variable to content-length value
this->_bodyLength = (size_t) bodyLength;
if (this->_bodyLength > 0) {
// we'll read the body
this->_readRequestBody = true;
@ -749,32 +797,6 @@ namespace triagens {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief decide whether or not we should send back a www-authenticate header
////////////////////////////////////////////////////////////////////////////////
bool sendWwwAuthenticateHeader () const {
bool found;
string const value = this->_request->header("x-omit-www-authenticate", found);
if (found) {
return false;
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get request compatibility
////////////////////////////////////////////////////////////////////////////////
int32_t getCompatibility () const {
if (this->_request != nullptr) {
return this->_request->compatibility();
}
return HttpRequest::MinCompatibility;
}
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------