1
0
Fork 0

Merge remote-tracking branch 'origin/obi-velocystream-frank' into obi-velocystream

* origin/obi-velocystream-frank:
  fixed ownership of handler

Conflicts:
	arangod/GeneralServer/VppCommTask.cpp
This commit is contained in:
Jan Christoph Uhde 2016-08-19 09:58:27 +02:00
commit 61cb8f561f
47 changed files with 751 additions and 1267 deletions

View File

@ -95,8 +95,8 @@ bool RestActionHandler::cancel() { return _action->cancel(&_dataLock, &_data); }
////////////////////////////////////////////////////////////////////////////////
TRI_action_result_t RestActionHandler::executeAction() {
TRI_action_result_t result =
_action->execute(_vocbase, _request, _response, &_dataLock, &_data);
TRI_action_result_t result = _action->execute(
_vocbase, _request.get(), _response.get(), &_dataLock, &_data);
if (!result.isValid) {
if (result.canceled) {

View File

@ -257,6 +257,7 @@ add_executable(${BIN_ARANGOD}
RestServer/UnitTestsFeature.cpp
RestServer/UpgradeFeature.cpp
RestServer/VocbaseContext.cpp
RestServer/WorkMonitorFeature.cpp
RestServer/arangod.cpp
Scheduler/ListenTask.cpp
Scheduler/PeriodicTask.cpp

View File

@ -23,13 +23,13 @@
#include "Cluster/ClusterComm.h"
#include "Logger/Logger.h"
#include "Basics/ConditionLocker.h"
#include "Basics/HybridLogicalClock.h"
#include "Basics/StringUtils.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "Dispatcher/DispatcherThread.h"
#include "Logger/Logger.h"
//#include "Rest/FakeRequest.h"
#include "SimpleHttpClient/ConnectionManager.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
@ -113,12 +113,12 @@ void ClusterCommResult::setDestination(std::string const& dest,
}
}
}
/// @brief stringify the internal error state
std::string ClusterCommResult::stringifyErrorMessage() const {
// append status string
std::string result(stringifyStatus(status));
if (!serverID.empty()) {
result.append(", cluster node: '");
result.append(serverID);
@ -130,7 +130,7 @@ std::string ClusterCommResult::stringifyErrorMessage() const {
result.append(shardID);
result.push_back('\'');
}
if (!endpoint.empty()) {
result.append(", endpoint: '");
result.append(endpoint);
@ -145,7 +145,7 @@ std::string ClusterCommResult::stringifyErrorMessage() const {
return result;
}
/// @brief return an error code for a result
int ClusterCommResult::getErrorCode() const {
switch (status) {
@ -160,17 +160,17 @@ int ClusterCommResult::getErrorCode() const {
case CL_COMM_ERROR:
return TRI_ERROR_INTERNAL;
case CL_COMM_DROPPED:
return TRI_ERROR_INTERNAL;
case CL_COMM_BACKEND_UNAVAILABLE:
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
}
return TRI_ERROR_INTERNAL;
}
/// @brief stringify a cluster comm status
char const* ClusterCommResult::stringifyStatus(ClusterCommOpStatus status) {
switch (status) {
@ -967,14 +967,15 @@ void ClusterComm::asyncAnswer(std::string& coordinatorHeader,
/// DBServer node.
////////////////////////////////////////////////////////////////////////////////
std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
GeneralRequest* answer) {
std::string ClusterComm::processAnswer(
std::string const& coordinatorHeader,
std::unique_ptr<GeneralRequest>&& answer) {
if (answer == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
TRI_ASSERT(answer != nullptr);
// First take apart the header to get the operaitonID:
// First take apart the header to get the operationID:
OperationID operationID;
size_t start = 0;
size_t pos;
@ -1003,9 +1004,9 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
if (i != receivedByOpID.end()) {
TRI_ASSERT(answer != nullptr);
ClusterCommOperation* op = *(i->second);
op->result.answer.reset(answer);
op->result.answer = std::move(answer);
op->result.answer_code = GeneralResponse::responseCode(
answer->header("x-arango-response-code"));
op->result.answer->header("x-arango-response-code"));
op->result.status = CL_COMM_RECEIVED;
// Do we have to do a callback?
if (nullptr != op->callback.get()) {
@ -1028,9 +1029,9 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
if (i != toSendByOpID.end()) {
TRI_ASSERT(answer != nullptr);
ClusterCommOperation* op = *(i->second);
op->result.answer.reset(answer);
op->result.answer = std::move(answer);
op->result.answer_code = GeneralResponse::responseCode(
answer->header("x-arango-response-code"));
op->result.answer->header("x-arango-response-code"));
op->result.status = CL_COMM_RECEIVED;
if (nullptr != op->callback) {
if ((*op->callback)(&op->result)) {
@ -1044,7 +1045,6 @@ std::string ClusterComm::processAnswer(std::string& coordinatorHeader,
}
} else {
// Nothing known about the request, get rid of it:
delete answer;
return std::string("operation was already dropped by sender");
}
}
@ -1437,17 +1437,18 @@ void ClusterCommThread::run() {
}
} else {
if (nullptr != op->body.get()) {
LOG(DEBUG) << "sending "
<< arangodb::HttpRequest::translateMethod(op->reqtype)
.c_str() << " request to DB server '"
<< op->result.serverID << "' at endpoint '"
<< op->result.endpoint << "': " << op->body->c_str();
LOG(DEBUG)
<< "sending "
<< arangodb::HttpRequest::translateMethod(op->reqtype).c_str()
<< " request to DB server '" << op->result.serverID
<< "' at endpoint '" << op->result.endpoint
<< "': " << op->body->c_str();
} else {
LOG(DEBUG) << "sending "
<< arangodb::HttpRequest::translateMethod(op->reqtype)
.c_str() << " request to DB server '"
<< op->result.serverID << "' at endpoint '"
<< op->result.endpoint << "'";
LOG(DEBUG)
<< "sending "
<< arangodb::HttpRequest::translateMethod(op->reqtype).c_str()
<< " request to DB server '" << op->result.serverID
<< "' at endpoint '" << op->result.endpoint << "'";
}
auto client =

View File

@ -428,8 +428,8 @@ class ClusterComm {
/// @brief process an answer coming in on the HTTP socket
//////////////////////////////////////////////////////////////////////////////
std::string processAnswer(std::string& coordinatorHeader,
GeneralRequest* answer);
std::string processAnswer(std::string const& coordinatorHeader,
std::unique_ptr<GeneralRequest>&& answer);
//////////////////////////////////////////////////////////////////////////////
/// @brief send an answer HTTP request to a coordinator

View File

@ -97,7 +97,7 @@ AsyncJobResult::AsyncJobResult(IdType jobId, GeneralResponse* response,
AsyncJobResult::~AsyncJobResult() {}
AsyncJobManager::AsyncJobManager(callback_fptr callback)
: _lock(), _jobs(), callback(callback) {}
: _lock(), _jobs(), _callback(callback) {}
AsyncJobManager::~AsyncJobManager() {
// remove all results that haven't been fetched
@ -280,9 +280,8 @@ void AsyncJobManager::initAsyncJob(HttpServerJob* job, char const* hdr) {
/// @brief finishes the execution of an async job
////////////////////////////////////////////////////////////////////////////////
void AsyncJobManager::finishAsyncJob(AsyncJobResult::IdType jobId,
GeneralResponse* response) {
double const now = TRI_microtime();
void AsyncJobManager::finishAsyncJob(
AsyncJobResult::IdType jobId, std::unique_ptr<GeneralResponse> response) {
AsyncCallbackContext* ctx = nullptr;
{
@ -290,33 +289,25 @@ void AsyncJobManager::finishAsyncJob(AsyncJobResult::IdType jobId,
auto it = _jobs.find(jobId);
if (it == _jobs.end()) {
delete response;
return;
}
ctx = (*it).second._ctx;
if (nullptr != ctx) {
_jobs.erase(it);
} else {
(*it).second._response = response;
(*it).second._response = response.release();
(*it).second._status = AsyncJobResult::JOB_DONE;
(*it).second._stamp = now;
ctx = (*it).second._ctx;
if (ctx != nullptr) {
// we have found a context object, so we can immediately remove the job
// from the list of "done" jobs
_jobs.erase(it);
}
(*it).second._stamp = TRI_microtime();
}
}
// If there is a callback context, the job is no longer in the
// list of "done" jobs, so we have to free the response and the
// callback context:
if (nullptr != ctx && nullptr != callback) {
callback(ctx->getCoordinatorHeader(), response);
delete ctx;
if (response != nullptr) {
delete response;
if (nullptr != ctx) {
if (nullptr != _callback) {
_callback(ctx->getCoordinatorHeader(), response.get());
}
delete ctx;
}
}

View File

@ -40,61 +40,22 @@ class HttpServerJob;
struct AsyncJobResult {
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief job states
//////////////////////////////////////////////////////////////////////////////
typedef enum { JOB_UNDEFINED, JOB_PENDING, JOB_DONE } Status;
//////////////////////////////////////////////////////////////////////////////
/// @brief id typedef
//////////////////////////////////////////////////////////////////////////////
typedef uint64_t IdType;
//////////////////////////////////////////////////////////////////////////////
/// @brief constructor for an unspecified job result
//////////////////////////////////////////////////////////////////////////////
public:
AsyncJobResult();
//////////////////////////////////////////////////////////////////////////////
/// @brief constructor for a specific job result
//////////////////////////////////////////////////////////////////////////////
AsyncJobResult(IdType jobId, GeneralResponse* response, double stamp,
Status status, AsyncCallbackContext* ctx);
~AsyncJobResult();
//////////////////////////////////////////////////////////////////////////////
/// @brief job id
//////////////////////////////////////////////////////////////////////////////
public:
IdType _jobId;
//////////////////////////////////////////////////////////////////////////////
/// @brief the full HTTP response object of the job, can be 0
//////////////////////////////////////////////////////////////////////////////
GeneralResponse* _response;
//////////////////////////////////////////////////////////////////////////////
/// @brief job creation stamp
//////////////////////////////////////////////////////////////////////////////
double _stamp;
//////////////////////////////////////////////////////////////////////////////
/// @brief job status
//////////////////////////////////////////////////////////////////////////////
Status _status;
//////////////////////////////////////////////////////////////////////////////
/// @brief callback context object (normally 0, used in cluster operations)
//////////////////////////////////////////////////////////////////////////////
AsyncCallbackContext* _ctx;
};
@ -178,7 +139,8 @@ class AsyncJobManager {
/// @brief finishes the execution of an async job
//////////////////////////////////////////////////////////////////////////////
void finishAsyncJob(AsyncJobResult::IdType jobId, GeneralResponse*);
void finishAsyncJob(AsyncJobResult::IdType jobId,
std::unique_ptr<GeneralResponse>);
private:
//////////////////////////////////////////////////////////////////////////////
@ -197,7 +159,7 @@ class AsyncJobManager {
/// @brief function pointer for callback registered at initialization
//////////////////////////////////////////////////////////////////////////////
callback_fptr callback;
callback_fptr _callback;
};
}
}

View File

@ -20,6 +20,7 @@
///
/// @author Achim Brandt
/// @author Dr. Frank Celler
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "GeneralCommTask.h"
@ -27,16 +28,15 @@
#include "Basics/HybridLogicalClock.h"
#include "Basics/MutexLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringBuffer.h"
#include "Meta/conversion.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/RestHandler.h"
#include "GeneralServer/RestHandlerFactory.h"
#include "Logger/Logger.h"
#include "Meta/conversion.h"
#include "Rest/VppResponse.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Rest/VppResponse.h"
using namespace arangodb;
using namespace arangodb::basics;
@ -45,65 +45,15 @@ using namespace arangodb::rest;
GeneralCommTask::GeneralCommTask(GeneralServer* server, TRI_socket_t socket,
ConnectionInfo&& info, double keepAliveTimeout)
: Task("GeneralCommTask"),
SocketTask(socket, keepAliveTimeout),
_server(server),
_connectionInfo(std::move(info)) {
LOG(TRACE) << "connection established, client "
<< TRI_get_fd_or_handle_of_socket(socket) << ", server ip "
<< _connectionInfo.serverAddress << ", server port "
<< _connectionInfo.serverPort << ", client ip "
<< _connectionInfo.clientAddress << ", client port "
<< _connectionInfo.clientPort;
}
GeneralCommTask::~GeneralCommTask() {
LOG(TRACE) << "connection closed, client "
<< TRI_get_fd_or_handle_of_socket(_commSocket);
for (auto& i : _writeBuffers) {
delete i;
}
for (auto& i : _writeBuffersStats) {
TRI_ReleaseRequestStatistics(i);
}
}
SocketTask(socket, std::move(info), keepAliveTimeout),
_server(server) {}
void GeneralCommTask::signalTask(TaskData* data) {
// used to output text
// data response
if (data->_type == TaskData::TASK_DATA_RESPONSE) {
data->RequestStatisticsAgent::transferTo(this);
GeneralResponse* response = data->_response.get();
if (response != nullptr) {
processResponse(response);
processRead();
} else {
handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR, 0);
}
}
// buffer response
else if (data->_type == TaskData::TASK_DATA_BUFFER) {
std::unique_ptr<GeneralResponse> response;
if (transportType() == Endpoint::TransportType::VPP) {
response = std::unique_ptr<VppResponse>(new VppResponse(
GeneralResponse::ResponseCode::OK, 0 /*id unset FIXME?*/));
} else {
response = std::unique_ptr<HttpResponse>(
new HttpResponse(GeneralResponse::ResponseCode::OK));
}
data->RequestStatisticsAgent::transferTo(this);
velocypack::Slice slice(data->_buffer->data());
// FIXME (obi) contentType - text set header/meta information?
response->setPayload(slice, true, VPackOptions::Defaults);
processResponse(response.get());
processRead();
processResponse(data->_response.get());
}
// data chunk
@ -113,85 +63,35 @@ void GeneralCommTask::signalTask(TaskData* data) {
// do not know, what to do - give up
else {
_scheduler->destroyTask(this);
}
}
bool GeneralCommTask::handleRead() {
bool res = true;
if (!_closeRequested) {
res = fillReadBuffer();
// process as much data as we got; there might be more than one
// request in the buffer
while (processRead()) {
if (_closeRequested) {
break;
}
}
} else {
// if we don't close here, the scheduler thread may fall into a
// busy wait state, consuming 100% CPU!
_clientClosed = true;
}
if (_clientClosed) {
res = false;
} else if (!res) {
_clientClosed = true;
}
return res;
}
void GeneralCommTask::fillWriteBuffer() {
if (!hasWriteBuffer() && !_writeBuffers.empty()) {
StringBuffer* buffer = _writeBuffers.front();
_writeBuffers.pop_front();
void GeneralCommTask::executeRequest(
std::unique_ptr<GeneralRequest>&& request,
std::unique_ptr<GeneralResponse>&& response) {
// check for an async request (before the handler steals the request)
bool found = false;
std::string const& asyncExecution =
request->header(StaticStrings::Async, found);
TRI_ASSERT(buffer != nullptr);
// store the message id for error handling
auto messageId = response->messageId();
// REVIEW (fc)
TRI_request_statistics_t* statistics = nullptr;
if (!_writeBuffersStats.empty()) {
statistics = _writeBuffersStats.front();
_writeBuffersStats.pop_front();
}
setWriteBuffer(buffer, statistics);
}
}
void GeneralCommTask::executeRequest(GeneralRequest* request,
GeneralResponse* response) {
// create a handler, this takes ownership of request and response
WorkItem::uptr<RestHandler> handler(
GeneralServerFeature::HANDLER_FACTORY->createHandler(request, response));
GeneralServerFeature::HANDLER_FACTORY->createHandler(
std::move(request), std::move(response)));
if (handler == nullptr) {
LOG(TRACE) << "no handler is known, giving up";
httpClearRequest();
delete response;
handleSimpleError(GeneralResponse::ResponseCode::NOT_FOUND,
request->messageId());
handleSimpleError(GeneralResponse::ResponseCode::NOT_FOUND, messageId);
return;
}
handler->setTaskId(_taskId, _loop);
// check for an async request
bool found = false;
std::string const& asyncExecution =
request->header(StaticStrings::Async, found);
// TODO(fc)
// the responsibility for the request has been moved to the handler
// so we give up ownage here by setting _request = nullptr
httpNullRequest(); // http specific - should be removed FIXME
// async execution
// asynchronous request
bool ok = false;
if (found && (asyncExecution == "true" || asyncExecution == "store")) {
@ -200,52 +100,42 @@ void GeneralCommTask::executeRequest(GeneralRequest* request,
if (asyncExecution == "store") {
// persist the responses
ok = _server->handleRequestAsync(this, handler, &jobId);
ok = _server->handleRequestAsync(this, std::move(handler), &jobId);
} else {
// don't persist the responses
ok = _server->handleRequestAsync(this, handler, nullptr);
ok = _server->handleRequestAsync(this, std::move(handler));
}
if (ok) {
HttpResponse response(GeneralResponse::ResponseCode::ACCEPTED);
std::unique_ptr<GeneralResponse> response =
createResponse(GeneralResponse::ResponseCode::ACCEPTED, messageId);
if (jobId > 0) {
// return the job id we just created
response.setHeaderNC(StaticStrings::AsyncId, StringUtils::itoa(jobId));
response->setHeaderNC(StaticStrings::AsyncId, StringUtils::itoa(jobId));
}
processResponse(&response);
processResponse(response.get());
return;
}
}
// synchronous request
else {
ok = _server->handleRequest(this, handler);
ok = _server->handleRequest(this, std::move(handler));
}
if (!ok) {
handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR,
request->messageId());
handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR, messageId);
}
}
void GeneralCommTask::processResponse(GeneralResponse* response) {
if (response == nullptr) {
handleSimpleError(GeneralResponse::ResponseCode::SERVER_ERROR,
response->messageId());
LOG_TOPIC(WARN, Logger::COMMUNICATION)
<< "processResponse received a nullptr, closing connection";
_clientClosed = true;
} else {
addResponse(response, false);
addResponse(response);
}
}
// TODO(fc) MOVE TO SOCKET TASK
bool GeneralCommTask::handleEvent(EventToken token, EventType events) {
// destroy this task if client is closed
bool result = SocketTask::handleEvent(token, events);
if (_clientClosed) {
_scheduler->destroyTask(this);
}
return result;
}

View File

@ -132,30 +132,18 @@ class GeneralCommTask : public SocketTask, public RequestStatisticsAgent {
GeneralCommTask(GeneralServer*, TRI_socket_t, ConnectionInfo&&,
double keepAliveTimeout);
// write data from request into _writeBuffers and call fillWriteBuffer
virtual void addResponse(GeneralResponse*, bool error) = 0;
virtual void addResponse(GeneralResponse*) = 0;
virtual arangodb::Endpoint::TransportType transportType() = 0;
protected:
virtual ~GeneralCommTask();
// is called in a loop as long as it returns true.
// Return false if there is not enough data to do
// any more processing and all available data has
// been evaluated.
virtual bool processRead() = 0;
virtual void handleChunk(char const*, size_t) = 0;
protected:
virtual void httpClearRequest(){}; // should be removed
virtual void httpNullRequest(){}; // should be removed
void executeRequest(GeneralRequest*, GeneralResponse*);
virtual std::unique_ptr<GeneralResponse> createResponse(
GeneralResponse::ResponseCode, uint64_t messageId) = 0;
// TODO(fc) move to SocketTask
// main callback of this class - called by base SocketTask - this version
// calls the SocketTask's handleEvent (called in httpsHandler directly)
virtual bool handleEvent(EventToken token, EventType events) override;
protected:
void executeRequest(std::unique_ptr<GeneralRequest>&&,
std::unique_ptr<GeneralResponse>&&);
void processResponse(GeneralResponse*);
@ -164,33 +152,20 @@ class GeneralCommTask : public SocketTask, public RequestStatisticsAgent {
virtual void handleSimpleError(GeneralResponse::ResponseCode, int code,
std::string const& errorMessage,
uint64_t messageId) = 0;
void fillWriteBuffer(); // fills SocketTasks _writeBuffer
// _writeBufferStatistics from
// _writeBuffers/_writeBuffersStats
private:
void handleTimeout() override final { _clientClosed = true; }
bool handleRead() override final;
void signalTask(TaskData*) override;
protected:
// for asynchronous requests
GeneralServer* const _server;
// information about the client
ConnectionInfo _connectionInfo;
// protocol to use http, vpp
char const* _protocol = "unknown";
GeneralRequest::ProtocolVersion _protocolVersion =
GeneralRequest::ProtocolVersion::UNKNOWN;
// true if a close has been requested by the client
bool _closeRequested = false;
std::deque<basics::StringBuffer*> _writeBuffers;
std::deque<TRI_request_statistics_t*> _writeBuffersStats;
};
}
}

View File

@ -115,7 +115,7 @@ void GeneralServer::stopListening() {
////////////////////////////////////////////////////////////////////////////////
bool GeneralServer::handleRequestAsync(GeneralCommTask* task,
WorkItem::uptr<RestHandler>& handler,
WorkItem::uptr<RestHandler> handler,
uint64_t* jobId) {
bool startThread = handler->needsOwnThread();
@ -148,7 +148,7 @@ bool GeneralServer::handleRequestAsync(GeneralCommTask* task,
LOG(WARN) << "unable to add job to the job queue: "
<< TRI_errno_string(res);
}
// todo send info to async work manager?
// TODO send info to async work manager?
return false;
}
@ -161,12 +161,10 @@ bool GeneralServer::handleRequestAsync(GeneralCommTask* task,
////////////////////////////////////////////////////////////////////////////////
bool GeneralServer::handleRequest(GeneralCommTask* task,
WorkItem::uptr<RestHandler>& handler) {
WorkItem::uptr<RestHandler> handler) {
// direct handlers
if (handler->isDirect()) {
HandlerWorkStack work(handler);
handleRequestDirectly(work.handler(), task);
handleRequestDirectly(task, std::move(handler));
return true;
}
@ -233,23 +231,23 @@ bool GeneralServer::openEndpoint(Endpoint* endpoint) {
/// @brief handle request directly
////////////////////////////////////////////////////////////////////////////////
void GeneralServer::handleRequestDirectly(RestHandler* handler,
GeneralCommTask* task) {
task->RequestStatisticsAgent::transferTo(handler);
RestHandler::status result = handler->executeFull();
handler->RequestStatisticsAgent::transferTo(task);
void GeneralServer::handleRequestDirectly(GeneralCommTask* task,
WorkItem::uptr<RestHandler> handler) {
HandlerWorkStack work(std::move(handler));
task->RequestStatisticsAgent::transferTo(work.handler());
RestHandler::status result = work.handler()->executeFull();
work.handler()->RequestStatisticsAgent::transferTo(task);
switch (result) {
case RestHandler::status::FAILED:
case RestHandler::status::DONE: {
// auto response = dynamic_cast<HttpResponse*>(handler->response());
// task->addResponse(response, false);
task->addResponse(handler->response(), false);
task->addResponse(work.handler()->response());
break;
}
case RestHandler::status::ASYNC:
// do nothing, just wait
handler.release();
break;
}
}

View File

@ -71,21 +71,19 @@ class GeneralServer : protected TaskManager {
// creates a job for asynchronous execution
bool handleRequestAsync(GeneralCommTask*,
arangodb::WorkItem::uptr<RestHandler>&,
uint64_t* jobId);
arangodb::WorkItem::uptr<RestHandler>,
uint64_t* jobId = nullptr);
// executes the handler directly or add it to the queue
bool handleRequest(GeneralCommTask*, arangodb::WorkItem::uptr<RestHandler>&);
bool handleRequest(GeneralCommTask*, arangodb::WorkItem::uptr<RestHandler>);
protected:
// opens a listen port
bool openEndpoint(Endpoint* endpoint);
// handles request directly
void handleRequestDirectly(RestHandler* handler, GeneralCommTask* task);
// registers a task
void registerHandler(RestHandler* handler, GeneralCommTask* task);
void handleRequestDirectly(GeneralCommTask*,
arangodb::WorkItem::uptr<RestHandler>);
protected:
// active listen tasks

View File

@ -24,13 +24,13 @@
#include "HttpCommTask.h"
#include "Meta/conversion.h"
#include "Basics/HybridLogicalClock.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/RestHandler.h"
#include "GeneralServer/RestHandlerFactory.h"
#include "VocBase/ticks.h" //clock
#include "Meta/conversion.h"
#include "VocBase/ticks.h"
using namespace arangodb;
using namespace arangodb::basics;
@ -54,30 +54,54 @@ HttpCommTask::HttpCommTask(GeneralServer* server, TRI_socket_t sock,
_denyCredentials(true),
_acceptDeflate(false),
_newRequest(true),
_requestType(GeneralRequest::RequestType::ILLEGAL),
_fullUrl(),
_origin(),
_requestType(GeneralRequest::RequestType::ILLEGAL), // TODO(fc) remove
_fullUrl(), // TODO(fc) remove
_origin(), // TODO(fc) remove
_sinceCompactification(0),
_originalBodyLength(0) {
_originalBodyLength(0) { // TODO(fc) remove
_protocol = "http";
connectionStatisticsAgentSetHttp();
}
void HttpCommTask::handleSimpleError(GeneralResponse::ResponseCode code,
uint64_t id) {
(void)id; // id is not used for this protocol
HttpResponse response(code);
addResponse(&response, true);
uint64_t /* messageId */) {
std::unique_ptr<GeneralResponse> response(new HttpResponse(code));
addResponse(response.get());
}
void HttpCommTask::addResponse(HttpResponse* response, bool isError) {
void HttpCommTask::handleSimpleError(GeneralResponse::ResponseCode code,
int errorNum,
std::string const& errorMessage,
uint64_t /* messageId */) {
std::unique_ptr<GeneralResponse> response(new HttpResponse(code));
VPackBuilder builder;
builder.openObject();
builder.add(StaticStrings::Error, VPackValue(true));
builder.add(StaticStrings::ErrorNum, VPackValue(errorNum));
builder.add(StaticStrings::ErrorMessage, VPackValue(errorMessage));
builder.add(StaticStrings::Code, VPackValue((int)code));
builder.close();
try {
response->setPayload(builder.slice(), true, VPackOptions::Defaults);
addResponse(response.get());
} catch (std::exception const& ex) {
LOG_TOPIC(WARN, Logger::COMMUNICATION)
<< "handleSimpleError received an exception, closing connection:"
<< ex.what();
_clientClosed = true;
} catch (...) {
LOG_TOPIC(WARN, Logger::COMMUNICATION)
<< "handleSimpleError received an exception, closing connection";
_clientClosed = true;
}
}
void HttpCommTask::addResponse(HttpResponse* response) {
_requestPending = false;
_isChunked = false;
if (isError) {
resetState(true);
}
// CORS response handling
if (!_origin.empty()) {
// the request contained an Origin header. We have to send back the
@ -94,16 +118,15 @@ void HttpCommTask::addResponse(HttpResponse* response, bool isError) {
response->setHeaderNC(StaticStrings::AccessControlAllowCredentials,
(_denyCredentials ? "false" : "true"));
}
// CORS request handling EOF
// set "connection" header
// keep-alive is the default
// set "connection" header, keep-alive is the default
response->setConnectionType(_closeRequested
? HttpResponse::CONNECTION_CLOSE
: HttpResponse::CONNECTION_KEEP_ALIVE);
size_t const responseBodyLength = response->bodySize();
// TODO(fc) should be handled by the response / request
if (_requestType == GeneralRequest::RequestType::HEAD) {
// clear body if this is an HTTP HEAD request
// HEAD must not return a body
@ -114,6 +137,8 @@ void HttpCommTask::addResponse(HttpResponse* response, bool isError) {
auto buffer = std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE,
responseBodyLength + 128, false);
// TODO: move this to HttpResponse
// write header
response->writeHeader(buffer.get());
@ -133,23 +158,20 @@ void HttpCommTask::addResponse(HttpResponse* response, bool isError) {
buffer->ensureNullTerminated();
_writeBuffers.push_back(buffer.get());
auto b = buffer.release();
if (!b->empty()) {
if (!buffer->empty()) {
LOG_TOPIC(TRACE, Logger::REQUESTS)
<< "\"http-request-response\",\"" << (void*)this << "\",\""
<< StringUtils::escapeUnicode(std::string(b->c_str(), b->length()))
<< StringUtils::escapeUnicode(
std::string(buffer->c_str(), buffer->length()))
<< "\"";
}
// clear body
response->body().clear();
double const totalTime = RequestStatisticsAgent::elapsedSinceReadStart();
_writeBuffersStats.push_back(RequestStatisticsAgent::steal());
// append write buffer and statistics
addWriteBuffer(std::move(buffer), this);
// and give some request information
LOG_TOPIC(INFO, Logger::REQUESTS)
<< "\"http-request-end\",\"" << (void*)this << "\",\""
<< _connectionInfo.clientAddress << "\",\""
@ -159,13 +181,15 @@ void HttpCommTask::addResponse(HttpResponse* response, bool isError) {
<< _originalBodyLength << "," << responseBodyLength << ",\"" << _fullUrl
<< "\"," << Logger::FIXED(totalTime, 6);
// start output
fillWriteBuffer();
// clear body
response->body().clear();
}
// reads data from the socket
bool HttpCommTask::processRead() {
if (_requestPending || _readBuffer->c_str() == nullptr) {
TRI_ASSERT(_readBuffer->c_str() != nullptr);
if (_requestPending) {
return false;
}
@ -246,15 +270,16 @@ bool HttpCommTask::processRead() {
// check that we know, how to serve this request and update the connection
// information, i. e. client and server addresses and ports and create a
// request context for that request
_request = new HttpRequest(
_incompleteRequest.reset(new HttpRequest(
_connectionInfo, _readBuffer->c_str() + _startPosition,
_readPosition - _startPosition, _allowMethodOverride);
_readPosition - _startPosition, _allowMethodOverride));
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(_request);
_request->setClientTaskId(_taskId);
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(
_incompleteRequest.get());
_incompleteRequest->setClientTaskId(_taskId);
// check HTTP protocol version
_protocolVersion = _request->protocolVersion();
_protocolVersion = _incompleteRequest->protocolVersion();
if (_protocolVersion != GeneralRequest::ProtocolVersion::HTTP_1_0 &&
_protocolVersion != GeneralRequest::ProtocolVersion::HTTP_1_1) {
@ -266,7 +291,7 @@ bool HttpCommTask::processRead() {
}
// check max URL length
_fullUrl = _request->fullUrl();
_fullUrl = _incompleteRequest->fullUrl();
if (_fullUrl.size() > 16384) {
handleSimpleError(GeneralResponse::ResponseCode::REQUEST_URI_TOO_LONG,
@ -276,7 +301,7 @@ bool HttpCommTask::processRead() {
// update the connection information, i. e. client and server addresses
// and ports
_request->setProtocol(_protocol);
_incompleteRequest->setProtocol(_protocol);
LOG(TRACE) << "server port " << _connectionInfo.serverPort
<< ", client port " << _connectionInfo.clientPort;
@ -287,7 +312,7 @@ bool HttpCommTask::processRead() {
// keep track of the original value of the "origin" request header (if
// any), we need this value to handle CORS requests
_origin = _request->header(StaticStrings::Origin);
_origin = _incompleteRequest->header(StaticStrings::Origin);
if (!_origin.empty()) {
// default is to allow nothing
@ -323,7 +348,7 @@ bool HttpCommTask::processRead() {
// store the original request's type. we need it later when responding
// (original request object gets deleted before responding)
_requestType = _request->requestType();
_requestType = _incompleteRequest->requestType();
requestStatisticsAgentSetRequestType(_requestType);
@ -345,7 +370,8 @@ bool HttpCommTask::processRead() {
_requestType == GeneralRequest::RequestType::OPTIONS ||
_requestType == GeneralRequest::RequestType::DELETE_REQ);
if (!checkContentLength(expectContentLength)) {
if (!checkContentLength(_incompleteRequest.get(),
expectContentLength)) {
return false;
}
@ -382,7 +408,7 @@ bool HttpCommTask::processRead() {
if (_readRequestBody) {
bool found;
std::string const& expect =
_request->header(StaticStrings::Expect, found);
_incompleteRequest->header(StaticStrings::Expect, found);
if (found && StringUtils::trim(expect) == "100-continue") {
LOG(TRACE) << "received a 100-continue request";
@ -392,12 +418,7 @@ bool HttpCommTask::processRead() {
TRI_CHAR_LENGTH_PAIR("HTTP/1.1 100 (Continue)\r\n\r\n"));
buffer->ensureNullTerminated();
_writeBuffers.push_back(buffer.get());
buffer.release();
_writeBuffersStats.push_back(nullptr);
fillWriteBuffer();
addWriteBuffer(std::move(buffer));
}
}
} else {
@ -412,14 +433,15 @@ bool HttpCommTask::processRead() {
// readRequestBody might have changed, so cannot use else
if (_readRequestBody) {
if (_readBuffer->length() - _bodyPosition < _bodyLength) {
setKeepAliveTimeout(_keepAliveTimeout);
armKeepAliveTimeout();
// let client send more
return false;
}
// read "bodyLength" from read buffer and add this body to "httpRequest"
requestAsHttp()->setBody(_readBuffer->c_str() + _bodyPosition, _bodyLength);
_incompleteRequest->setBody(_readBuffer->c_str() + _bodyPosition,
_bodyLength);
LOG(TRACE) << "" << std::string(_readBuffer->c_str() + _bodyPosition,
_bodyLength);
@ -447,21 +469,21 @@ bool HttpCommTask::processRead() {
bool const isOptionsRequest =
(_requestType == GeneralRequest::RequestType::OPTIONS);
resetState(false);
resetState();
// .............................................................................
// keep-alive handling
// .............................................................................
std::string connectionType =
StringUtils::tolower(_request->header(StaticStrings::Connection));
std::string connectionType = StringUtils::tolower(
_incompleteRequest->header(StaticStrings::Connection));
if (connectionType == "close") {
// client has sent an explicit "Connection: Close" header. we should close
// the connection
LOG(DEBUG) << "connection close requested by client";
_closeRequested = true;
} else if (requestAsHttp()->isHttp10() && connectionType != "keep-alive") {
} else if (_incompleteRequest->isHttp10() && connectionType != "keep-alive") {
// HTTP 1.0 request, and no "Connection: Keep-Alive" header sent
// we should close the connection
LOG(DEBUG) << "no keep-alive, connection close requested by client";
@ -480,16 +502,17 @@ bool HttpCommTask::processRead() {
// authenticate
// .............................................................................
GeneralResponse::ResponseCode authResult = authenticateRequest();
GeneralResponse::ResponseCode authResult =
authenticateRequest(_incompleteRequest.get());
// authenticated or an OPTIONS request. OPTIONS requests currently go
// unauthenticated
if (authResult == GeneralResponse::ResponseCode::OK || isOptionsRequest) {
// handle HTTP OPTIONS requests directly
if (isOptionsRequest) {
processCorsOptions();
processCorsOptions(std::move(_incompleteRequest));
} else {
processRequest();
processRequest(std::move(_incompleteRequest));
}
}
// not found
@ -509,24 +532,19 @@ bool HttpCommTask::processRead() {
response.setHeaderNC(StaticStrings::WwwAuthenticate, std::move(realm));
clearRequest();
processResponse(&response);
}
_incompleteRequest.reset(nullptr);
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief processes a request
////////////////////////////////////////////////////////////////////////////////
void HttpCommTask::processRequest() {
void HttpCommTask::processRequest(std::unique_ptr<HttpRequest> request) {
// check for deflate
bool found;
auto httpRequest = requestAsHttp();
std::string const& acceptEncoding =
httpRequest->header(StaticStrings::AcceptEncoding, found);
request->header(StaticStrings::AcceptEncoding, found);
if (found) {
if (acceptEncoding.find("deflate") != std::string::npos) {
@ -534,7 +552,7 @@ void HttpCommTask::processRequest() {
}
}
if (httpRequest != nullptr) {
{
LOG_TOPIC(DEBUG, Logger::REQUESTS)
<< "\"http-request-begin\",\"" << (void*)this << "\",\""
<< _connectionInfo.clientAddress << "\",\""
@ -542,7 +560,7 @@ void HttpCommTask::processRequest() {
<< HttpRequest::translateVersion(_protocolVersion) << "\"," << _fullUrl
<< "\"";
std::string const& body = httpRequest->body();
std::string const& body = request->body();
if (!body.empty()) {
LOG_TOPIC(DEBUG, Logger::REQUESTS)
@ -553,7 +571,8 @@ void HttpCommTask::processRequest() {
// check for an HLC time stamp
std::string const& timeStamp =
_request->header(StaticStrings::HLCHeader, found);
request->header(StaticStrings::HLCHeader, found);
if (found) {
uint64_t timeStampInt =
arangodb::basics::HybridLogicalClock::decodeTimeStampWithCheck(
@ -564,39 +583,32 @@ void HttpCommTask::processRequest() {
}
// create a handler and execute
HttpResponse* response =
new HttpResponse(GeneralResponse::ResponseCode::SERVER_ERROR);
std::unique_ptr<GeneralResponse> response(
new HttpResponse(GeneralResponse::ResponseCode::SERVER_ERROR));
response->setContentType(meta::enumToEnum<GeneralResponse::ContentType>(
_request->contentTypeResponse()));
executeRequest(_request, response);
}
request->contentTypeResponse()));
////////////////////////////////////////////////////////////////////////////////
/// @brief chunking is finished
////////////////////////////////////////////////////////////////////////////////
executeRequest(std::move(request), std::move(response));
}
void HttpCommTask::finishedChunked() {
auto buffer = std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE, 6, true);
buffer->appendText(TRI_CHAR_LENGTH_PAIR("0\r\n\r\n"));
buffer->ensureNullTerminated();
_writeBuffers.push_back(buffer.get());
buffer.release();
_writeBuffersStats.push_back(nullptr);
_isChunked = false;
_requestPending = false;
fillWriteBuffer();
processRead();
addWriteBuffer(std::move(buffer));
}
////////////////////////////////////////////////////////////////////////////////
/// check the content-length header of a request and fail it is broken
////////////////////////////////////////////////////////////////////////////////
bool HttpCommTask::checkContentLength(bool expectContentLength) {
int64_t const bodyLength = _request->contentLength();
bool HttpCommTask::checkContentLength(HttpRequest* request,
bool expectContentLength) {
int64_t const bodyLength = request->contentLength();
if (bodyLength < 0) {
// bad request, body length is < 0. this is a client error
@ -635,7 +647,7 @@ bool HttpCommTask::checkContentLength(bool expectContentLength) {
return true;
}
void HttpCommTask::processCorsOptions() {
void HttpCommTask::processCorsOptions(std::unique_ptr<HttpRequest> request) {
HttpResponse response(GeneralResponse::ResponseCode::OK);
response.setHeaderNC(StaticStrings::Allow, StaticStrings::CorsMethods);
@ -643,7 +655,7 @@ void HttpCommTask::processCorsOptions() {
if (!_origin.empty()) {
LOG(TRACE) << "got CORS preflight request";
std::string const allowHeaders = StringUtils::trim(
_request->header(StaticStrings::AccessControlRequestHeaders));
request->header(StaticStrings::AccessControlRequestHeaders));
// send back which HTTP methods are allowed for the resource
// we'll allow all
@ -666,100 +678,81 @@ void HttpCommTask::processCorsOptions() {
response.setHeaderNC(StaticStrings::AccessControlMaxAge,
StaticStrings::N1800);
}
clearRequest();
processResponse(&response);
}
void HttpCommTask::handleChunk(char const* data, size_t len) {
if (!_isChunked) {
return;
}
if (0 == len) {
finishedChunked();
} else {
StringBuffer* buffer = new StringBuffer(TRI_UNKNOWN_MEM_ZONE, len);
std::unique_ptr<StringBuffer> buffer(
new StringBuffer(TRI_UNKNOWN_MEM_ZONE, len));
buffer->appendHex(len);
buffer->appendText(TRI_CHAR_LENGTH_PAIR("\r\n"));
buffer->appendText(data, len);
buffer->appendText(TRI_CHAR_LENGTH_PAIR("\r\n"));
sendChunk(buffer);
addWriteBuffer(std::move(buffer));
}
}
void HttpCommTask::completedWriteBuffer() {
_writeBuffer = nullptr;
_writeLength = 0;
if (_writeBufferStatistics != nullptr) {
_writeBufferStatistics->_writeEnd = TRI_StatisticsTime();
TRI_ReleaseRequestStatistics(_writeBufferStatistics);
_writeBufferStatistics = nullptr;
}
fillWriteBuffer();
if (!_clientClosed && _closeRequested && !hasWriteBuffer() &&
_writeBuffers.empty() && !_isChunked) {
_clientClosed = true;
}
std::unique_ptr<GeneralResponse> HttpCommTask::createResponse(
GeneralResponse::ResponseCode responseCode, uint64_t /* messageId */) {
return std::unique_ptr<GeneralResponse>(new HttpResponse(responseCode));
}
void HttpCommTask::resetState(bool close) {
if (close) {
clearRequest();
void HttpCommTask::resetState() {
_requestPending = true;
_requestPending = false;
_isChunked = false;
_closeRequested = true;
bool compact = false;
if (_sinceCompactification > RunCompactEvery) {
compact = true;
} else if (_readBuffer->length() > MaximalPipelineSize) {
compact = true;
}
if (compact) {
_readBuffer->erase_front(_bodyPosition + _bodyLength);
_sinceCompactification = 0;
_readPosition = 0;
_bodyPosition = 0;
_bodyLength = 0;
} else {
_requestPending = true;
bool compact = false;
if (_sinceCompactification > RunCompactEvery) {
compact = true;
} else if (_readBuffer->length() > MaximalPipelineSize) {
compact = true;
}
if (compact) {
_readBuffer->erase_front(_bodyPosition + _bodyLength);
_readPosition = _bodyPosition + _bodyLength;
if (_readPosition == _readBuffer->length()) {
_sinceCompactification = 0;
_readPosition = 0;
} else {
_readPosition = _bodyPosition + _bodyLength;
if (_readPosition == _readBuffer->length()) {
_sinceCompactification = 0;
_readPosition = 0;
_readBuffer->reset();
}
_readBuffer->reset();
}
_bodyPosition = 0;
_bodyLength = 0;
}
_bodyPosition = 0;
_bodyLength = 0;
_newRequest = true;
_readRequestBody = false;
}
GeneralResponse::ResponseCode HttpCommTask::authenticateRequest() {
auto context = (_request == nullptr) ? nullptr : _request->requestContext();
GeneralResponse::ResponseCode HttpCommTask::authenticateRequest(
HttpRequest* request) {
auto context = request->requestContext();
if (context == nullptr && _request != nullptr) {
if (context == nullptr) {
bool res =
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(_request);
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(request);
if (!res) {
return GeneralResponse::ResponseCode::NOT_FOUND;
}
context = _request->requestContext();
context = request->requestContext();
}
if (context == nullptr) {
@ -768,53 +761,3 @@ GeneralResponse::ResponseCode HttpCommTask::authenticateRequest() {
return context->authenticate();
}
void HttpCommTask::sendChunk(StringBuffer* buffer) {
if (_isChunked) {
TRI_ASSERT(buffer != nullptr);
_writeBuffers.push_back(buffer);
_writeBuffersStats.push_back(nullptr);
fillWriteBuffer();
} else {
delete buffer;
}
}
// convert internal GeneralRequest to HttpRequest
HttpRequest* HttpCommTask::requestAsHttp() {
HttpRequest* request = dynamic_cast<HttpRequest*>(_request);
if (request == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
return request;
};
void HttpCommTask::handleSimpleError(GeneralResponse::ResponseCode responseCode,
int errorNum,
std::string const& errorMessage,
uint64_t messageId) {
(void)messageId;
HttpResponse response(responseCode);
VPackBuilder builder;
builder.openObject();
builder.add(StaticStrings::Error, VPackValue(true));
builder.add(StaticStrings::ErrorNum, VPackValue(errorNum));
builder.add(StaticStrings::ErrorMessage, VPackValue(errorMessage));
builder.add(StaticStrings::Code, VPackValue((int)responseCode));
builder.close();
try {
response.setPayload(builder.slice(), true, VPackOptions::Defaults);
processResponse(&response);
} catch (...) {
addResponse(&response, true);
}
}
void HttpCommTask::clearRequest() {
if (_request) {
delete _request;
_request = nullptr;
}
}

View File

@ -15,15 +15,14 @@ class HttpCommTask : public GeneralCommTask {
public:
HttpCommTask(GeneralServer*, TRI_socket_t, ConnectionInfo&&, double timeout);
bool processRead() override;
// convert from GeneralResponse to httpResponse ad dispatch request to class
// internal addResponse
void addResponse(GeneralResponse* response, bool isError) override {
void addResponse(GeneralResponse* response) override {
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(response);
if (httpResponse != nullptr) {
addResponse(httpResponse, isError);
if (httpResponse == nullptr) {
throw std::logic_error("invalid response or response Type");
}
addResponse(httpResponse);
};
arangodb::Endpoint::TransportType transportType() override {
@ -31,45 +30,35 @@ class HttpCommTask : public GeneralCommTask {
};
protected:
~HttpCommTask() { clearRequest(); }
bool processRead() override;
void handleChunk(char const*, size_t) override final;
void completedWriteBuffer() override final;
// clears the request object, REVIEW/TODO(fc)
void clearRequest();
void httpClearRequest() override { clearRequest(); }
void httpNullRequest() override { _request = nullptr; }
std::unique_ptr<GeneralResponse> createResponse(
GeneralResponse::ResponseCode, uint64_t messageId) override final;
void handleSimpleError(GeneralResponse::ResponseCode code,
uint64_t id = 1) override final;
uint64_t messageId = 1) override final;
void handleSimpleError(GeneralResponse::ResponseCode, int code,
std::string const& errorMessage,
uint64_t messageId = 1) override final;
private:
void processRequest();
// resets the internal state this method can be called to clean up when the
// request handling aborts prematurely
void resetState(bool close);
void processRequest(std::unique_ptr<HttpRequest>);
void processCorsOptions(std::unique_ptr<HttpRequest>);
void addResponse(HttpResponse*, bool isError);
void resetState();
void addResponse(HttpResponse*);
HttpRequest* requestAsHttp();
void finishedChunked();
// check the content-length header of a request and fail it is broken
bool checkContentLength(bool expectContentLength);
void processCorsOptions(); // handles CORS options
bool checkContentLength(HttpRequest*, bool expectContentLength);
std::string authenticationRealm() const; // returns the authentication realm
GeneralResponse::ResponseCode
authenticateRequest(); // checks the authentication
void sendChunk(basics::StringBuffer*); // sends more chunked data
GeneralResponse::ResponseCode authenticateRequest(HttpRequest*);
private:
// the request with possible incomplete body
// REVIEW(fc)
GeneralRequest* _request = nullptr;
size_t _readPosition; // current read position
size_t _startPosition; // start position of current request
size_t _bodyPosition; // start of the body position
@ -80,6 +69,7 @@ class HttpCommTask : public GeneralCommTask {
// CORS)
bool _acceptDeflate; // whether the client accepts deflate algorithm
bool _newRequest; // new request started
// TODO(fc) remove
GeneralRequest::RequestType _requestType; // type of request (GET, POST, ...)
std::string _fullUrl; // value of requested URL
std::string _origin; // value of the HTTP origin header the client sent (if
@ -96,6 +86,8 @@ class HttpCommTask : public GeneralCommTask {
// true if request is complete but not handled
bool _requestPending = false;
std::unique_ptr<HttpRequest> _incompleteRequest;
};
}
}

View File

@ -89,7 +89,7 @@ void HttpServerJob::work() {
data->_taskId = _handler->taskId();
data->_loop = _handler->eventLoop();
data->_type = TaskData::TASK_DATA_RESPONSE;
data->_response.reset(_handler->stealResponse());
data->_response = _handler->stealResponse();
_handler->RequestStatisticsAgent::transferTo(data.get());

View File

@ -377,9 +377,6 @@ bool HttpsCommTask::trySSLWrite() {
}
if (len == 0) {
delete _writeBuffer;
_writeBuffer = nullptr;
completedWriteBuffer();
} else if (nr > 0) {
// nr might have been negative here

View File

@ -24,10 +24,10 @@
#include "PathHandler.h"
#include "Basics/FileUtils.h"
#include "Logger/Logger.h"
#include "Basics/mimetypes.h"
#include "Basics/StringBuffer.h"
#include "Basics/StringUtils.h"
#include "Basics/mimetypes.h"
#include "Logger/Logger.h"
#include "Rest/HttpRequest.h"
#include "Rest/HttpResponse.h"
@ -66,8 +66,8 @@ PathHandler::PathHandler(GeneralRequest* request, GeneralResponse* response,
// -----------------------------------------------------------------------------
RestHandler::status PathHandler::execute() {
// TODO needs to generalized
auto response = dynamic_cast<HttpResponse*>(_response);
// TODO needs to handle VPP
auto response = dynamic_cast<HttpResponse*>(_response.get());
if (response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);

View File

@ -55,9 +55,7 @@ class PathHandler : public RestHandler {
public:
static RestHandler* create(GeneralRequest* request, GeneralResponse* response,
void* data) {
Options* options = static_cast<Options*>(data);
return new PathHandler(request, response, options);
return new PathHandler(request, response, static_cast<Options*>(data));
}
public:

View File

@ -50,11 +50,6 @@ RestHandler::RestHandler(GeneralRequest* request, GeneralResponse* response)
}
}
RestHandler::~RestHandler() {
delete _request;
delete _response;
}
void RestHandler::setTaskId(uint64_t id, EventLoop loop) {
_taskId = id;
_loop = loop;
@ -122,18 +117,6 @@ RestHandler::status RestHandler::executeFull() {
return result;
}
GeneralRequest* RestHandler::stealRequest() {
GeneralRequest* tmp = _request;
_request = nullptr;
return tmp;
}
GeneralResponse* RestHandler::stealResponse() {
GeneralResponse* tmp = _response;
_response = nullptr;
return tmp;
}
void RestHandler::setResponseCode(GeneralResponse::ResponseCode code) {
TRI_ASSERT(_response != nullptr);
_response->reset(code);

View File

@ -36,6 +36,7 @@
namespace arangodb {
class GeneralRequest;
class WorkMonitor;
namespace rest {
class RestHandlerFactory;
@ -48,7 +49,7 @@ class RestHandler : public RequestStatisticsAgent, public arangodb::WorkItem {
RestHandler(GeneralRequest*, GeneralResponse*);
protected:
~RestHandler();
~RestHandler() = default;
public:
enum class status { DONE, FAILED, ASYNC };
@ -95,16 +96,18 @@ class RestHandler : public RequestStatisticsAgent, public arangodb::WorkItem {
status executeFull();
// return a pointer to the request
GeneralRequest const* request() const { return _request; }
GeneralRequest const* request() const { return _request.get(); }
// steal the pointer to the request
GeneralRequest* stealRequest();
std::unique_ptr<GeneralRequest> stealRequest() { return std::move(_request); }
// returns the response
GeneralResponse* response() const { return _response; }
GeneralResponse* response() const { return _response.get(); }
// steal the response
GeneralResponse* stealResponse();
std::unique_ptr<GeneralResponse> stealResponse() {
return std::move(_response);
}
protected:
// sets response Code
@ -120,12 +123,8 @@ class RestHandler : public RequestStatisticsAgent, public arangodb::WorkItem {
// event loop
EventLoop _loop;
// the request
GeneralRequest* _request;
// OBI-TODO make private
// the response
GeneralResponse* _response;
std::unique_ptr<GeneralRequest> _request;
std::unique_ptr<GeneralResponse> _response;
private:
bool _needsOwnThread = false;

View File

@ -83,8 +83,9 @@ bool RestHandlerFactory::setRequestContext(GeneralRequest* request) {
/// @brief creates a new handler
////////////////////////////////////////////////////////////////////////////////
RestHandler* RestHandlerFactory::createHandler(GeneralRequest* request,
GeneralResponse* response) {
RestHandler* RestHandlerFactory::createHandler(
std::unique_ptr<GeneralRequest> request,
std::unique_ptr<GeneralResponse> response) {
std::string const& path = request->requestPath();
// In the bootstrap phase, we would like that coordinators answer the
@ -96,7 +97,7 @@ RestHandler* RestHandlerFactory::createHandler(GeneralRequest* request,
path.find("/_api/agency/agency-callbacks") == std::string::npos &&
path.find("/_api/aql") == std::string::npos)) {
LOG(DEBUG) << "Maintenance mode: refused path: " << path;
return new MaintenanceHandler(request, response);
return new MaintenanceHandler(request.release(), response.release());
}
}
@ -176,7 +177,7 @@ RestHandler* RestHandlerFactory::createHandler(GeneralRequest* request,
// no match
if (i == ii.end()) {
if (_notFound != nullptr) {
return _notFound(request, response, nullptr);
return _notFound(request.release(), response.release(), nullptr);
}
LOG(TRACE) << "no not-found handler, giving up";
@ -184,7 +185,8 @@ RestHandler* RestHandlerFactory::createHandler(GeneralRequest* request,
}
LOG(TRACE) << "found handler for path '" << *modifiedPath << "'";
return i->second.first(request, response, i->second.second);
return i->second.first(request.release(), response.release(),
i->second.second);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -65,7 +65,8 @@ class RestHandlerFactory {
bool setRequestContext(GeneralRequest*);
// creates a new handler
RestHandler* createHandler(GeneralRequest*, GeneralResponse*);
RestHandler* createHandler(std::unique_ptr<GeneralRequest>,
std::unique_ptr<GeneralResponse>);
// adds a path and constructor to the factory
void addHandler(std::string const& path, create_fptr, void* data = nullptr);

View File

@ -23,17 +23,17 @@
#include "VppCommTask.h"
#include "Basics/StringBuffer.h"
#include "Basics/HybridLogicalClock.h"
#include "Basics/StringBuffer.h"
#include "Basics/VelocyPackHelper.h"
#include "Meta/conversion.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/RestHandler.h"
#include "GeneralServer/RestHandlerFactory.h"
#include "Logger/LoggerFeature.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Logger/LoggerFeature.h"
#include "VocBase/ticks.h"
#include <velocypack/Validator.h>
@ -167,13 +167,7 @@ VppCommTask::VppCommTask(GeneralServer* server, TRI_socket_t sock,
// connectionStatisticsAgentSetVpp();
}
void VppCommTask::addResponse(VppResponse* response, bool isError) {
if (isError) {
// FIXME (obi)
// what do we need to do?
// clean read buffer? reset process read cursor
}
void VppCommTask::addResponse(VppResponse* response) {
VPackMessageNoOwnBuffer response_message = response->prepareForNetwork();
uint64_t& id = response_message._id;
@ -203,11 +197,8 @@ void VppCommTask::addResponse(VppResponse* response, bool isError) {
// adds chunk header infromation and creates SingBuffer* that can be
// used with _writeBuffers
auto buffer = createChunkForNetworkSingle(slices, id);
_writeBuffers.push_back(buffer.get());
buffer.release();
fillWriteBuffer(); // move data from _writebuffers to _writebuffer
// implemented in base
addWriteBuffer(std::move(buffer));
}
VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
@ -375,10 +366,9 @@ bool VppCommTask::processRead() {
} else {
// check auth
// the handler will take ownersip of this pointer
VppRequest* request = new VppRequest(_connectionInfo, std::move(message),
chunkHeader._messageID);
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(request);
std::unique_ptr<VppRequest> request(new VppRequest(
_connectionInfo, std::move(message), chunkHeader._messageID));
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(request.get());
// make sure we have a dabase
if (request->requestContext() == nullptr) {
handleSimpleError(GeneralResponse::ResponseCode::NOT_FOUND,
@ -386,11 +376,12 @@ bool VppCommTask::processRead() {
TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND),
chunkHeader._messageID);
} else {
request->setClientTaskId(_taskId);
_protocolVersion = request->protocolVersion();
executeRequest(request, new VppResponse(
GeneralResponse::ResponseCode::SERVER_ERROR,
chunkHeader._messageID));
request->setClientTaskId(_taskId);
_protocolVersion = request->protocolVersion();
std::unique_ptr<VppResponse> response(new VppResponse(
GeneralResponse::ResponseCode::SERVER_ERROR, chunkHeader._messageID));
executeRequest(std::move(request), std::move(response));
}
}
}
@ -404,10 +395,6 @@ bool VppCommTask::processRead() {
return doExecute;
}
void VppCommTask::completedWriteBuffer() {
// REVIEW (fc)
}
void VppCommTask::resetState(bool close, GeneralResponse::ResponseCode code) {
// REVIEW (fc)
_processReadVariables._readBufferCursor = nullptr;
@ -422,27 +409,33 @@ void VppCommTask::resetState(bool close, GeneralResponse::ResponseCode code) {
_closeRequested = close;
}
// GeneralResponse::ResponseCode VppCommTask::authenticateRequest() {
// auto context = (_request == nullptr) ? nullptr :
// _request->requestContext();
//
// if (context == nullptr && _request != nullptr) {
// bool res =
// GeneralServerFeature::HANDLER_FACTORY->setRequestContext(_request);
//
// if (!res) {
// return GeneralResponse::ResponseCode::NOT_FOUND;
// }
//
// context = _request->requestContext();
// }
//
// if (context == nullptr) {
// return GeneralResponse::ResponseCode::SERVER_ERROR;
// }
//
// return context->authenticate();
// }
GeneralResponse::ResponseCode VppCommTask::authenticateRequest(GeneralRequest* request) {
auto context = (request == nullptr) ? nullptr :
request->requestContext();
if (context == nullptr && request != nullptr) {
bool res =
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(request);
if (!res) {
return GeneralResponse::ResponseCode::NOT_FOUND;
}
context = request->requestContext();
}
if (context == nullptr) {
return GeneralResponse::ResponseCode::SERVER_ERROR;
}
return context->authenticate();
}
std::unique_ptr<GeneralResponse> VppCommTask::createResponse(
GeneralResponse::ResponseCode responseCode, uint64_t messageId) {
return std::unique_ptr<GeneralResponse>(
new VppResponse(responseCode, messageId));
}
void VppCommTask::handleSimpleError(GeneralResponse::ResponseCode responseCode,
int errorNum,
@ -462,6 +455,6 @@ void VppCommTask::handleSimpleError(GeneralResponse::ResponseCode responseCode,
response.setPayload(builder.slice(), true, VPackOptions::Defaults);
processResponse(&response);
} catch (...) {
addResponse(&response, true);
_clientClosed = true;
}
}

View File

@ -28,8 +28,8 @@
#include "GeneralServer/GeneralCommTask.h"
#include "lib/Rest/VppMessage.h"
#include "lib/Rest/VppResponse.h"
#include "lib/Rest/VppRequest.h"
#include "lib/Rest/VppResponse.h"
#include <stdexcept>
@ -40,18 +40,14 @@ class VppCommTask : public GeneralCommTask {
public:
VppCommTask(GeneralServer*, TRI_socket_t, ConnectionInfo&&, double timeout);
// read data check if chunk and message are complete
// if message is complete execute a request
bool processRead() override;
// convert from GeneralResponse to vppResponse ad dispatch request to class
// internal addResponse
void addResponse(GeneralResponse* response, bool isError) override {
void addResponse(GeneralResponse* response) override {
VppResponse* vppResponse = dynamic_cast<VppResponse*>(response);
if (vppResponse == nullptr) {
throw std::logic_error("invalid response or response Type");
}
addResponse(vppResponse, isError);
addResponse(vppResponse);
};
arangodb::Endpoint::TransportType transportType() override {
@ -59,13 +55,19 @@ class VppCommTask : public GeneralCommTask {
};
protected:
void completedWriteBuffer() override final;
virtual void handleChunk(char const*, size_t) {}
// read data check if chunk and message are complete
// if message is complete execute a request
bool processRead() override;
void handleChunk(char const*, size_t) override final {}
std::unique_ptr<GeneralResponse> createResponse(
GeneralResponse::ResponseCode, uint64_t messageId) override final;
void handleSimpleError(GeneralResponse::ResponseCode code,
uint64_t id) override {
VppResponse response(code, id);
addResponse(&response, true);
addResponse(&response);
}
void handleSimpleError(GeneralResponse::ResponseCode, int code,
std::string const& errorMessage,
@ -77,7 +79,8 @@ class VppCommTask : public GeneralCommTask {
void resetState(bool close, GeneralResponse::ResponseCode code =
GeneralResponse::ResponseCode::SERVER_ERROR);
void addResponse(VppResponse*, bool isError);
void addResponse(VppResponse*);
GeneralResponse::ResponseCode authenticateRequest(GeneralRequest* request);
private:
using MessageID = uint64_t;

View File

@ -48,13 +48,16 @@ RestBatchHandler::~RestBatchHandler() {}
////////////////////////////////////////////////////////////////////////////////
RestHandler::status RestBatchHandler::execute() {
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response);
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (httpResponse == nullptr) {
std::cout << "please fix this for vpack" << std::endl;
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
HttpRequest const* httpRequest = dynamic_cast<HttpRequest const*>(_request);
HttpRequest const* httpRequest =
dynamic_cast<HttpRequest const*>(_request.get());
if (httpRequest == nullptr) {
std::cout << "please fix this for vpack" << std::endl;
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
@ -148,8 +151,9 @@ RestHandler::status RestBatchHandler::execute() {
// set up request object for the part
LOG(TRACE) << "part header is: " << std::string(headerStart, headerLength);
HttpRequest* request = new HttpRequest(_request->connectionInfo(),
headerStart, headerLength, false);
std::unique_ptr<HttpRequest> request(new HttpRequest(
_request->connectionInfo(), headerStart, headerLength, false));
// we do not have a client task id here
request->setClientTaskId(0);
@ -177,25 +181,22 @@ RestHandler::status RestBatchHandler::execute() {
{
std::unique_ptr<HttpResponse> response(
new HttpResponse(GeneralResponse::ResponseCode::SERVER_ERROR));
handler = GeneralServerFeature::HANDLER_FACTORY->createHandler(
request, response.get());
std::move(request), std::move(response));
if (handler == nullptr) {
delete request;
generateError(GeneralResponse::ResponseCode::BAD, TRI_ERROR_INTERNAL,
"could not create handler for batch part processing");
return status::FAILED;
}
response.release();
}
// start to work for this handler
{
HandlerWorkStack work(handler);
RestHandler::status result = handler->executeFull();
RestHandler::status result = work.handler()->executeFull();
if (result == status::FAILED) {
generateError(GeneralResponse::ResponseCode::BAD, TRI_ERROR_INTERNAL,
@ -268,7 +269,8 @@ RestHandler::status RestBatchHandler::execute() {
////////////////////////////////////////////////////////////////////////////////
bool RestBatchHandler::getBoundaryBody(std::string* result) {
HttpRequest const* req = dynamic_cast<HttpRequest const*>(_request);
HttpRequest const* req = dynamic_cast<HttpRequest const*>(_request.get());
if (req == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}

View File

@ -139,7 +139,7 @@ void RestCursorHandler::processQuery(VPackSlice const& slice) {
setResponseCode(GeneralResponse::ResponseCode::CREATED);
// TODO needs to generalized
auto* response = dynamic_cast<HttpResponse*>(_response);
auto* response = dynamic_cast<HttpResponse*>(_response.get());
if (response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
@ -202,7 +202,7 @@ void RestCursorHandler::processQuery(VPackSlice const& slice) {
}
// TODO generalize
auto* httpResponse = dynamic_cast<HttpResponse*>(_response);
auto* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
@ -479,7 +479,7 @@ void RestCursorHandler::modifyCursor() {
setResponseCode(GeneralResponse::ResponseCode::OK);
// TODO needs to generalized
auto* response = dynamic_cast<HttpResponse*>(_response);
auto* response = dynamic_cast<HttpResponse*>(_response.get());
if (response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);

View File

@ -265,7 +265,7 @@ void RestExportHandler::createCursor() {
setResponseCode(GeneralResponse::ResponseCode::CREATED);
// TODO needs to generalized
auto* response = dynamic_cast<HttpResponse*>(_response);
auto* response = dynamic_cast<HttpResponse*>(_response.get());
if (response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
@ -340,7 +340,7 @@ void RestExportHandler::modifyCursor() {
setResponseCode(GeneralResponse::ResponseCode::OK);
// TODO this needs to be generalized
auto* response = dynamic_cast<HttpResponse*>(_response);
auto* response = dynamic_cast<HttpResponse*>(_response.get());
if (response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);

View File

@ -309,10 +309,12 @@ bool RestImportHandler::createFromJson(std::string const &type) {
// auto detect import type by peeking at first non-whitespace character
// http required here
HttpRequest *req = dynamic_cast<HttpRequest *>(_request);
HttpRequest *req = dynamic_cast<HttpRequest *>(_request.get());
if (req == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
std::string const &body = req->body();
char const *ptr = body.c_str();
@ -367,7 +369,8 @@ bool RestImportHandler::createFromJson(std::string const &type) {
if (linewise) {
// http required here
HttpRequest *req = dynamic_cast<HttpRequest *>(_request);
HttpRequest *req = dynamic_cast<HttpRequest *>(_request.get());
if (req == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -453,10 +456,12 @@ bool RestImportHandler::createFromJson(std::string const &type) {
else {
// the entire request body is one JSON document
std::shared_ptr<VPackBuilder> parsedDocuments;
HttpRequest *req = dynamic_cast<HttpRequest *>(_request);
HttpRequest *req = dynamic_cast<HttpRequest *>(_request.get());
if (req == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
try {
parsedDocuments = VPackParser::fromJson(req->body());
} catch (VPackException const &) {
@ -558,10 +563,12 @@ bool RestImportHandler::createFromKeyValueList() {
lineNumber = StringUtils::int64(lineNumValue);
}
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request);
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request.get());
if(httpRequest == nullptr){
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
std::string const &bodyStr = httpRequest->body();
char const *current = bodyStr.c_str();
char const *bodyEnd = current + bodyStr.size();

View File

@ -101,13 +101,8 @@ void RestJobHandler::putJob() {
TRI_ASSERT(status == AsyncJobResult::JOB_DONE);
TRI_ASSERT(response != nullptr);
// delete our own response
if (_response != nullptr) {
delete _response;
}
// return the original response
_response = response;
_response.reset(response);
// plus a new header
static std::string const xArango = "x-arango-async-id";

View File

@ -37,7 +37,7 @@ bool RestPleaseUpgradeHandler::isDirect() const { return true; }
RestHandler::status RestPleaseUpgradeHandler::execute() {
// TODO needs to generalized
auto response = dynamic_cast<HttpResponse*>(_response);
auto response = dynamic_cast<HttpResponse*>(_response.get());
if (response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);

View File

@ -439,7 +439,8 @@ void RestReplicationHandler::handleCommandLoggerState() {
// "server" part
builder.add("server", VPackValue(VPackValueType::Object));
builder.add("version", VPackValue(ARANGODB_VERSION));
builder.add("serverId", VPackValue(std::to_string(ServerIdFeature::getId())));
builder.add("serverId",
VPackValue(std::to_string(ServerIdFeature::getId())));
builder.close();
// "clients" part
@ -788,9 +789,10 @@ void RestReplicationHandler::handleTrampolineCoordinator() {
std::string const& dbname = _request->databaseName();
auto headers = std::make_shared<std::unordered_map<std::string, std::string>>(
arangodb::getForwardableRequestHeaders(_request));
arangodb::getForwardableRequestHeaders(_request.get()));
std::unordered_map<std::string, std::string> values = _request->values();
std::string params;
for (auto const& i : values) {
if (i.first != "DBserver") {
if (params.empty()) {
@ -807,7 +809,8 @@ void RestReplicationHandler::handleTrampolineCoordinator() {
// Set a few variables needed for our work:
ClusterComm* cc = ClusterComm::instance();
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request);
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request.get());
if (httpRequest == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -844,7 +847,8 @@ void RestReplicationHandler::handleTrampolineCoordinator() {
setResponseCode(static_cast<GeneralResponse::ResponseCode>(
res->result->getHttpReturnCode()));
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response);
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (_response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -997,7 +1001,8 @@ void RestReplicationHandler::handleCommandLoggerFollow() {
setResponseCode(GeneralResponse::ResponseCode::OK);
}
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response);
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -1098,7 +1103,7 @@ void RestReplicationHandler::handleCommandDetermineOpenTransactions() {
setResponseCode(GeneralResponse::ResponseCode::OK);
}
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response);
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (_response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -1149,9 +1154,9 @@ void RestReplicationHandler::handleCommandInventory() {
// collections and indexes
std::shared_ptr<VPackBuilder> collectionsBuilder;
try {
collectionsBuilder = _vocbase->inventory(
tick, &filterCollection, (void*)&includeSystem, true,
RestReplicationHandler::sortCollections);
collectionsBuilder =
_vocbase->inventory(tick, &filterCollection, (void*)&includeSystem,
true, RestReplicationHandler::sortCollections);
VPackSlice const collections = collectionsBuilder->slice();
TRI_ASSERT(collections.isArray());
@ -1318,9 +1323,13 @@ int RestReplicationHandler::createCollection(VPackSlice const& slice,
TRI_ASSERT(params.doCompact() ==
arangodb::basics::VelocyPackHelper::getBooleanValue(
slice, "doCompact", true));
TRI_ASSERT(params.waitForSync() ==
arangodb::basics::VelocyPackHelper::getBooleanValue(
slice, "waitForSync", application_features::ApplicationServer::getFeature<DatabaseFeature>("Database")->waitForSync()));
TRI_ASSERT(
params.waitForSync() ==
arangodb::basics::VelocyPackHelper::getBooleanValue(
slice, "waitForSync",
application_features::ApplicationServer::getFeature<DatabaseFeature>(
"Database")
->waitForSync()));
TRI_ASSERT(params.isVolatile() ==
arangodb::basics::VelocyPackHelper::getBooleanValue(
slice, "isVolatile", false));
@ -2175,7 +2184,7 @@ int RestReplicationHandler::processRestoreDataBatch(
VPackBuilder builder;
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request);
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request.get());
if (httpRequest == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -2519,7 +2528,7 @@ void RestReplicationHandler::handleCommandRestoreDataCoordinator() {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request);
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request.get());
if (httpRequest == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -3204,7 +3213,7 @@ void RestReplicationHandler::handleCommandDump() {
}
// TODO needs to generalized
auto response = dynamic_cast<HttpResponse*>(_response);
auto response = dynamic_cast<HttpResponse*>(_response.get());
if (response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
@ -3394,7 +3403,8 @@ void RestReplicationHandler::handleCommandMakeSlave() {
return;
}
res = TRI_ConfigureReplicationApplier(_vocbase->replicationApplier(), &config);
res =
TRI_ConfigureReplicationApplier(_vocbase->replicationApplier(), &config);
if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::responseCode(res), res);

View File

@ -283,7 +283,7 @@ void RestSimpleHandler::removeByKeys(VPackSlice const& slice) {
void RestSimpleHandler::lookupByKeys(VPackSlice const& slice) {
// TODO needs to generalized
auto response = dynamic_cast<HttpResponse*>(_response);
auto response = dynamic_cast<HttpResponse*>(_response.get());
if (response == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);

View File

@ -43,7 +43,8 @@ RestUploadHandler::~RestUploadHandler() {}
RestHandler::status RestUploadHandler::execute() {
// cast is ok because http requst is required
HttpRequest* request = dynamic_cast<HttpRequest*>(_request);
HttpRequest* request = dynamic_cast<HttpRequest*>(_request.get());
if (request == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -135,7 +136,8 @@ RestHandler::status RestUploadHandler::execute() {
bool RestUploadHandler::parseMultiPart(char const*& body, size_t& length) {
// cast is ok because http requst is required
HttpRequest* request = dynamic_cast<HttpRequest*>(_request);
HttpRequest* request = dynamic_cast<HttpRequest*>(_request.get());
if (request == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}

View File

@ -54,7 +54,7 @@ RestHandler::status WorkMonitorHandler::execute() {
return status::DONE;
}
WorkMonitor::requestWorkOverview(_taskId);
WorkMonitor::requestWorkOverview(this);
return status::ASYNC;
}

View File

@ -33,6 +33,6 @@ WorkMonitorFeature::WorkMonitorFeature(
requiresElevatedPrivileges(false);
}
void WorkMonitorFeature::start() { InitializeWorkMonitor(); }
void WorkMonitorFeature::start() { WorkMonitor::initialize(); }
void WorkMonitorFeature::unprepare() { ShutdownWorkMonitor(); }
void WorkMonitorFeature::unprepare() { WorkMonitor::shutdown(); }

View File

@ -41,7 +41,6 @@
#include "ApplicationFeatures/TempFeature.h"
#include "ApplicationFeatures/V8PlatformFeature.h"
#include "ApplicationFeatures/VersionFeature.h"
#include "ApplicationFeatures/WorkMonitorFeature.h"
#include "Basics/ArangoGlobalContext.h"
#include "Cluster/ClusterFeature.h"
#include "Dispatcher/DispatcherFeature.h"
@ -67,6 +66,7 @@
#include "RestServer/ServerIdFeature.h"
#include "RestServer/UnitTestsFeature.h"
#include "RestServer/UpgradeFeature.h"
#include "RestServer/WorkMonitorFeature.h"
#include "Scheduler/SchedulerFeature.h"
#include "Ssl/SslFeature.h"
#include "Ssl/SslServerFeature.h"

View File

@ -24,45 +24,32 @@
#include "SocketTask.h"
#include <errno.h>
#include "Basics/StringBuffer.h"
#include "Basics/socket-utils.h"
#include "Logger/Logger.h"
#include "Scheduler/Scheduler.h"
#include <errno.h>
using namespace arangodb::basics;
using namespace arangodb::rest;
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new task with a given socket
////////////////////////////////////////////////////////////////////////////////
SocketTask::SocketTask(TRI_socket_t socket, double keepAliveTimeout)
SocketTask::SocketTask(TRI_socket_t socket, ConnectionInfo&& info,
double keepAliveTimeout)
: Task("SocketTask"),
_keepAliveWatcher(nullptr),
_readWatcher(nullptr),
_writeWatcher(nullptr),
_commSocket(socket),
_keepAliveTimeout(keepAliveTimeout),
_writeBuffer(nullptr),
_writeBufferStatistics(nullptr),
_writeLength(0),
_readBuffer(nullptr),
_clientClosed(false),
_tid(0) {
_commSocket(socket),
_connectionInfo(std::move(info)) {
LOG(TRACE) << "connection established, client "
<< TRI_get_fd_or_handle_of_socket(socket) << ", server ip "
<< _connectionInfo.serverAddress << ", server port "
<< _connectionInfo.serverPort << ", client ip "
<< _connectionInfo.clientAddress << ", client port "
<< _connectionInfo.clientPort;
_readBuffer = new StringBuffer(TRI_UNKNOWN_MEM_ZONE, false);
ConnectionStatisticsAgent::acquire();
connectionStatisticsAgentSetStart();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a socket task
///
/// This method will close the underlying socket.
////////////////////////////////////////////////////////////////////////////////
SocketTask::~SocketTask() {
if (TRI_isvalidsocket(_commSocket)) {
TRI_CLOSE_SOCKET(_commSocket);
@ -75,26 +62,62 @@ SocketTask::~SocketTask() {
TRI_ReleaseRequestStatistics(_writeBufferStatistics);
}
for (auto& i : _writeBuffers) {
delete i;
}
for (auto& i : _writeBuffersStats) {
TRI_ReleaseRequestStatistics(i);
}
delete _readBuffer;
connectionStatisticsAgentSetEnd();
ConnectionStatisticsAgent::release();
LOG(TRACE) << "connection closed, client "
<< TRI_get_fd_or_handle_of_socket(_commSocket);
}
void SocketTask::setKeepAliveTimeout(double timeout) {
if (_keepAliveWatcher != nullptr && timeout > 0.0) {
_scheduler->rearmTimer(_keepAliveWatcher, timeout);
void SocketTask::armKeepAliveTimeout() {
if (_keepAliveWatcher != nullptr && _keepAliveTimeout > 0.0) {
_scheduler->rearmTimer(_keepAliveWatcher, _keepAliveTimeout);
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief fills the read buffer
////////////////////////////////////////////////////////////////////////////////
bool SocketTask::handleRead() {
bool res = true;
if (_closed) {
return false;
}
if (!_closeRequested) {
res = fillReadBuffer();
// process as much data as we got; there might be more than one
// request in the buffer
while (processRead()) {
if (_closeRequested) {
break;
}
}
} else {
// if we don't close here, the scheduler thread may fall into a
// busy wait state, consuming 100% CPU!
_clientClosed = true;
}
if (_clientClosed) {
res = false;
} else if (!res) {
_clientClosed = true;
}
return res;
}
bool SocketTask::fillReadBuffer() {
// reserve some memory for reading
if (_readBuffer->reserve(READ_BLOCK_SIZE + 1) == TRI_ERROR_OUT_OF_MEMORY) {
_clientClosed = true;
LOG(TRACE) << "out of memory";
return false;
}
@ -110,7 +133,6 @@ bool SocketTask::fillReadBuffer() {
if (nr == 0) {
LOG(TRACE) << "read returned 0";
_clientClosed = true;
return false;
}
@ -129,7 +151,7 @@ bool SocketTask::fillReadBuffer() {
if (myerrno != EWOULDBLOCK && (EWOULDBLOCK == EAGAIN || myerrno != EAGAIN)) {
LOG(DEBUG) << "read from socket failed with " << myerrno << ": "
<< strerror(myerrno);
_clientClosed = true;
return false;
}
@ -149,10 +171,6 @@ bool SocketTask::fillReadBuffer() {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handles a write
////////////////////////////////////////////////////////////////////////////////
bool SocketTask::handleWrite() {
size_t len = 0;
@ -194,13 +212,10 @@ bool SocketTask::handleWrite() {
}
if (len == 0) {
delete _writeBuffer;
_writeBuffer = nullptr;
completedWriteBuffer();
// rearm timer for keep-alive timeout
setKeepAliveTimeout(_keepAliveTimeout);
armKeepAliveTimeout();
} else {
_writeLength += nr;
}
@ -219,52 +234,66 @@ bool SocketTask::handleWrite() {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets an active write buffer
////////////////////////////////////////////////////////////////////////////////
void SocketTask::addWriteBuffer(std::unique_ptr<basics::StringBuffer> buffer,
RequestStatisticsAgent* statistics) {
TRI_request_statistics_t* stat =
statistics == nullptr ? nullptr : statistics->steal();
void SocketTask::setWriteBuffer(StringBuffer* buffer,
TRI_request_statistics_t* statistics) {
TRI_ASSERT(buffer != nullptr);
addWriteBuffer(buffer.release(), stat);
}
_writeBufferStatistics = statistics;
if (_writeBufferStatistics != nullptr) {
_writeBufferStatistics->_writeStart = TRI_StatisticsTime();
_writeBufferStatistics->_sentBytes += buffer->length();
}
_writeLength = 0;
if (buffer->empty()) {
void SocketTask::addWriteBuffer(basics::StringBuffer* buffer,
TRI_request_statistics_t* stat) {
if (_closed) {
delete buffer;
completedWriteBuffer();
} else {
delete _writeBuffer;
if (stat) {
TRI_ReleaseRequestStatistics(stat);
}
_writeBuffer = buffer;
}
if (_clientClosed) {
return;
}
// we might have a new write buffer or none at all
TRI_ASSERT(_tid == Thread::currentThreadId());
if (_writeBuffer == nullptr) {
_scheduler->stopSocketEvents(_writeWatcher);
} else {
_scheduler->startSocketEvents(_writeWatcher);
if (_writeBuffer != nullptr) {
_writeBuffers.push_back(buffer);
_writeBuffersStats.push_back(stat);
return;
}
_writeBuffer = buffer;
_writeBufferStatistics = stat;
_scheduler->startSocketEvents(_writeWatcher);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks for presence of an active write buffer
////////////////////////////////////////////////////////////////////////////////
void SocketTask::completedWriteBuffer() {
delete _writeBuffer;
bool SocketTask::hasWriteBuffer() const { return _writeBuffer != nullptr; }
_writeBuffer = nullptr;
_writeLength = 0;
if (_writeBufferStatistics != nullptr) {
_writeBufferStatistics->_writeEnd = TRI_StatisticsTime();
TRI_ReleaseRequestStatistics(_writeBufferStatistics);
_writeBufferStatistics = nullptr;
}
if (_writeBuffers.empty()) {
if (_closeRequested) {
_clientClosed = true;
}
return;
}
StringBuffer* buffer = _writeBuffers.front();
_writeBuffers.pop_front();
TRI_request_statistics_t* statistics = _writeBuffersStats.front();
_writeBuffersStats.pop_front();
addWriteBuffer(buffer, statistics);
}
bool SocketTask::setup(Scheduler* scheduler, EventLoop loop) {
#ifdef _WIN32
@ -327,10 +356,6 @@ bool SocketTask::setup(Scheduler* scheduler, EventLoop loop) {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief cleans up the task by unregistering all watchers
////////////////////////////////////////////////////////////////////////////////
void SocketTask::cleanup() {
if (_scheduler != nullptr) {
if (_keepAliveWatcher != nullptr) {
@ -349,6 +374,10 @@ void SocketTask::cleanup() {
_keepAliveWatcher = nullptr;
_readWatcher = nullptr;
_writeWatcher = nullptr;
_closed = true;
_clientClosed = false;
_closeRequested = false;
}
bool SocketTask::handleEvent(EventToken token, EventType revents) {
@ -359,9 +388,9 @@ bool SocketTask::handleEvent(EventToken token, EventType revents) {
LOG(TRACE) << "got keep-alive timeout signal, closing connection";
_scheduler->clearTimer(token);
// this will close the connection and destroy the task
handleTimeout();
_scheduler->destroyTask(this);
return false;
}
@ -388,5 +417,9 @@ bool SocketTask::handleEvent(EventToken token, EventType revents) {
}
}
if (_clientClosed) {
_scheduler->destroyTask(this);
}
return result;
}

View File

@ -25,17 +25,11 @@
#ifndef ARANGOD_SCHEDULER_SOCKET_TASK_H
#define ARANGOD_SCHEDULER_SOCKET_TASK_H 1
#include "Basics/Common.h"
#include "Scheduler/Task.h"
#include "Basics/Thread.h"
#include "Statistics/StatisticsAgent.h"
#ifdef _WIN32
#include "Basics/win-utils.h"
#endif
#include "Basics/socket-utils.h"
namespace arangodb {
@ -45,116 +39,85 @@ class StringBuffer;
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @brief base class for input-output tasks from sockets
////////////////////////////////////////////////////////////////////////////////
class SocketTask : virtual public Task, public ConnectionStatisticsAgent {
private:
explicit SocketTask(SocketTask const&);
SocketTask& operator=(SocketTask const&);
explicit SocketTask(SocketTask const&) = delete;
SocketTask& operator=(SocketTask const&) = delete;
private:
static size_t const READ_BLOCK_SIZE = 10000;
public:
// @brief constructs a new task with a given socket
SocketTask(TRI_socket_t, double);
SocketTask(TRI_socket_t, ConnectionInfo&&, double);
protected:
// This method will close the underlying socket.
~SocketTask();
public:
// set a request timeout
void setKeepAliveTimeout(double);
void armKeepAliveTimeout();
protected:
//////////////////////////////////////////////////////////////////////////////
/// @brief fills the read buffer
///
/// The function should be called by the input task if the scheduler has
/// indicated that new data is available. It will return true, if data could
/// be read and false if the connection has been closed.
//////////////////////////////////////////////////////////////////////////////
virtual bool fillReadBuffer();
virtual bool handleRead() = 0; // called by handleEvent
virtual bool handleRead(); // called by handleEvent
virtual bool handleWrite(); // called by handleEvent
//////////////////////////////////////////////////////////////////////////////
/// @brief called if write buffer has been sent
///
/// This called is called if the current write buffer has been sent
/// completly to the client.
//////////////////////////////////////////////////////////////////////////////
virtual void completedWriteBuffer() = 0;
// handles a keep-alive timeout
virtual void handleTimeout() = 0;
// is called in a loop as long as it returns true.
// Return false if there is not enough data to do
// any more processing and all available data has
// been evaluated.
virtual bool processRead() = 0;
protected:
//////////////////////////////////////////////////////////////////////////////
/// @brief sets an active write buffer
//////////////////////////////////////////////////////////////////////////////
void addWriteBuffer(std::unique_ptr<basics::StringBuffer> buffer) {
addWriteBuffer(std::move(buffer), (RequestStatisticsAgent*)nullptr);
}
void setWriteBuffer(basics::StringBuffer*, TRI_request_statistics_t*);
void addWriteBuffer(std::unique_ptr<basics::StringBuffer>,
RequestStatisticsAgent*);
//////////////////////////////////////////////////////////////////////////////
/// @brief checks for presence of an active write buffer
//////////////////////////////////////////////////////////////////////////////
void addWriteBuffer(basics::StringBuffer*, TRI_request_statistics_t*);
bool hasWriteBuffer() const;
void completedWriteBuffer();
protected:
bool setup(Scheduler*, EventLoop) override;
/// @brief cleans up the task by unregistering all watchers
void cleanup() override;
// calls handleRead and handleWrite
bool handleEvent(EventToken token, EventType) override;
protected:
/// @brief event for keep-alive timeout
EventToken _keepAliveWatcher;
double const _keepAliveTimeout;
/// @brief event for read
EventToken _readWatcher;
protected:
// connection closed
bool _closed = false;
/// @brief event for write
EventToken _writeWatcher;
// client has closed the connection, immediately close the underlying socket
bool _clientClosed = false;
/// @brief communication socket
// the client has requested, close the connection after all data is written
bool _closeRequested = false;
protected:
TRI_socket_t _commSocket;
ConnectionInfo _connectionInfo;
/// @brief keep-alive timeout in seconds
double _keepAliveTimeout;
basics::StringBuffer* _readBuffer = nullptr;
/// @brief the current write buffer
basics::StringBuffer* _writeBuffer;
basics::StringBuffer* _writeBuffer = nullptr;
TRI_request_statistics_t* _writeBufferStatistics = nullptr;
/// @brief the current write buffer statistics
TRI_request_statistics_t* _writeBufferStatistics;
std::deque<basics::StringBuffer*> _writeBuffers;
std::deque<TRI_request_statistics_t*> _writeBuffersStats;
/// @brief number of bytes already written
size_t _writeLength;
EventToken _keepAliveWatcher = nullptr;
EventToken _readWatcher = nullptr;
EventToken _writeWatcher = nullptr;
/// @brief read buffer
/// The function fillReadBuffer stores the data in this buffer.
basics::StringBuffer* _readBuffer;
size_t _writeLength = 0;
/// @brief client has closed the connection
bool _clientClosed;
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief current thread identifier
//////////////////////////////////////////////////////////////////////////////
TRI_tid_t _tid;
TRI_tid_t _tid = 0;
};
}
}

View File

@ -32,10 +32,6 @@
#include "Statistics/StatisticsAgent.h"
namespace arangodb {
namespace velocypack {
class Builder;
}
namespace rest {
class Scheduler;
@ -43,7 +39,6 @@ class TaskData : public RequestStatisticsAgent {
public:
static uint64_t const TASK_DATA_RESPONSE = 1000;
static uint64_t const TASK_DATA_CHUNK = 1001;
static uint64_t const TASK_DATA_BUFFER = 1002;
public:
uint64_t _taskId;
@ -51,7 +46,6 @@ class TaskData : public RequestStatisticsAgent {
uint64_t _type;
std::string _data;
std::unique_ptr<GeneralResponse> _response;
std::shared_ptr<velocypack::Buffer<uint8_t>> _buffer;
};
////////////////////////////////////////////////////////////////////////////////

View File

@ -31,27 +31,14 @@
namespace arangodb {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @brief request statistics agent
////////////////////////////////////////////////////////////////////////////////
template <typename STAT, typename FUNC>
class StatisticsAgent {
StatisticsAgent(StatisticsAgent const&) = delete;
StatisticsAgent& operator=(StatisticsAgent const&) = delete;
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new agent
//////////////////////////////////////////////////////////////////////////////
StatisticsAgent() : _statistics(nullptr), _lastReadStart(0.0) {}
//////////////////////////////////////////////////////////////////////////////
/// @brief destructs an agent
//////////////////////////////////////////////////////////////////////////////
~StatisticsAgent() {
if (_statistics != nullptr) {
FUNC::release(_statistics);
@ -59,10 +46,6 @@ class StatisticsAgent {
}
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief acquires a new statistics block
//////////////////////////////////////////////////////////////////////////////
STAT* acquire() {
if (_statistics != nullptr) {
return _statistics;
@ -72,10 +55,6 @@ class StatisticsAgent {
return _statistics = FUNC::acquire();
}
//////////////////////////////////////////////////////////////////////////////
/// @brief releases a statistics block
//////////////////////////////////////////////////////////////////////////////
void release() {
if (_statistics != nullptr) {
FUNC::release(_statistics);
@ -83,19 +62,11 @@ class StatisticsAgent {
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief transfers statistics information to another agent
//////////////////////////////////////////////////////////////////////////////
void transferTo(StatisticsAgent* agent) {
agent->replace(_statistics);
_statistics = nullptr;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief transfers statistics information
//////////////////////////////////////////////////////////////////////////////
STAT* steal() {
STAT* statistics = _statistics;
_statistics = nullptr;
@ -103,10 +74,6 @@ class StatisticsAgent {
return statistics;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief returns the time elapsed since read started
//////////////////////////////////////////////////////////////////////////////
double elapsedSinceReadStart() {
if (_lastReadStart != 0.0) {
return TRI_StatisticsTime() - _lastReadStart;
@ -116,23 +83,10 @@ class StatisticsAgent {
}
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief statistics
//////////////////////////////////////////////////////////////////////////////
STAT* _statistics;
//////////////////////////////////////////////////////////////////////////////
/// @brief last read
//////////////////////////////////////////////////////////////////////////////
double _lastReadStart;
protected:
//////////////////////////////////////////////////////////////////////////////
/// @brief replaces a statistics block
//////////////////////////////////////////////////////////////////////////////
void replace(STAT* statistics) {
if (_statistics != nullptr) {
FUNC::release(_statistics);
@ -141,15 +95,6 @@ class StatisticsAgent {
_statistics = statistics;
}
};
}
}
namespace arangodb {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @brief request statistics description
////////////////////////////////////////////////////////////////////////////////
struct RequestStatisticsAgentDesc {
static TRI_request_statistics_t* acquire() {
@ -161,18 +106,10 @@ struct RequestStatisticsAgentDesc {
}
};
////////////////////////////////////////////////////////////////////////////////
/// @brief request statistics agent
////////////////////////////////////////////////////////////////////////////////
class RequestStatisticsAgent
: public StatisticsAgent<TRI_request_statistics_t,
RequestStatisticsAgentDesc> {
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the request type
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetRequestType(GeneralRequest::RequestType b) {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -181,10 +118,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @Brief sets the async flag
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetAsync() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -193,10 +126,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the read start
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetReadStart() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr && _statistics->_readStart == 0.0) {
@ -205,10 +134,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the read end
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetReadEnd() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -217,10 +142,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the write start
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetWriteStart() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -229,10 +150,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the write end
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetWriteEnd() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -241,10 +158,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the queue start
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetQueueStart() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -253,10 +166,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the queue end
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetQueueEnd() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -265,10 +174,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the request start
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetRequestStart() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -277,10 +182,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the request end
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetRequestEnd() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -289,10 +190,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets execution error
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetExecuteError() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -301,10 +198,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets ignore flag
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentSetIgnore() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -313,10 +206,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief adds bytes received
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentAddReceivedBytes(size_t b) {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -325,10 +214,6 @@ class RequestStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief adds bytes sent
//////////////////////////////////////////////////////////////////////////////
void requestStatisticsAgentAddSentBytes(size_t b) {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -337,15 +222,6 @@ class RequestStatisticsAgent
}
}
};
}
}
namespace arangodb {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @brief connection statistics description
////////////////////////////////////////////////////////////////////////////////
struct ConnectionStatisticsAgentDesc {
static TRI_connection_statistics_t* acquire() {
@ -357,18 +233,21 @@ struct ConnectionStatisticsAgentDesc {
}
};
////////////////////////////////////////////////////////////////////////////////
/// @brief connection statistics agent
////////////////////////////////////////////////////////////////////////////////
class ConnectionStatisticsAgent
: public StatisticsAgent<TRI_connection_statistics_t,
ConnectionStatisticsAgentDesc> {
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the connection type
//////////////////////////////////////////////////////////////////////////////
ConnectionStatisticsAgent() {
acquire();
connectionStatisticsAgentSetStart();
}
virtual ~ConnectionStatisticsAgent() {
connectionStatisticsAgentSetEnd();
release();
}
public:
void connectionStatisticsAgentSetHttp() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -378,10 +257,6 @@ class ConnectionStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the connection start
//////////////////////////////////////////////////////////////////////////////
void connectionStatisticsAgentSetStart() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {
@ -390,10 +265,6 @@ class ConnectionStatisticsAgent
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the connection end
//////////////////////////////////////////////////////////////////////////////
void connectionStatisticsAgentSetEnd() {
if (StatisticsFeature::enabled()) {
if (_statistics != nullptr) {

View File

@ -40,9 +40,97 @@
using namespace arangodb;
using namespace arangodb::rest;
////////////////////////////////////////////////////////////////////////////////
/// @brief pushes a handler
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::run() {
uint32_t const maxSleep = 100 * 1000;
uint32_t const minSleep = 100;
uint32_t s = minSleep;
// clean old entries and create summary if requested
while (!isStopping()) {
try {
bool found = false;
WorkDescription* desc;
while (freeableWorkDescription.pop(desc)) {
found = true;
if (desc != nullptr) {
deleteWorkDescription(desc, false);
}
}
if (found) {
s = minSleep;
} else if (s < maxSleep) {
s *= 2;
}
{
MUTEX_LOCKER(guard, cancelLock);
if (!cancelIds.empty()) {
for (auto thread : threads) {
cancelWorkDescriptions(thread);
}
cancelIds.clear();
}
}
rest::RestHandler* handler;
while (workOverview.pop(handler)) {
VPackBuilder builder;
builder.add(VPackValue(VPackValueType::Object));
builder.add("time", VPackValue(TRI_microtime()));
builder.add("work", VPackValue(VPackValueType::Array));
{
MUTEX_LOCKER(guard, threadsLock);
for (auto& thread : threads) {
WorkDescription* desc = thread->workDescription();
if (desc != nullptr) {
builder.add(VPackValue(VPackValueType::Object));
vpackWorkDescription(&builder, desc);
builder.close();
}
}
}
builder.close();
builder.close();
sendWorkOverview(handler, builder.steal());
}
} catch (...) {
// must prevent propagation of exceptions from here
}
usleep(s);
}
// indicate that we stopped the work monitor, freeWorkDescription
// should directly delete old entries
stopped.store(true);
// cleanup old entries
WorkDescription* desc;
while (freeableWorkDescription.pop(desc)) {
if (desc != nullptr) {
deleteWorkDescription(desc, false);
}
}
while (emptyWorkDescription.pop(desc)) {
if (desc != nullptr) {
delete desc;
}
}
}
void WorkMonitor::pushHandler(RestHandler* handler) {
TRI_ASSERT(handler != nullptr);
@ -53,10 +141,6 @@ void WorkMonitor::pushHandler(RestHandler* handler) {
activateWorkDescription(desc);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief pops and releases a handler
////////////////////////////////////////////////////////////////////////////////
WorkDescription* WorkMonitor::popHandler(RestHandler* handler, bool free) {
WorkDescription* desc = deactivateWorkDescription();
@ -77,10 +161,6 @@ WorkDescription* WorkMonitor::popHandler(RestHandler* handler, bool free) {
return desc;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief cancels a query
////////////////////////////////////////////////////////////////////////////////
bool WorkMonitor::cancelAql(WorkDescription* desc) {
auto type = desc->_type;
@ -108,20 +188,12 @@ bool WorkMonitor::cancelAql(WorkDescription* desc) {
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handler deleter
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::deleteHandler(WorkDescription* desc) {
TRI_ASSERT(desc->_type == WorkType::HANDLER);
WorkItem::deleter()((WorkItem*)desc->_data.handler);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief thread description
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::vpackHandler(VPackBuilder* b, WorkDescription* desc) {
RestHandler* handler = desc->_data.handler;
GeneralRequest const* request = handler->request();
@ -159,22 +231,30 @@ void WorkMonitor::vpackHandler(VPackBuilder* b, WorkDescription* desc) {
}
void WorkMonitor::sendWorkOverview(
uint64_t taskId, std::shared_ptr<velocypack::Buffer<uint8_t>> buffer) {
auto answer = std::make_unique<TaskData>();
RestHandler* handler, std::shared_ptr<velocypack::Buffer<uint8_t>> buffer) {
auto response = handler->response();
answer->_taskId = taskId;
answer->_loop = SchedulerFeature::SCHEDULER->lookupLoopById(taskId);
answer->_type = TaskData::TASK_DATA_BUFFER;
answer->_buffer = buffer;
velocypack::Slice slice(buffer->data());
response->setResponseCode(GeneralResponse::ResponseCode::OK);
response->setPayload(slice, true, VPackOptions::Defaults);
SchedulerFeature::SCHEDULER->signalTask(answer);
auto data = std::make_unique<TaskData>();
data->_taskId = handler->taskId();
data->_loop = handler->eventLoop();
data->_type = TaskData::TASK_DATA_RESPONSE;
data->_response = handler->stealResponse();
SchedulerFeature::SCHEDULER->signalTask(data);
delete static_cast<WorkItem*>(handler);
}
HandlerWorkStack::HandlerWorkStack(RestHandler* handler) : _handler(handler) {
WorkMonitor::pushHandler(_handler);
}
HandlerWorkStack::HandlerWorkStack(WorkItem::uptr<RestHandler>& handler) {
HandlerWorkStack::HandlerWorkStack(WorkItem::uptr<RestHandler> handler) {
_handler = handler.release();
WorkMonitor::pushHandler(_handler);
}

View File

@ -25,27 +25,15 @@
#define ARANGODB_BASICS_WORK_ITEM_H 1
namespace arangodb {
////////////////////////////////////////////////////////////////////////////////
/// @brief allow unique_ptr to call protected destructor
////////////////////////////////////////////////////////////////////////////////
class WorkItem {
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief unique_ptr_deleter
//////////////////////////////////////////////////////////////////////////////
friend class WorkMonitor;
public:
struct deleter {
deleter() = default;
void operator()(WorkItem* ptr) const { delete ptr; }
};
//////////////////////////////////////////////////////////////////////////////
/// @brief unique_ptr
//////////////////////////////////////////////////////////////////////////////
template <typename X>
using uptr = std::unique_ptr<X, deleter>;

View File

@ -28,53 +28,23 @@
#include <velocypack/velocypack-aliases.h>
#include "Logger/Logger.h"
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Basics/tri-strings.h"
#include <boost/lockfree/queue.hpp>
using namespace arangodb;
using namespace arangodb::basics;
////////////////////////////////////////////////////////////////////////////////
/// @brief list of free descriptions
////////////////////////////////////////////////////////////////////////////////
std::atomic<bool> WorkMonitor::stopped(true);
static boost::lockfree::queue<WorkDescription*> EMPTY_WORK_DESCRIPTION(128);
boost::lockfree::queue<WorkDescription*> WorkMonitor::emptyWorkDescription(128);
boost::lockfree::queue<WorkDescription*> WorkMonitor::freeableWorkDescription(128);
boost::lockfree::queue<rest::RestHandler*> WorkMonitor::workOverview(128);
////////////////////////////////////////////////////////////////////////////////
/// @brief list of freeable descriptions
////////////////////////////////////////////////////////////////////////////////
Mutex WorkMonitor::cancelLock;
std::set<uint64_t> WorkMonitor::cancelIds;
static boost::lockfree::queue<WorkDescription*> FREEABLE_WORK_DESCRIPTION(128);
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks that want an overview
////////////////////////////////////////////////////////////////////////////////
static boost::lockfree::queue<uint64_t> WORK_OVERVIEW(128);
////////////////////////////////////////////////////////////////////////////////
/// @brief stopped flag
////////////////////////////////////////////////////////////////////////////////
static std::atomic<bool> WORK_MONITOR_STOPPED(true);
////////////////////////////////////////////////////////////////////////////////
/// @brief guard for THREADS
///
/// The order in this file must be: WORK_DESCRIPTION, THREADS_LOCK, THREADS,
/// WORK_MONITOR.
////////////////////////////////////////////////////////////////////////////////
static Mutex THREADS_LOCK;
////////////////////////////////////////////////////////////////////////////////
/// @brief all known threads
////////////////////////////////////////////////////////////////////////////////
static std::set<Thread*> THREADS;
Mutex WorkMonitor::threadsLock;
std::set<Thread*> WorkMonitor::threads;
////////////////////////////////////////////////////////////////////////////////
/// @brief singleton
@ -82,18 +52,6 @@ static std::set<Thread*> THREADS;
static WorkMonitor WORK_MONITOR;
////////////////////////////////////////////////////////////////////////////////
/// @brief lock for canceled ids
////////////////////////////////////////////////////////////////////////////////
static Mutex CANCEL_LOCK;
////////////////////////////////////////////////////////////////////////////////
/// @brief list of canceled ids
////////////////////////////////////////////////////////////////////////////////
static std::set<uint64_t> CANCEL_IDS;
////////////////////////////////////////////////////////////////////////////////
/// @brief current work description as thread local variable
////////////////////////////////////////////////////////////////////////////////
@ -118,10 +76,10 @@ WorkMonitor::WorkMonitor() : Thread("WorkMonitor") {}
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::freeWorkDescription(WorkDescription* desc) {
if (WORK_MONITOR_STOPPED.load()) {
if (stopped.load()) {
deleteWorkDescription(desc, true);
} else {
FREEABLE_WORK_DESCRIPTION.push(desc);
freeableWorkDescription.push(desc);
}
}
@ -130,7 +88,7 @@ void WorkMonitor::freeWorkDescription(WorkDescription* desc) {
////////////////////////////////////////////////////////////////////////////////
bool WorkMonitor::pushThread(Thread* thread) {
if (WORK_MONITOR_STOPPED.load()) {
if (stopped.load()) {
return false;
}
@ -145,8 +103,8 @@ bool WorkMonitor::pushThread(Thread* thread) {
activateWorkDescription(desc);
{
MUTEX_LOCKER(guard, THREADS_LOCK);
THREADS.insert(thread);
MUTEX_LOCKER(guard, threadsLock);
threads.insert(thread);
}
} catch (...) {
throw;
@ -160,7 +118,7 @@ bool WorkMonitor::pushThread(Thread* thread) {
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::popThread(Thread* thread) {
if (WORK_MONITOR_STOPPED.load()) {
if (stopped.load()) {
return;
}
@ -174,8 +132,8 @@ void WorkMonitor::popThread(Thread* thread) {
freeWorkDescription(desc);
{
MUTEX_LOCKER(guard, THREADS_LOCK);
THREADS.erase(thread);
MUTEX_LOCKER(guard, threadsLock);
threads.erase(thread);
}
} catch (...) {
// just to prevent throwing exceptions from here, as this method
@ -183,10 +141,6 @@ void WorkMonitor::popThread(Thread* thread) {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief pushes a custom task
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::pushAql(TRI_vocbase_t* vocbase, uint64_t queryId,
char const* text, size_t length) {
TRI_ASSERT(vocbase != nullptr);
@ -207,10 +161,6 @@ void WorkMonitor::pushAql(TRI_vocbase_t* vocbase, uint64_t queryId,
activateWorkDescription(desc);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief pushes a custom task
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::pushAql(TRI_vocbase_t* vocbase, uint64_t queryId) {
TRI_ASSERT(vocbase != nullptr);
@ -223,10 +173,6 @@ void WorkMonitor::pushAql(TRI_vocbase_t* vocbase, uint64_t queryId) {
activateWorkDescription(desc);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief pops a custom task
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::popAql() {
WorkDescription* desc = deactivateWorkDescription();
@ -242,10 +188,6 @@ void WorkMonitor::popAql() {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief pushes a custom task
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::pushCustom(char const* type, char const* text,
size_t length) {
TRI_ASSERT(type != nullptr);
@ -266,10 +208,6 @@ void WorkMonitor::pushCustom(char const* type, char const* text,
activateWorkDescription(desc);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief pushes a custom task
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::pushCustom(char const* type, uint64_t id) {
TRI_ASSERT(type != nullptr);
@ -285,10 +223,6 @@ void WorkMonitor::pushCustom(char const* type, uint64_t id) {
activateWorkDescription(desc);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief pops a custom task
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::popCustom() {
WorkDescription* desc = deactivateWorkDescription();
@ -303,117 +237,13 @@ void WorkMonitor::popCustom() {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief requests a work overview
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::requestWorkOverview(uint64_t taskId) {
WORK_OVERVIEW.push(taskId);
void WorkMonitor::requestWorkOverview(rest::RestHandler* handler) {
workOverview.push(handler);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief requests cancel of work
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::cancelWork(uint64_t id) {
MUTEX_LOCKER(guard, CANCEL_LOCK);
CANCEL_IDS.insert(id);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief the main event loop, wait for requests and delete old descriptions
////////////////////////////////////////////////////////////////////////////////
void WorkMonitor::run() {
uint32_t const maxSleep = 100 * 1000;
uint32_t const minSleep = 100;
uint32_t s = minSleep;
// clean old entries and create summary if requested
while (!isStopping()) {
try {
bool found = false;
WorkDescription* desc;
while (FREEABLE_WORK_DESCRIPTION.pop(desc)) {
found = true;
if (desc != nullptr) {
deleteWorkDescription(desc, false);
}
}
if (found) {
s = minSleep;
} else if (s < maxSleep) {
s *= 2;
}
uint64_t taskId;
{
MUTEX_LOCKER(guard, CANCEL_LOCK);
if (!CANCEL_IDS.empty()) {
for (auto thread : THREADS) {
cancelWorkDescriptions(thread);
}
CANCEL_IDS.clear();
}
}
while (WORK_OVERVIEW.pop(taskId)) {
VPackBuilder builder;
builder.add(VPackValue(VPackValueType::Object));
builder.add("time", VPackValue(TRI_microtime()));
builder.add("work", VPackValue(VPackValueType::Array));
{
MUTEX_LOCKER(guard, THREADS_LOCK);
for (auto& thread : THREADS) {
WorkDescription* desc = thread->workDescription();
if (desc != nullptr) {
builder.add(VPackValue(VPackValueType::Object));
vpackWorkDescription(&builder, desc);
builder.close();
}
}
}
builder.close();
builder.close();
sendWorkOverview(taskId, builder.steal());
}
} catch (...) {
// must prevent propagation of exceptions from here
}
usleep(s);
}
// indicate that we stopped the work monitor, freeWorkDescription
// should directly delete old entries
WORK_MONITOR_STOPPED.store(true);
// cleanup old entries
WorkDescription* desc;
while (FREEABLE_WORK_DESCRIPTION.pop(desc)) {
if (desc != nullptr) {
deleteWorkDescription(desc, false);
}
}
while (EMPTY_WORK_DESCRIPTION.pop(desc)) {
if (desc != nullptr) {
delete desc;
}
}
MUTEX_LOCKER(guard, cancelLock);
cancelIds.insert(id);
}
WorkDescription* WorkMonitor::createWorkDescription(WorkType type) {
@ -422,7 +252,7 @@ WorkDescription* WorkMonitor::createWorkDescription(WorkType type) {
? CURRENT_WORK_DESCRIPTION
: Thread::CURRENT_THREAD->workDescription();
if (EMPTY_WORK_DESCRIPTION.pop(desc) && desc != nullptr) {
if (emptyWorkDescription.pop(desc) && desc != nullptr) {
desc->_type = type;
desc->_prev.store(prev);
desc->_destroy = true;
@ -461,7 +291,7 @@ void WorkMonitor::deleteWorkDescription(WorkDescription* desc, bool stopped) {
// while the work monitor thread is still active, push the item on the
// work monitor's cleanup list for destruction
EMPTY_WORK_DESCRIPTION.push(desc);
emptyWorkDescription.push(desc);
}
void WorkMonitor::activateWorkDescription(WorkDescription* desc) {
@ -536,7 +366,7 @@ void WorkMonitor::cancelWorkDescriptions(Thread* thread) {
uint64_t id = desc->_id;
if (CANCEL_IDS.find(id) != CANCEL_IDS.end()) {
if (cancelIds.find(id) != cancelIds.end()) {
for (auto it = path.rbegin(); it < path.rend(); ++it) {
bool descent = true;
WorkDescription* d = *it;
@ -566,6 +396,16 @@ void WorkMonitor::cancelWorkDescriptions(Thread* thread) {
}
}
void WorkMonitor::initialize() {
stopped.store(false);
WORK_MONITOR.start();
}
void WorkMonitor::shutdown() {
stopped.store(true);
WORK_MONITOR.beginShutdown();
}
AqlWorkStack::AqlWorkStack(TRI_vocbase_t* vocbase, uint64_t queryId,
char const* text, size_t length) {
WorkMonitor::pushAql(vocbase, queryId, text, length);
@ -587,21 +427,3 @@ CustomWorkStack::CustomWorkStack(char const* type, uint64_t id) {
}
CustomWorkStack::~CustomWorkStack() { WorkMonitor::popCustom(); }
////////////////////////////////////////////////////////////////////////////////
/// @brief starts the work monitor
////////////////////////////////////////////////////////////////////////////////
void arangodb::InitializeWorkMonitor() {
WORK_MONITOR_STOPPED.store(false);
WORK_MONITOR.start();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stops the work monitor
////////////////////////////////////////////////////////////////////////////////
void arangodb::ShutdownWorkMonitor() {
WORK_MONITOR_STOPPED.store(true);
WORK_MONITOR.beginShutdown();
}

View File

@ -26,10 +26,13 @@
#include "Basics/Thread.h"
#include <velocypack/Builder.h>
#include <boost/lockfree/queue.hpp>
#include <velocypack/Buffer.h>
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include "Basics/Mutex.h"
#include "Basics/WorkDescription.h"
#include "Basics/WorkItem.h"
@ -59,14 +62,18 @@ class WorkMonitor : public Thread {
static void popCustom();
static void pushHandler(rest::RestHandler*);
static WorkDescription* popHandler(rest::RestHandler*, bool free);
static void requestWorkOverview(uint64_t taskId);
static void requestWorkOverview(rest::RestHandler* handler);
static void cancelWork(uint64_t id);
public:
static void initialize();
static void shutdown();
protected:
void run() override;
private:
static void sendWorkOverview(uint64_t,
static void sendWorkOverview(rest::RestHandler*,
std::shared_ptr<velocypack::Buffer<uint8_t>>);
static bool cancelAql(WorkDescription*);
static void deleteHandler(WorkDescription* desc);
@ -78,42 +85,39 @@ class WorkMonitor : public Thread {
static WorkDescription* deactivateWorkDescription();
static void vpackWorkDescription(VPackBuilder*, WorkDescription*);
static void cancelWorkDescriptions(Thread* thread);
};
////////////////////////////////////////////////////////////////////////////////
/// @brief auto push and pop for RestHandler
////////////////////////////////////////////////////////////////////////////////
private:
static std::atomic<bool> stopped;
static boost::lockfree::queue<WorkDescription*> emptyWorkDescription;
static boost::lockfree::queue<WorkDescription*> freeableWorkDescription;
static boost::lockfree::queue<rest::RestHandler*> workOverview;
static Mutex cancelLock;
static std::set<uint64_t> cancelIds;
static Mutex threadsLock;
static std::set<Thread*> threads;
};
class HandlerWorkStack {
HandlerWorkStack(const HandlerWorkStack&) = delete;
HandlerWorkStack& operator=(const HandlerWorkStack&) = delete;
public:
// TODO(fc) remove the pointer version
explicit HandlerWorkStack(rest::RestHandler*);
explicit HandlerWorkStack(WorkItem::uptr<rest::RestHandler>&);
explicit HandlerWorkStack(WorkItem::uptr<rest::RestHandler>);
~HandlerWorkStack();
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief returns the handler
//////////////////////////////////////////////////////////////////////////////
rest::RestHandler* handler() const { return _handler; }
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief handler
//////////////////////////////////////////////////////////////////////////////
rest::RestHandler* _handler;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief auto push and pop for Aql
////////////////////////////////////////////////////////////////////////////////
class AqlWorkStack {
AqlWorkStack(const AqlWorkStack&) = delete;
AqlWorkStack& operator=(const AqlWorkStack&) = delete;
@ -126,10 +130,6 @@ class AqlWorkStack {
~AqlWorkStack();
};
////////////////////////////////////////////////////////////////////////////////
/// @brief auto push and pop for Custom
////////////////////////////////////////////////////////////////////////////////
class CustomWorkStack {
CustomWorkStack(const CustomWorkStack&) = delete;
CustomWorkStack& operator=(const CustomWorkStack&) = delete;
@ -141,18 +141,5 @@ class CustomWorkStack {
~CustomWorkStack();
};
////////////////////////////////////////////////////////////////////////////////
/// @brief starts the work monitor
////////////////////////////////////////////////////////////////////////////////
void InitializeWorkMonitor();
////////////////////////////////////////////////////////////////////////////////
/// @brief stops the work monitor
////////////////////////////////////////////////////////////////////////////////
void ShutdownWorkMonitor();
}
#endif

View File

@ -26,6 +26,8 @@
using namespace arangodb;
using namespace arangodb::velocypack;
void WorkMonitor::run() { TRI_ASSERT(false); }
bool WorkMonitor::cancelAql(WorkDescription* desc) {
TRI_ASSERT(false);
return true;
@ -38,6 +40,7 @@ void WorkMonitor::vpackHandler(arangodb::velocypack::Builder*,
TRI_ASSERT(false);
}
void WorkMonitor::sendWorkOverview(uint64_t, std::shared_ptr<Buffer<uint8_t>>) {
void WorkMonitor::sendWorkOverview(rest::RestHandler*,
std::shared_ptr<Buffer<uint8_t>>) {
TRI_ASSERT(false);
}

View File

@ -133,7 +133,6 @@ add_library(${LIB_ARANGO} STATIC
ApplicationFeatures/TempFeature.cpp
ApplicationFeatures/V8PlatformFeature.cpp
ApplicationFeatures/VersionFeature.cpp
ApplicationFeatures/WorkMonitorFeature.cpp
Basics/ArangoGlobalContext.cpp
Basics/AttributeNameParser.cpp
Basics/Barrier.cpp

View File

@ -31,8 +31,9 @@ namespace meta {
namespace details {
template <typename E>
using enable_enum_t = typename std::enable_if<
std::is_enum<E>::value, typename std::underlying_type<E>::type>::type;
using enable_enum_t =
typename std::enable_if<std::is_enum<E>::value,
typename std::underlying_type<E>::type>::type;
}
template <typename E>

View File

@ -25,14 +25,14 @@
#ifndef ARANGODB_REST_VPP_REQUEST_H
#define ARANGODB_REST_VPP_REQUEST_H 1
#include "Endpoint/ConnectionInfo.h"
#include "Rest/GeneralRequest.h"
#include "Rest/VppMessage.h"
#include "Endpoint/ConnectionInfo.h"
#include <velocypack/Buffer.h>
#include <velocypack/Builder.h>
#include <velocypack/Dumper.h>
#include <velocypack/Options.h>
#include <velocypack/Buffer.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
@ -66,7 +66,7 @@ class VppRequest : public GeneralRequest {
~VppRequest() {}
public:
virtual uint64_t messageId() { return _messageId; }
uint64_t messageId() override { return _messageId; }
VPackSlice payload(arangodb::velocypack::Options const*) override;
int64_t contentLength() const override {