1
0
Fork 0
arangodb/arangod/GeneralServer/GeneralCommTask.cpp

261 lines
7.3 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Achim Brandt
/// @author Dr. Frank Celler
////////////////////////////////////////////////////////////////////////////////
#include "GeneralCommTask.h"
#include "Basics/HybridLogicalClock.h"
#include "Basics/MutexLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringBuffer.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/RestHandler.h"
#include "GeneralServer/RestHandlerFactory.h"
#include "Logger/Logger.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::rest;
GeneralCommTask::GeneralCommTask(GeneralServer* server, TRI_socket_t socket,
ConnectionInfo&& info, double keepAliveTimeout)
: Task("GeneralCommTask"),
SocketTask(socket, keepAliveTimeout),
_server(server),
_connectionInfo(std::move(info)) {
LOG(TRACE) << "connection established, client "
<< TRI_get_fd_or_handle_of_socket(socket) << ", server ip "
<< _connectionInfo.serverAddress << ", server port "
<< _connectionInfo.serverPort << ", client ip "
<< _connectionInfo.clientAddress << ", client port "
<< _connectionInfo.clientPort;
}
GeneralCommTask::~GeneralCommTask() {
LOG(TRACE) << "connection closed, client "
<< TRI_get_fd_or_handle_of_socket(_commSocket);
for (auto& i : _writeBuffers) {
delete i;
}
for (auto& i : _writeBuffersStats) {
TRI_ReleaseRequestStatistics(i);
}
clearRequest();
}
void GeneralCommTask::signalTask(TaskData* data) {
// data response
if (data->_type == TaskData::TASK_DATA_RESPONSE) {
data->RequestStatisticsAgent::transferTo(this);
GeneralResponse* response = data->_response.get();
if (response != nullptr) {
processResponse(response);
processRead();
} else {
handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR);
}
}
// buffer response
else if (data->_type == TaskData::TASK_DATA_BUFFER) {
data->RequestStatisticsAgent::transferTo(this);
HttpResponse response(GeneralResponse::ResponseCode::OK);
velocypack::Slice slice(data->_buffer->data());
response.setPayload(_request, slice, true, VPackOptions::Defaults);
processResponse(&response);
processRead();
}
// data chunk
else if (data->_type == TaskData::TASK_DATA_CHUNK) {
handleChunk(data->_data.c_str(), data->_data.size());
}
// do not know, what to do - give up
else {
_scheduler->destroyTask(this);
}
}
bool GeneralCommTask::handleRead() {
bool res = true;
if (!_closeRequested) {
res = fillReadBuffer();
// process as much data as we got; there might be more than one
// request in the buffer
while (processRead()) {
if (_closeRequested) {
break;
}
}
} else {
// if we don't close here, the scheduler thread may fall into a
// busy wait state, consuming 100% CPU!
_clientClosed = true;
}
if (_clientClosed) {
res = false;
} else if (!res) {
_clientClosed = true;
}
return res;
}
void GeneralCommTask::fillWriteBuffer() {
if (!hasWriteBuffer() && !_writeBuffers.empty()) {
StringBuffer* buffer = _writeBuffers.front();
_writeBuffers.pop_front();
TRI_ASSERT(buffer != nullptr);
TRI_request_statistics_t* statistics = _writeBuffersStats.front();
_writeBuffersStats.pop_front();
setWriteBuffer(buffer, statistics);
}
}
void GeneralCommTask::executeRequest(GeneralRequest* request,
GeneralResponse* response) {
// create a handler for this request
WorkItem::uptr<RestHandler> handler(
GeneralServerFeature::HANDLER_FACTORY->createHandler(request, response));
if (handler == nullptr) {
LOG(TRACE) << "no handler is known, giving up";
clearRequest();
delete response;
handleSimpleError(GeneralResponse::ResponseCode::NOT_FOUND);
return;
}
handler->setTaskId(_taskId, _loop);
// check for an async request
bool found = false;
std::string const& asyncExecution =
request->header(StaticStrings::Async, found);
// the responsibility for the request has been moved to the handler
// TODO(fc) _request should
_request = nullptr;
// async execution
bool ok = false;
if (found && (asyncExecution == "true" || asyncExecution == "store")) {
requestStatisticsAgentSetAsync();
uint64_t jobId = 0;
if (asyncExecution == "store") {
// persist the responses
ok = _server->handleRequestAsync(this, handler, &jobId);
} else {
// don't persist the responses
ok = _server->handleRequestAsync(this, handler, nullptr);
}
if (ok) {
HttpResponse response(GeneralResponse::ResponseCode::ACCEPTED);
if (jobId > 0) {
// return the job id we just created
response.setHeaderNC(StaticStrings::AsyncId, StringUtils::itoa(jobId));
}
processResponse(&response);
return;
}
}
// synchronous request
else {
ok = _server->handleRequest(this, handler);
}
if (!ok) {
handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR);
}
}
void GeneralCommTask::processResponse(GeneralResponse* response) {
if (response == nullptr) {
handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR);
} else {
addResponse(response, false);
}
}
void GeneralCommTask::handleSimpleError(GeneralResponse::ResponseCode code) {
HttpResponse response(code);
addResponse(&response, true);
}
void GeneralCommTask::handleSimpleError(
GeneralResponse::ResponseCode responseCode, int errorNum,
std::string const& errorMessage) {
HttpResponse response(responseCode);
VPackBuilder builder;
builder.openObject();
builder.add(StaticStrings::Error, VPackValue(true));
builder.add(StaticStrings::ErrorNum, VPackValue(errorNum));
builder.add(StaticStrings::ErrorMessage, VPackValue(errorMessage));
builder.add(StaticStrings::Code, VPackValue((int)responseCode));
builder.close();
try {
response.setPayload(_request, builder.slice(), true,
VPackOptions::Defaults);
clearRequest();
processResponse(&response);
} catch (...) {
addResponse(&response, true);
}
}
// TODO(fc) MOVE TO SOCKET TASK
bool GeneralCommTask::handleEvent(EventToken token, EventType events) {
// destroy this task if client is closed
bool result = SocketTask::handleEvent(token, events);
if (_clientClosed) {
_scheduler->destroyTask(this);
}
return result;
}