1
0
Fork 0

Bug fix/remove io task (#9056)

This commit is contained in:
Jan 2019-05-22 14:34:49 +02:00 committed by GitHub
parent 0e24c18253
commit 79258e072a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 80 additions and 154 deletions

View File

@ -363,7 +363,6 @@ SET(ARANGOD_SOURCES
GeneralServer/GeneralServer.cpp
GeneralServer/GeneralServerFeature.cpp
GeneralServer/HttpCommTask.cpp
GeneralServer/IoTask.cpp
GeneralServer/ListenTask.cpp
GeneralServer/RestHandler.cpp
GeneralServer/RestHandlerFactory.cpp

View File

@ -37,8 +37,7 @@ FreeMemoryTask::~FreeMemoryTask() {}
bool FreeMemoryTask::dispatch() {
_manager->prepareTask(_environment);
auto self = shared_from_this();
return _manager->post([self, this]() -> void { run(); });
return _manager->post([self = shared_from_this()]() -> void { self->run(); });
}
void FreeMemoryTask::run() {
@ -67,8 +66,7 @@ MigrateTask::~MigrateTask() {}
bool MigrateTask::dispatch() {
_manager->prepareTask(_environment);
auto self = shared_from_this();
return _manager->post([self, this]() -> void { run(); });
return _manager->post([self = shared_from_this()]() -> void { self->run(); });
}
void MigrateTask::run() {

View File

@ -458,8 +458,9 @@ bool GeneralCommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) {
auto const lane = handler->getRequestLane();
bool ok = SchedulerFeature::SCHEDULER->queue(lane, [self = shared_from_this(), this, handler]() {
handleRequestDirectly(basics::ConditionalLocking::DoLock, handler);
bool ok = SchedulerFeature::SCHEDULER->queue(lane, [self = shared_from_this(), handler]() {
auto thisPtr = static_cast<GeneralCommTask*>(self.get());
thisPtr->handleRequestDirectly(basics::ConditionalLocking::DoLock, handler);
});
if (!ok) {
@ -479,11 +480,15 @@ void GeneralCommTask::handleRequestDirectly(bool doLock, std::shared_ptr<RestHan
return;
}
handler->runHandler([self = shared_from_this(), this](rest::RestHandler* handler) {
handler->runHandler([self = shared_from_this()](rest::RestHandler* handler) {
auto thisPtr = static_cast<GeneralCommTask*>(self.get());
RequestStatistics* stat = handler->stealStatistics();
auto h = handler->shared_from_this();
// Pass the response the io context
_peer->post([self, this, stat, h = std::move(h)]() { addResponse(*(h->response()), stat); });
thisPtr->_peer->post([self, stat, h = std::move(h)]() {
auto thisPtr = static_cast<GeneralCommTask*>(self.get());
thisPtr->addResponse(*(h->response()), stat);
});
});
}

View File

@ -39,7 +39,7 @@ using namespace arangodb::rest;
GeneralListenTask::GeneralListenTask(GeneralServer& server, GeneralServer::IoContext& context,
Endpoint* endpoint, ProtocolType connectionType)
: ListenTask(server, context, "GeneralListenTask", endpoint),
: ListenTask(server, context, endpoint),
_connectionType(connectionType) {
_keepAliveTimeout = GeneralServerFeature::keepAliveTimeout();

View File

@ -1,36 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
////////////////////////////////////////////////////////////////////////////////
#include "IoTask.h"
using namespace arangodb::rest;
namespace {
std::atomic_uint_fast64_t NEXT_IO_TASK_ID(static_cast<uint64_t>(TRI_microtime() * 100000.0));
}
IoTask::IoTask(GeneralServer& server,
GeneralServer::IoContext& context,
char const* name)
: _context(context), _server(server), _taskId(++NEXT_IO_TASK_ID), _name(name) {}

View File

@ -1,63 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_SCHEDULER_IO_TASK_H
#define ARANGOD_SCHEDULER_IO_TASK_H 1
#include "Basics/Common.h"
#include "GeneralServer/GeneralServer.h"
namespace arangodb {
namespace rest {
class IoTask : public std::enable_shared_from_this<IoTask> {
IoTask(IoTask const&) = delete;
IoTask& operator=(IoTask const&) = delete;
public:
IoTask(GeneralServer& server,
GeneralServer::IoContext&,
char const* name);
virtual ~IoTask() = default;
public:
// doesn't seem to be called right now, but can be used for debugging
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
char const* name() const { return _name; }
#endif
uint64_t id() const { return _taskId; }
protected:
GeneralServer::IoContext& _context;
GeneralServer& _server;
uint64_t const _taskId;
private:
char const* _name;
};
} // namespace rest
} // namespace arangodb
#endif

View File

@ -27,6 +27,7 @@
#include "Basics/MutexLocker.h"
#include "GeneralServer/Acceptor.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/Socket.h"
#include "Logger/Logger.h"
using namespace arangodb;
@ -38,10 +39,11 @@ using namespace arangodb::rest;
ListenTask::ListenTask(GeneralServer& server,
GeneralServer::IoContext& context,
char const* name,
Endpoint* endpoint)
: IoTask(server, context, name),
: _server(server),
_context(context),
_endpoint(endpoint),
_acceptFailures(0),
_bound(false),
_acceptor(Acceptor::factory(server, context, endpoint)) {}

View File

@ -26,27 +26,26 @@
#define ARANGOD_SCHEDULER_LISTEN_TASK_H 1
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/IoTask.h"
#include "GeneralServer/Task.h"
#include "Basics/Mutex.h"
#include "Endpoint/ConnectionInfo.h"
#include "Endpoint/Endpoint.h"
#include "GeneralServer/Acceptor.h"
#include "GeneralServer/Socket.h"
#include "GeneralServer/GeneralServer.h"
namespace arangodb {
class Socket;
class ListenTask : public rest::IoTask {
class ListenTask : public std::enable_shared_from_this<ListenTask> {
public:
static size_t const MAX_ACCEPT_ERRORS = 128;
public:
ListenTask(rest::GeneralServer& server,
rest::GeneralServer::IoContext&,
char const* name,
Endpoint*);
~ListenTask();
virtual ~ListenTask();
public:
virtual void handleConnected(std::unique_ptr<Socket>, ConnectionInfo&&) = 0;
@ -59,8 +58,14 @@ class ListenTask : public rest::IoTask {
private:
void accept();
protected:
rest::GeneralServer& _server;
rest::GeneralServer::IoContext& _context;
private:
Endpoint* _endpoint;
size_t _acceptFailures = 0;
size_t _acceptFailures;
bool _bound;
std::unique_ptr<Acceptor> _acceptor;

View File

@ -39,6 +39,10 @@
using namespace arangodb::basics;
using namespace arangodb::rest;
namespace {
std::atomic_uint_fast64_t NEXT_TASK_ID(static_cast<uint64_t>(TRI_microtime() * 100000.0));
}
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
@ -50,7 +54,10 @@ SocketTask::SocketTask(GeneralServer& server,
arangodb::ConnectionInfo&& connectionInfo,
double keepAliveTimeout,
bool skipInit = false)
: IoTask(server, context, name),
: _server(server),
_context(context),
_name(name),
_taskId(++NEXT_TASK_ID),
_peer(std::move(socket)),
_connectionInfo(std::move(connectionInfo)),
_connectionStatistics(nullptr),
@ -125,7 +132,7 @@ bool SocketTask::start() {
<< _connectionInfo.serverAddress << ":" << _connectionInfo.serverPort << " <-> "
<< _connectionInfo.clientAddress << ":" << _connectionInfo.clientPort;
_peer->post([self = shared_from_this(), this]() { asyncReadSome(); });
_peer->post([self = shared_from_this()]() { self->asyncReadSome(); });
return true;
}
@ -186,7 +193,7 @@ void SocketTask::closeStream() {
// strand::dispatch may execute this immediately if this
// is called on a thread inside the same strand
_peer->post([self = shared_from_this(), this] { closeStreamNoLock(); });
_peer->post([self = shared_from_this()] { self->closeStreamNoLock(); });
}
// caller must hold the _lock
@ -234,11 +241,11 @@ void SocketTask::resetKeepAlive() {
}
_keepAliveTimerActive.store(true, std::memory_order_relaxed);
_keepAliveTimer->async_wait([self = shared_from_this(), this](const asio_ns::error_code& error) {
_keepAliveTimer->async_wait([self = shared_from_this()](asio_ns::error_code const& error) {
if (!error) { // error will be true if timer was canceled
LOG_TOPIC("5c1e0", ERR, Logger::COMMUNICATION)
<< "keep alive timout - closing stream!";
closeStream();
self->closeStream();
}
});
}
@ -414,22 +421,25 @@ void SocketTask::asyncReadSome() {
TRI_ASSERT(_peer != nullptr);
_peer->asyncRead(asio_ns::buffer(_readBuffer.end(), READ_BLOCK_SIZE),
[self = shared_from_this(), this](const asio_ns::error_code& ec, std::size_t transferred) {
if (_abandoned.load(std::memory_order_acquire)) {
[self = shared_from_this()](asio_ns::error_code const& ec, std::size_t transferred) {
auto thisPtr = self.get();
if (thisPtr->_abandoned.load(std::memory_order_acquire)) {
return;
} else if (ec) {
}
if (ec) {
LOG_TOPIC("29dca", DEBUG, Logger::COMMUNICATION)
<< "read on stream failed with: " << ec.message();
closeStream();
thisPtr->closeStream();
return;
}
_readBuffer.increaseLength(transferred);
thisPtr->_readBuffer.increaseLength(transferred);
if (processAll()) {
_peer->post([self, this]() { asyncReadSome(); });
if (thisPtr->processAll()) {
thisPtr->_peer->post([self]() { self->asyncReadSome(); });
}
compactify();
thisPtr->compactify();
});
}
@ -505,22 +515,24 @@ void SocketTask::asyncWriteSome() {
// so the code could have blocked at this point or not all data
// was written in one go, begin writing at offset (written)
_peer->asyncWrite(asio_ns::buffer(_writeBuffer._buffer->begin() + written, total - written),
[self = shared_from_this(), this](const asio_ns::error_code& ec, std::size_t transferred) {
if (_abandoned.load(std::memory_order_acquire)) {
[self = shared_from_this()](asio_ns::error_code const& ec, std::size_t transferred) {
auto thisPtr = self.get();
if (thisPtr->_abandoned.load(std::memory_order_acquire)) {
return;
}
if (ec) {
LOG_TOPIC("8ed36", DEBUG, Logger::COMMUNICATION)
<< "write failed with: " << ec.message();
closeStream();
thisPtr->closeStream();
return;
}
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, transferred);
RequestStatistics::ADD_SENT_BYTES(thisPtr->_writeBuffer._statistics, transferred);
if (completedWriteBuffer()) {
if (!_abandoned.load(std::memory_order_acquire)) {
asyncWriteSome();
if (thisPtr->completedWriteBuffer()) {
if (!thisPtr->_abandoned.load(std::memory_order_acquire)) {
thisPtr->asyncWriteSome();
}
}
});
@ -581,5 +593,5 @@ void SocketTask::returnStringBuffer(StringBuffer* buffer) {
void SocketTask::triggerProcessAll() {
// try to process remaining request data
_peer->post([self = shared_from_this(), this] { processAll(); });
_peer->post([self = shared_from_this()] { self->processAll(); });
}

View File

@ -25,24 +25,21 @@
#ifndef ARANGOD_SCHEDULER_SOCKET_TASK_H
#define ARANGOD_SCHEDULER_SOCKET_TASK_H 1
#include "GeneralServer/Task.h"
#include "Basics/Mutex.h"
#include "Basics/SmallVector.h"
#include "Basics/StringBuffer.h"
#include "Endpoint/ConnectionInfo.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/Socket.h"
#include "Statistics/RequestStatistics.h"
#include "GeneralServer/IoTask.h"
#include <list>
namespace arangodb {
class ConnectionStatistics;
namespace rest {
class SocketTask : public IoTask {
class SocketTask : public std::enable_shared_from_this<SocketTask> {
friend class HttpCommTask;
friend class GeneralServer;
@ -65,6 +62,13 @@ class SocketTask : public IoTask {
// whether or not this task can mix sync and async I/O
virtual bool canUseMixedIO() const = 0;
// doesn't seem to be called right now, but can be used for debugging
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
char const* name() const { return _name; }
#endif
uint64_t id() const { return _taskId; }
protected:
// caller will hold the _lock
@ -171,6 +175,11 @@ class SocketTask : public IoTask {
void asyncWriteSome();
protected:
GeneralServer& _server;
GeneralServer::IoContext& _context;
char const* _name;
uint64_t const _taskId;
std::unique_ptr<Socket> _peer;
ConnectionInfo _connectionInfo;

View File

@ -340,13 +340,12 @@ void Syncer::JobSynchronizer::request(std::function<void()> const& cb) {
}
try {
auto self = shared_from_this();
SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [this, self, cb]() {
SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [self = shared_from_this(), cb]() {
// whatever happens next, when we leave this here, we need to indicate
// that there is no more posted job.
// otherwise the calling thread may block forever waiting on the
// posted jobs to finish
auto guard = scopeGuard([this]() { jobDone(); });
auto guard = scopeGuard([self]() { self->jobDone(); });
cb();
});

View File

@ -73,7 +73,7 @@ RestStatus RestAdminExecuteHandler::execute() {
}
try {
LOG_TOPIC("c838e", WARN, Logger::FIXME) << "about to execute: '" << Logger::CHARS(body, bodySize) << "'";
LOG_TOPIC("c838e", DEBUG, Logger::SECURITY) << "about to execute: '" << Logger::CHARS(body, bodySize) << "'";
// get a V8 context
bool const allowUseDatabase = ActionFeature::ACTION->allowUseDatabase();

View File

@ -233,8 +233,7 @@ RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
query->setTransactionContext(createAQLTransactionContext());
std::shared_ptr<aql::SharedQueryState> ss = query->sharedState();
auto self = shared_from_this();
ss->setContinueHandler([this, self, ss] { continueHandlerExecution(); });
ss->setContinueHandler([self = shared_from_this(), ss] { self->continueHandlerExecution(); });
registerQuery(std::move(query));
return processQuery();
@ -526,9 +525,8 @@ RestStatus RestCursorHandler::generateCursorResult(rest::ResponseCode code,
aql::ExecutionState state;
Result r;
auto self = shared_from_this();
std::tie(state, r) =
cursor->dump(builder, [this, self]() { continueHandlerExecution(); });
cursor->dump(builder, [self = shared_from_this()]() { self->continueHandlerExecution(); });
if (state == aql::ExecutionState::WAITING) {
builder.clear();
_leasedCursor = cursor;

View File

@ -35,7 +35,6 @@
#include "Basics/cpu-relax.h"
#include "GeneralServer/Acceptor.h"
#include "GeneralServer/RestHandler.h"
#include "GeneralServer/Task.h"
#include "Logger/Logger.h"
#include "Random/RandomGenerator.h"
#include "Rest/GeneralResponse.h"

View File

@ -32,7 +32,6 @@
#include "Cluster/ServerState.h"
#include "GeneralServer/Acceptor.h"
#include "GeneralServer/RestHandler.h"
#include "GeneralServer/Task.h"
#include "Logger/Logger.h"
#include "Random/RandomGenerator.h"
#include "Rest/GeneralResponse.h"