1
0
Fork 0
This commit is contained in:
Frank Celler 2012-01-04 09:38:42 +01:00
parent f27104c859
commit 897698dfe2
23 changed files with 3285 additions and 12 deletions

90
Scheduler/AsyncTask.cpp Normal file
View File

@ -0,0 +1,90 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to handle asynchronous events
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "AsyncTask.h"
#include "Scheduler/Scheduler.h"
using namespace triagens::rest;
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
AsyncTask::AsyncTask ()
: Task("AsyncTask"),
watcher(0) {
}
AsyncTask::~AsyncTask () {
}
// -----------------------------------------------------------------------------
// public methods
// -----------------------------------------------------------------------------
void AsyncTask::signal () {
scheduler->sendAsync(watcher);
}
// -----------------------------------------------------------------------------
// Task methods
// -----------------------------------------------------------------------------
void AsyncTask::setup (Scheduler* scheduler, EventLoop loop) {
this->scheduler = scheduler;
this->loop = loop;
watcher = scheduler->installAsyncEvent(loop, this);
}
void AsyncTask::cleanup () {
scheduler->uninstallEvent(watcher);
watcher = 0;
}
bool AsyncTask::handleEvent (EventToken token, EventType revents) {
bool result = true;
if (watcher == token && (revents & EVENT_ASYNC)) {
result = handleAsync();
}
return result;
}
}
}

108
Scheduler/AsyncTask.h Normal file
View File

@ -0,0 +1,108 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to handle asynchronous events
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_ASYNC_TASK_H
#define TRIAGENS_FYN_REST_ASYNC_TASK_H 1
#include "Scheduler/Task.h"
namespace triagens {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief task used to handle asyncs
////////////////////////////////////////////////////////////////////////////////
class AsyncTask : virtual public Task {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new task
////////////////////////////////////////////////////////////////////////////////
AsyncTask ();
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief signals the task
///
/// Note that this method can only be called after the task has been registered.
////////////////////////////////////////////////////////////////////////////////
void signal ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~AsyncTask ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief handles the signal
////////////////////////////////////////////////////////////////////////////////
virtual bool handleAsync () = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void setup (Scheduler*, EventLoop);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleEvent (EventToken, EventType);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief event for async signals
////////////////////////////////////////////////////////////////////////////////
EventToken watcher;
};
}
}
#endif

View File

@ -0,0 +1,434 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to establish connections
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "ConnectionTask.h"
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <BasicsC/socket-utils.h>
#include <Logger/Logger.h>
#include "Scheduler/Scheduler.h"
using namespace triagens::basics;
using namespace triagens::rest;
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
ConnectionTask::ConnectionTask (string const& hostname, int port)
: Task("ConnectionTask"),
SocketTask(0),
watcher(0),
state(STATE_UNCONNECTED),
idle(true),
connectTimeout(0.0),
commTimeout(0.0),
hostname(hostname),
port(port),
netaddress(0),
netaddressLength(0),
resolved(false) {
resolveAddress();
connectSocket();
}
ConnectionTask::ConnectionTask (string const& hostname, int port, double connectTimeout)
: Task("ConnectionTask"),
SocketTask(0),
watcher(0),
state(STATE_UNCONNECTED),
idle(true),
connectTimeout(connectTimeout),
commTimeout(0.0),
hostname(hostname),
port(port),
netaddress(0),
netaddressLength(0),
resolved(false) {
resolveAddress();
connectSocket();
}
ConnectionTask::ConnectionTask (string const& hostname, int port, double connectTimeout, double commTimeout)
: Task("ConnectionTask"),
SocketTask(0),
watcher(0),
state(STATE_UNCONNECTED),
idle(true),
connectTimeout(connectTimeout),
commTimeout(commTimeout),
hostname(hostname),
port(port),
netaddress(0),
netaddressLength(0),
resolved(false) {
resolveAddress();
connectSocket();
}
ConnectionTask::ConnectionTask (socket_t fd)
: Task("ConnectionTask"),
SocketTask(fd),
watcher(0),
state(STATE_CONNECTED),
idle(true),
connectTimeout(0.0),
commTimeout(0.0),
hostname(""),
port(0),
netaddress(0),
netaddressLength(0),
resolved(true) {
}
ConnectionTask::ConnectionTask (socket_t fd, double commTimeout)
: Task("ConnectionTask"),
SocketTask(fd),
watcher(0),
state(STATE_CONNECTED),
idle(true),
connectTimeout(0.0),
commTimeout(commTimeout),
hostname(""),
port(0),
netaddress(0),
netaddressLength(0),
resolved(true) {
}
ConnectionTask::~ConnectionTask () {
if (netaddress != 0) {
delete[] netaddress;
}
if (state == STATE_CONNECTION_INPROGRESS) {
LOGGER_TRACE << "closing inprogrss socket " << commSocket;
int r = close(commSocket);
if (r < 0 ) {
LOGGER_ERROR << "close failed with " << errno << " (" << strerror(errno) << ") on " << commSocket;
}
commSocket = -1;
}
else if (state == STATE_CONNECTED) {
LOGGER_TRACE << "closing connected socket " << commSocket;
int r = close(commSocket);
if (r < 0 ) {
LOGGER_ERROR << "close failed with " << errno << " (" << strerror(errno) << ") on " << commSocket;
}
commSocket = -1;
}
}
// -----------------------------------------------------------------------------
// Task methods
// -----------------------------------------------------------------------------
void ConnectionTask::setup (Scheduler* scheduler, EventLoop loop) {
this->scheduler = scheduler;
this->loop = loop;
// we must have resolved the hostname
if (! resolved) {
return;
}
// and started the connection process
if (! isConnecting()) {
return;
}
// initialize all watchers
SocketTask::setup(scheduler, loop);
// we are already connected
if (isConnected()) {
LOGGER_TRACE << "already connected, waiting for requests on socket " << commSocket;
watcher = 0;
}
// we are in the process of connecting
else {
scheduler->uninstallEvent(readWatcher);
readWatcher = 0;
if (0.0 < connectTimeout) {
watcher = scheduler->installTimerEvent(loop, this, connectTimeout);
}
else {
watcher = 0;
}
LOGGER_TRACE << "waiting for connect to complete on socket " << commSocket;
}
}
void ConnectionTask::cleanup () {
SocketTask::cleanup();
scheduler->uninstallEvent(watcher);
}
bool ConnectionTask::handleEvent (EventToken token, EventType revents) {
bool result = true;
// upps, should not happen
if (state == STATE_UNCONNECTED) {
LOGGER_WARNING << "received event in unconnected state";
}
// we are trying to connect
else if (state == STATE_CONNECTION_INPROGRESS) {
result = handleConnectionEvent(token, revents);
}
// we are connected
else if (state == STATE_CONNECTED) {
result = handleCommunicationEvent(token, revents);
}
return result;
}
// -----------------------------------------------------------------------------
// private methods
// -----------------------------------------------------------------------------
bool ConnectionTask::handleConnectionEvent (EventToken token, EventType revents) {
// we received an event on the write socket
if (token == writeWatcher && (revents & EVENT_SOCKET_WRITE)) {
scheduler->uninstallEvent(readWatcher);
scheduler->uninstallEvent(writeWatcher);
scheduler->uninstallEvent(watcher);
int valopt;
socklen_t lon = sizeof(valopt);
if (getsockopt(commSocket, SOL_SOCKET, SO_ERROR, (void*)(&valopt), &lon) < 0) {
LOGGER_ERROR << "getsockopt failed with " << errno << " (" << strerror(errno) << ") on " << commSocket;
close(commSocket);
commSocket = -1;
state = STATE_UNCONNECTED;
return handleConnectionFailure();
}
if (valopt != 0) {
LOGGER_DEBUG << "delayed connect failed with " << valopt << " (" << strerror(valopt) << ") on " << commSocket;
close(commSocket);
commSocket = -1;
state = STATE_UNCONNECTED;
return handleConnectionFailure();
}
else {
state = STATE_CONNECTED;
readWatcher = scheduler->installSocketEvent(loop, EVENT_SOCKET_READ, this, commSocket);
writeWatcher = scheduler->installSocketEvent(loop, EVENT_SOCKET_WRITE, this, commSocket);
if (0.0 < commTimeout) {
watcher = scheduler->installTimerEvent(loop, this, commTimeout);
}
else {
watcher = 0;
}
LOGGER_TRACE << "connection completed on socket " << commSocket;
return handleConnected();
}
}
// we received a timer event
if (token == watcher && (revents & EVENT_TIMER)) {
close(commSocket);
commSocket = -1;
state = STATE_UNCONNECTED;
return handleConnectionTimeout();
}
return true;
}
bool ConnectionTask::handleCommunicationEvent (EventToken token, EventType revents) {
// we received a timer event
if (token == watcher && (revents & EVENT_TIMER)) {
if (idle) {
if (watcher != 0) {
LOGGER_TRACE << "clearing timer because we are idle";
scheduler->clearTimer(watcher);
}
}
else if (0.0 < commTimeout) {
LOGGER_TRACE << "received a timer event";
return handleCommunicationTimeout();
}
}
if (revents & (EVENT_SOCKET_READ | EVENT_SOCKET_WRITE)) {
if (0.0 < commTimeout) {
scheduler->rearmTimer(watcher, commTimeout);
}
}
return SocketTask::handleEvent(token, revents);
}
void ConnectionTask::resolveAddress () {
resolved = false;
if (netaddress != 0) {
delete[] netaddress;
}
netaddress = TRI_GetHostByName(hostname.c_str(), &netaddressLength);
if (netaddress == 0) {
LOGGER_ERROR << "cannot resolve hostname '" << hostname << "'";
return;
}
resolved = true;
}
void ConnectionTask::connectSocket () {
if (! resolved) {
LOGGER_DEBUG << "connect failed beacuse hostname cannot be resolved";
state = STATE_UNCONNECTED;
return;
}
// close open socket from last run
if (state != STATE_UNCONNECTED) {
close(commSocket);
commSocket = -1;
}
state = STATE_UNCONNECTED;
// open a socket
commSocket = ::socket(AF_INET, SOCK_STREAM, 0);
if (commSocket < 0) {
LOGGER_ERROR << "socket failed with " << errno << " (" << strerror(errno) << ") on " << commSocket;
return;
}
LOGGER_TRACE << "created socket " << commSocket;
// we are in the process of connecting the socket
state = STATE_CONNECTION_INPROGRESS;
// use non-blocking io
TRI_SetNonBlockingSocket(commSocket);
// and try to connect the socket to the given address
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
if (sizeof(saddr.sin_addr.s_addr) < netaddressLength) {
close(commSocket);
commSocket = -1;
state = STATE_UNCONNECTED;
LOGGER_ERROR << "IPv6 address are not allowed";
return;
}
memcpy(&(saddr.sin_addr.s_addr), netaddress, netaddressLength);
saddr.sin_port = htons(port);
int res = ::connect(commSocket, reinterpret_cast<struct sockaddr*>(&saddr), sizeof(saddr));
if (res < 0) {
if (errno == EINPROGRESS) {
LOGGER_TRACE << "connection to '" << hostname << "' is in progress on " << commSocket;
return;
}
else {
close(commSocket);
commSocket = -1;
state = STATE_UNCONNECTED;
LOGGER_ERROR << "connect failed with " << errno << " (" << strerror(errno) << ") on " << commSocket;
return;
}
}
state = STATE_CONNECTED;
handleConnected();
}
}
}

252
Scheduler/ConnectionTask.h Normal file
View File

@ -0,0 +1,252 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to establish connections
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_CONNECTION_TASK_H
#define TRIAGENS_FYN_REST_CONNECTION_TASK_H 1
#include "Scheduler/SocketTask.h"
namespace triagens {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief task used to establish connections
////////////////////////////////////////////////////////////////////////////////
class ConnectionTask : public SocketTask {
ConnectionTask (ConnectionTask const &);
ConnectionTask& operator= (ConnectionTask const &);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief error status
////////////////////////////////////////////////////////////////////////////////
enum error_e {
ERROR_NO_ERROR,
ERROR_CONNECTION_FAILURE,
ERROR_CONNECTION_TIMEOUT,
ERROR_CONNECTION_CLOSED,
ERROR_EXECUTION_FAILURE,
ERROR_EXECUTION_TIMEOUT,
ERROR_GENERAL_ERROR,
ERROR_SHUTDOWN_IN_PROGRESS
};
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief connection to given hostname and port
////////////////////////////////////////////////////////////////////////////////
ConnectionTask (string const& hostname, int port);
////////////////////////////////////////////////////////////////////////////////
/// @brief connection to given hostname and port
////////////////////////////////////////////////////////////////////////////////
ConnectionTask (string const& hostname, int port, double connectTimeout);
////////////////////////////////////////////////////////////////////////////////
/// @brief connection to given hostname and port
////////////////////////////////////////////////////////////////////////////////
ConnectionTask (string const& hostname, int port, double connectTimeout, double commTimeout);
////////////////////////////////////////////////////////////////////////////////
/// @brief connection using a given socket
////////////////////////////////////////////////////////////////////////////////
ConnectionTask (socket_t fd);
////////////////////////////////////////////////////////////////////////////////
/// @brief connection using a given socket
////////////////////////////////////////////////////////////////////////////////
ConnectionTask (socket_t fd, double commTimeout);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if hostname is resolved
///
/// Note that you can use this method only before the call to registerTask.
////////////////////////////////////////////////////////////////////////////////
bool isResolved () const {
return resolved;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if the socket is connected
///
/// Note that you can use this method only before the call to registerTask.
////////////////////////////////////////////////////////////////////////////////
bool isConnected () const {
return state == STATE_CONNECTED;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if the socket is created
///
/// Note that you can use this method only before the call to registerTask.
////////////////////////////////////////////////////////////////////////////////
bool isConnecting () const {
return state == STATE_CONNECTED || state == STATE_CONNECTION_INPROGRESS;
}
int getPort () const {
return port;
}
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief called by the task to indicate connection success
////////////////////////////////////////////////////////////////////////////////
virtual bool handleConnected () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called by the task to indicate connection failure
////////////////////////////////////////////////////////////////////////////////
virtual bool handleConnectionFailure () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called by the task to indicate a timeout
////////////////////////////////////////////////////////////////////////////////
virtual bool handleConnectionTimeout () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called by the task to indicate a timeout
////////////////////////////////////////////////////////////////////////////////
virtual bool handleCommunicationTimeout () = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief destructs a connection task
////////////////////////////////////////////////////////////////////////////////
~ConnectionTask ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Note that you should only call registerTask if the address has been
/// resolved.
////////////////////////////////////////////////////////////////////////////////
void setup (Scheduler*, EventLoop);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleEvent (EventToken, EventType);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief timer event
////////////////////////////////////////////////////////////////////////////////
EventToken watcher;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief state of the state machine
////////////////////////////////////////////////////////////////////////////////
enum state_e {
STATE_UNCONNECTED,
STATE_CONNECTION_INPROGRESS,
STATE_CONNECTED
};
////////////////////////////////////////////////////////////////////////////////
/// @brief the state of the socket
////////////////////////////////////////////////////////////////////////////////
state_e state;
////////////////////////////////////////////////////////////////////////////////
/// @brief true, if no request is in progress
///
/// Note that the sub-class must set/reset the idle flag in handleEvent.
////////////////////////////////////////////////////////////////////////////////
bool idle;
////////////////////////////////////////////////////////////////////////////////
/// @brief connection timeout in seconds
////////////////////////////////////////////////////////////////////////////////
double connectTimeout;
////////////////////////////////////////////////////////////////////////////////
/// @brief communication timeout in seconds
////////////////////////////////////////////////////////////////////////////////
double commTimeout;
private:
void resolveAddress ();
void connectSocket ();
bool handleConnectionEvent (EventToken token, EventType event);
bool handleCommunicationEvent (EventToken token, EventType event);
private:
string const hostname;
int const port;
char* netaddress;
size_t netaddressLength;
bool resolved;
};
}
}
#endif

343
Scheduler/ListenTask.cpp Normal file
View File

@ -0,0 +1,343 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to establish connections
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "ListenTask.h"
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <Basics/MutexLocker.h>
#include <BasicsC/socket-utils.h>
#include <Logger/Logger.h>
#include "GeneralServer/GeneralFigures.h"
#include "Scheduler/Scheduler.h"
using namespace triagens::basics;
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
ListenTask::ListenTask (string const& address, int port, bool reuseAddress)
: Task("ListenTask"),
readWatcher(0),
reuseAddress(reuseAddress),
address(address),
port(port),
listenSocket(0),
bound(false),
acceptFailures(0) {
bindSocket();
}
ListenTask::ListenTask (int port, bool reuseAddress)
: Task("ListenTask"),
readWatcher(0),
reuseAddress(reuseAddress),
address(""),
port(port),
listenSocket(0),
bound(false),
acceptFailures(0) {
bindSocket();
}
ListenTask::~ListenTask () {
close(listenSocket);
}
// -----------------------------------------------------------------------------
// public methods
// -----------------------------------------------------------------------------
bool ListenTask::isBound () const {
MUTEX_LOCKER(changeLock);
return bound;
}
bool ListenTask::rebind () {
MUTEX_LOCKER(changeLock);
if (bound) {
close(listenSocket);
}
return bindSocket();
}
// -----------------------------------------------------------------------------
// Task methods
// -----------------------------------------------------------------------------
void ListenTask::setup (Scheduler* scheduler, EventLoop loop) {
if (! bound) {
return;
}
this->scheduler = scheduler;
this->loop = loop;
readWatcher = scheduler->installSocketEvent(loop, EVENT_SOCKET_READ, this, listenSocket);
}
void ListenTask::cleanup () {
scheduler->uninstallEvent(readWatcher);
readWatcher = 0;
}
bool ListenTask::handleEvent (EventToken token, EventType revents) {
if (token == readWatcher) {
if ((revents & EVENT_SOCKET_READ) == 0) {
return true;
}
sockaddr_in addr;
socklen_t len = sizeof(addr);
memset(&addr, 0, sizeof(addr));
// accept connection
socket_t connfd = accept(listenSocket, (sockaddr*) &addr, &len);
if (connfd == INVALID_SOCKET) {
++acceptFailures;
if (acceptFailures < MAX_ACCEPT_ERRORS) {
LOGGER_WARNING << "accept failed with " << errno << " (" << strerror(errno) << ")";
}
else if (acceptFailures == MAX_ACCEPT_ERRORS) {
LOGGER_ERROR << "too many accept failures, stopping logging";
}
GeneralFigures::incCounter<GeneralFigures::GeneralServerStatistics::connectErrorsAccessor>();
return true;
}
struct sockaddr_in addr_out;
socklen_t len_out = sizeof(addr_out);
int res = getsockname(connfd, (sockaddr*) &addr_out, &len_out);
if (res != 0) {
close(connfd);
LOGGER_WARNING << "getsockname failed with " << errno << " (" << strerror(errno) << ")";
GeneralFigures::incCounter<GeneralFigures::GeneralServerStatistics::connectErrorsAccessor>();
return true;
}
acceptFailures = 0;
// disable nagle's algorithm
int n = 1;
res = setsockopt(connfd, IPPROTO_TCP, TCP_NODELAY, (char*)&n, sizeof(n));
if (res != 0 ) {
close(connfd);
LOGGER_WARNING << "setsockopt failed with " << errno << " (" << strerror(errno) << ")";
GeneralFigures::incCounter<GeneralFigures::GeneralServerStatistics::connectErrorsAccessor>();
return true;
}
// set socket to non-blocking
bool ok = TRI_SetNonBlockingSocket(connfd);
if (! ok) {
close(connfd);
LOGGER_ERROR << "cannot switch to non-blocking: " << errno << " (" << strerror(errno) << ")";
GeneralFigures::incCounter<GeneralFigures::GeneralServerStatistics::connectErrorsAccessor>();
return true;
}
ok = TRI_SetCloseOnExecSocket(connfd);
if (! ok) {
close(connfd);
LOGGER_ERROR << "cannot set close-on-exec: " << errno << " (" << strerror(errno) << ")";
GeneralFigures::incCounter<GeneralFigures::GeneralServerStatistics::connectErrorsAccessor>();
return true;
}
// handle connection
ConnectionInfo info;
info.serverAddress = inet_ntoa(addr_out.sin_addr);
info.serverPort = port;
info.clientAddress = inet_ntoa(addr.sin_addr);
info.clientPort = addr.sin_port;
return handleConnected(connfd, info);
}
return true;
}
// -----------------------------------------------------------------------------
// private methods
// -----------------------------------------------------------------------------
bool ListenTask::bindSocket () {
bound = false;
// create a new socket
listenSocket = socket(AF_INET, SOCK_STREAM, 0);
if (listenSocket < 0) {
LOGGER_ERROR << "socket failed with " << errno << " (" << strerror(errno) << ")";
return false;
}
// try to reuse address
if (reuseAddress) {
int opt = 1;
if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) == -1) {
LOGGER_ERROR << "setsockopt failed with " << errno << " (" << strerror(errno) << ")";
return false;
}
LOGGER_TRACE << "reuse address flag set";
}
// bind to any address
sockaddr_in addr;
if (address.empty()) {
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port);
}
// bind to a given address
else {
size_t length;
char* a = TRI_GetHostByName(address.c_str(), &length);
if (a == 0) {
LOGGER_ERROR << "cannot resolve hostname: " << errno << " (" << strerror(errno) << ")";
return false;
}
if (sizeof(addr.sin_addr.s_addr) < length) {
LOGGER_ERROR << "IPv6 address are not allowed";
return false;
}
// bind socket to an address
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
memcpy(&(addr.sin_addr.s_addr), a, length);
addr.sin_port = htons(port);
delete[] a;
}
int res = bind(listenSocket, (const sockaddr*) &addr, sizeof(addr));
if (res < 0) {
close(listenSocket);
LOGGER_ERROR << "bind failed with " << errno << " (" << strerror(errno) << ")";
return false;
}
// listen for new connection
res = listen(listenSocket, 10);
if (res < 0) {
close(listenSocket);
LOGGER_ERROR << "listen failed with " << errno << " (" << strerror(errno) << ")";
return false;
}
// set socket to non-blocking
bool ok = TRI_SetNonBlockingSocket(listenSocket);
if (! ok) {
close(listenSocket);
LOGGER_ERROR << "cannot switch to non-blocking: " << errno << " (" << strerror(errno) << ")";
return false;
}
// set close on exit
ok = TRI_SetCloseOnExecSocket(listenSocket);
if (! ok) {
close(listenSocket);
LOGGER_ERROR << "cannot set close-on-exit: " << errno << " (" << strerror(errno) << ")";
return false;
}
bound = true;
return true;
}
}
}

150
Scheduler/ListenTask.h Normal file
View File

@ -0,0 +1,150 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to establish connections
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_LISTEN_TASK_H
#define TRIAGENS_FYN_REST_LISTEN_TASK_H 1
#include "Scheduler/Task.h"
#include <Basics/Mutex.h>
#include <Rest/ConnectionInfo.h>
namespace triagens {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief task used to establish connections
////////////////////////////////////////////////////////////////////////////////
class ListenTask : virtual public Task {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief maximal number of failed connects
////////////////////////////////////////////////////////////////////////////////
static size_t const MAX_ACCEPT_ERRORS = 1000;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief listen to given address and port
////////////////////////////////////////////////////////////////////////////////
ListenTask (string const& address, int port, bool reuseAddress);
////////////////////////////////////////////////////////////////////////////////
/// @brief listen to given port
////////////////////////////////////////////////////////////////////////////////
ListenTask (int port, bool reuseAddress);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if listen socket is bound
////////////////////////////////////////////////////////////////////////////////
bool isBound () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief try to rebind to port
///
/// Note that this method can only be called before the task has been
/// registered.
////////////////////////////////////////////////////////////////////////////////
bool rebind ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief called by the task to indicate connection success
////////////////////////////////////////////////////////////////////////////////
virtual bool handleConnected (socket_t fd, ConnectionInfo const& info) = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief destructs a listen task
///
/// This method will close the underlying socket.
////////////////////////////////////////////////////////////////////////////////
~ListenTask ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
///
/// Note that registerTask must only be called when the socket is bound.
////////////////////////////////////////////////////////////////////////////////
void setup (Scheduler*, EventLoop);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleEvent (EventToken token, EventType);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief event for read
////////////////////////////////////////////////////////////////////////////////
EventToken readWatcher;
private:
bool bindSocket ();
private:
bool reuseAddress;
string const address;
int const port;
socket_t listenSocket;
bool bound;
size_t acceptFailures;
mutable basics::Mutex changeLock;
};
}
}
#endif

View File

@ -0,0 +1,90 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to handle periodic events
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "PeriodicTask.h"
#include "Scheduler/Scheduler.h"
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
PeriodicTask::PeriodicTask (double offset, double intervall)
: Task("PeriodicTask"),
watcher(0),
offset(offset),
intervall(intervall) {
}
PeriodicTask::~PeriodicTask () {
}
// -----------------------------------------------------------------------------
// Task methods
// -----------------------------------------------------------------------------
void PeriodicTask::resetTimer (double offset, double intervall) {
scheduler->rearmPeriodic(watcher, offset, intervall);
}
// -----------------------------------------------------------------------------
// Task methods
// -----------------------------------------------------------------------------
void PeriodicTask::setup (Scheduler* scheduler, EventLoop loop) {
this->scheduler = scheduler;
this->loop = loop;
watcher = scheduler->installPeriodicEvent(loop, this, offset, intervall);
}
void PeriodicTask::cleanup () {
scheduler->uninstallEvent(watcher);
watcher = 0;
}
bool PeriodicTask::handleEvent (EventToken token, EventType revents) {
bool result = true;
if (token == watcher && (revents & EVENT_PERIODIC)) {
result = handlePeriod();
}
return result;
}
}
}

110
Scheduler/PeriodicTask.h Normal file
View File

@ -0,0 +1,110 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to handle periodic events
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_PERIODIC_TASK_H
#define TRIAGENS_FYN_REST_PERIODIC_TASK_H 1
#include "Scheduler/Task.h"
namespace triagens {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief task used to handle periodic events
////////////////////////////////////////////////////////////////////////////////
class PeriodicTask : virtual public Task {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new task for a given periodic event
////////////////////////////////////////////////////////////////////////////////
PeriodicTask (double offset, double intervall);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief called when the timer is reached
////////////////////////////////////////////////////////////////////////////////
virtual bool handlePeriod () = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~PeriodicTask ();
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief resets the timer
////////////////////////////////////////////////////////////////////////////////
void resetTimer (double offset, double intervall);
protected:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void setup (Scheduler*, EventLoop);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleEvent (EventToken, EventType);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief periodic event
////////////////////////////////////////////////////////////////////////////////
EventToken watcher;
private:
double offset;
double intervall;
};
}
}
#endif

44
Scheduler/Scheduler.cpp Normal file
View File

@ -0,0 +1,44 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief input-output scheduler
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Scheduler.h"
#include "Scheduler/SchedulerLibev.h"
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
Scheduler* Scheduler::create () {
return new SchedulerLibev();
}
}
}

295
Scheduler/Scheduler.h Normal file
View File

@ -0,0 +1,295 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief input-output scheduler
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_SCHEDULER_H
#define TRIAGENS_FYN_REST_SCHEDULER_H 1
#include <Basics/Common.h>
////////////////////////////////////////////////////////////////////////////////
/// @defgroup Scheduler I/O Scheduler
////////////////////////////////////////////////////////////////////////////////
namespace triagens {
namespace basics {
class ConditionVariable;
}
namespace rest {
class Task;
////////////////////////////////////////////////////////////////////////////////
/// @brief event loop identifier
////////////////////////////////////////////////////////////////////////////////
typedef uint32_t EventLoop;
////////////////////////////////////////////////////////////////////////////////
/// @brief event handler identifier
////////////////////////////////////////////////////////////////////////////////
typedef uint32_t EventToken;
////////////////////////////////////////////////////////////////////////////////
/// @brief event type identifier
////////////////////////////////////////////////////////////////////////////////
typedef uint32_t EventType;
////////////////////////////////////////////////////////////////////////////////
/// @brief socket read event
////////////////////////////////////////////////////////////////////////////////
uint32_t const EVENT_SOCKET_READ = 1;
////////////////////////////////////////////////////////////////////////////////
/// @brief socket write event
////////////////////////////////////////////////////////////////////////////////
uint32_t const EVENT_SOCKET_WRITE = 2;
////////////////////////////////////////////////////////////////////////////////
/// @brief asynchronous event
////////////////////////////////////////////////////////////////////////////////
uint32_t const EVENT_ASYNC = 4;
////////////////////////////////////////////////////////////////////////////////
/// @brief timer event
////////////////////////////////////////////////////////////////////////////////
uint32_t const EVENT_TIMER = 8;
////////////////////////////////////////////////////////////////////////////////
/// @brief periodic event
////////////////////////////////////////////////////////////////////////////////
uint32_t const EVENT_PERIODIC = 16;
////////////////////////////////////////////////////////////////////////////////
/// @brief signal event
////////////////////////////////////////////////////////////////////////////////
uint32_t const EVENT_SIGNAL = 32;
////////////////////////////////////////////////////////////////////////////////
/// @brief automatically select an io backend
////////////////////////////////////////////////////////////////////////////////
uint32_t const BACKEND_AUTO = 0;
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief interface of a input-output scheduler
////////////////////////////////////////////////////////////////////////////////
class Scheduler {
private:
Scheduler (Scheduler const&);
Scheduler& operator= (Scheduler const&);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a single-threaded scheduler
////////////////////////////////////////////////////////////////////////////////
static Scheduler* create ();
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
Scheduler () {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
virtual ~Scheduler () {
}
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if scheduler is shuting down
////////////////////////////////////////////////////////////////////////////////
virtual bool isShutdownInProgress () = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if scheduler is still running
////////////////////////////////////////////////////////////////////////////////
virtual bool isRunning () = 0;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief registers a new task
////////////////////////////////////////////////////////////////////////////////
virtual void registerTask (Task*) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief unregisters a task
///
/// Note that this method is called by the task itself when cleanupTask is
/// executed. If a Task failed in setupTask, it must not call unregisterTask.
////////////////////////////////////////////////////////////////////////////////
virtual void unregisterTask (Task*) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief destroys task
///
/// Even if a Task failed in setupTask, it can still call destroyTask. The
/// methods will delete the task.
////////////////////////////////////////////////////////////////////////////////
virtual void destroyTask (Task*) = 0;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief starts scheduler, keeps running
///
/// The functions returns true, if the scheduler has been started. In this
/// case the condition variable is signal as soon as at least one of the
/// scheduler threads stops.
////////////////////////////////////////////////////////////////////////////////
virtual bool start (basics::ConditionVariable*) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief starts shutdown sequence
////////////////////////////////////////////////////////////////////////////////
virtual void beginShutdown () = 0;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief called to display current status
////////////////////////////////////////////////////////////////////////////////
virtual void reportStatus () = 0;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief main event loop
////////////////////////////////////////////////////////////////////////////////
virtual void eventLoop (EventLoop) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief wakes up an event loop
////////////////////////////////////////////////////////////////////////////////
virtual void wakeupLoop (EventLoop) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called to register a socket descriptor event
////////////////////////////////////////////////////////////////////////////////
virtual EventToken installSocketEvent (EventLoop, EventType, Task*, socket_t) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief re-starts the socket events
////////////////////////////////////////////////////////////////////////////////
virtual void startSocketEvents (EventToken) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief stops the socket events
////////////////////////////////////////////////////////////////////////////////
virtual void stopSocketEvents (EventToken) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called to register an asynchronous event
////////////////////////////////////////////////////////////////////////////////
virtual EventToken installAsyncEvent (EventLoop, Task*) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief sends an asynchronous event
////////////////////////////////////////////////////////////////////////////////
virtual void sendAsync (EventToken) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called to register a timer event
////////////////////////////////////////////////////////////////////////////////
virtual EventToken installTimerEvent (EventLoop, Task*, double timeout) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief clears a timer without removing it
////////////////////////////////////////////////////////////////////////////////
virtual void clearTimer (EventToken) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief rearms a timer
////////////////////////////////////////////////////////////////////////////////
virtual void rearmTimer (EventToken, double timeout) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called to register a periodic event
////////////////////////////////////////////////////////////////////////////////
virtual EventToken installPeriodicEvent (EventLoop, Task*, double offset, double intervall) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief rearms a periodic timer
////////////////////////////////////////////////////////////////////////////////
virtual void rearmPeriodic (EventToken, double offset, double timeout) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called to register a signal event
////////////////////////////////////////////////////////////////////////////////
virtual EventToken installSignalEvent (EventLoop, Task*, int signal) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called to unregister an event handler
////////////////////////////////////////////////////////////////////////////////
virtual void uninstallEvent (EventToken) = 0;
};
}
}
#endif

View File

@ -28,12 +28,12 @@
#include "SchedulerImpl.h"
#include <Basics/Logger.h>
#include <Logger/Logger.h>
#include <Basics/MutexLocker.h>
#include <Basics/StringUtils.h>
#include <Basics/Thread.h>
#include <Rest/Task.h>
#include "Scheduler/Task.h"
#include "Scheduler/SchedulerThread.h"
using namespace triagens::basics;
@ -46,8 +46,8 @@ namespace triagens {
// -----------------------------------------------------------------------------
SchedulerImpl::SchedulerImpl (size_t nrThreads)
: stopping(0),
nrThreads(nrThreads),
: nrThreads(nrThreads),
stopping(0),
nextLoop(0) {
// check for multi-threading scheduler
@ -254,7 +254,7 @@ namespace triagens {
void SchedulerImpl::reportStatus () {
if (TRI_IsDebugLogging()) {
if (TRI_IsDebugLogging(__FILE__)) {
MUTEX_LOCKER(schedulerLock);
string status = "scheduler: ";

View File

@ -29,8 +29,8 @@
#ifndef TRIAGENS_FYN_SCHEDULER_SCHEDULER_IMPL_H
#define TRIAGENS_FYN_SCHEDULER_SCHEDULER_IMPL_H 1
#include <Rest/Scheduler.h>
#include <Rest/Task.h>
#include "Scheduler/Scheduler.h"
#include "Scheduler/Task.h"
#include <Basics/Mutex.h>

View File

@ -31,10 +31,10 @@
#include <ev.h>
#include <Basics/Exceptions.h>
#include <Basics/Logger.h>
#include <Logger/Logger.h>
#include <Basics/MutexLocker.h>
#include <Rest/Task.h>
#include "Scheduler/Task.h"
#include "Scheduler/SchedulerThread.h"
using namespace triagens::basics;

View File

@ -28,8 +28,9 @@
#include "SchedulerThread.h"
#include <Basics/Logger.h>
#include <Rest/Task.h>
#include <Logger/Logger.h>
#include "Scheduler/Task.h"
using namespace triagens::basics;

View File

@ -32,7 +32,8 @@
#include <Basics/Thread.h>
#include <Basics/Mutex.h>
#include <Rest/Task.h>
#include "Scheduler/Task.h"
namespace triagens {
namespace rest {

130
Scheduler/SignalTask.cpp Normal file
View File

@ -0,0 +1,130 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to handle signals
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "SignalTask.h"
#include <Logger/Logger.h>
#include <Basics/MutexLocker.h>
#include "Scheduler/Scheduler.h"
using namespace triagens::basics;
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
SignalTask::SignalTask ()
: Task("SignalTask") {
for (size_t i = 0; i < MAX_SIGNALS; ++i) {
watcher[i] = 0;
}
}
SignalTask::~SignalTask () {
}
// -----------------------------------------------------------------------------
// public methods
// -----------------------------------------------------------------------------
bool SignalTask::addSignal (int signal) {
MUTEX_LOCKER(changeLock);
if (signals.size() >= MAX_SIGNALS) {
LOGGER_ERROR << "maximal number of signals reached";
return false;
}
else {
if (scheduler != 0) {
scheduler->unregisterTask(this);
}
signals.insert(signal);
if (scheduler != 0) {
scheduler->registerTask(this);
}
return true;
}
}
// -----------------------------------------------------------------------------
// Task methods
// -----------------------------------------------------------------------------
void SignalTask::setup (Scheduler* scheduler, EventLoop loop) {
this->scheduler = scheduler;
this->loop = loop;
size_t pos = 0;
for (set<int>::iterator i = signals.begin(); i != signals.end() && pos < MAX_SIGNALS; ++i, ++pos) {
watcher[pos] = scheduler->installSignalEvent(loop, this, *i);
}
}
void SignalTask::cleanup () {
for (size_t pos = 0; pos < signals.size() && pos < MAX_SIGNALS; ++pos) {
scheduler->uninstallEvent(watcher[pos]);
watcher[pos] = 0;
}
}
bool SignalTask::handleEvent (EventToken token, EventType revents) {
bool result = true;
if (revents & EVENT_SIGNAL) {
for (size_t pos = 0; pos < signals.size() && pos < MAX_SIGNALS; ++pos) {
if (token == watcher[pos]) {
result = handleSignal();
break;
}
}
}
return result;
}
bool SignalTask::needsMainEventLoop () const {
return true;
}
}
}

129
Scheduler/SignalTask.h Normal file
View File

@ -0,0 +1,129 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to handle signals
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_SIGNAL_TASK_H
#define TRIAGENS_FYN_REST_SIGNAL_TASK_H 1
#include "Scheduler/Task.h"
#include <Basics/Mutex.h>
namespace triagens {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief task used to handle signals
////////////////////////////////////////////////////////////////////////////////
class SignalTask : virtual public Task {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief maximal number of signals per task
////////////////////////////////////////////////////////////////////////////////
static size_t const MAX_SIGNALS = 10;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new task
////////////////////////////////////////////////////////////////////////////////
SignalTask ();
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief adds a signal which will be handled
///
/// Note that this method can be called on a signal task, which has already
/// been set up.
////////////////////////////////////////////////////////////////////////////////
bool addSignal (int signal);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief called when the signal is received
////////////////////////////////////////////////////////////////////////////////
virtual bool handleSignal () = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// destructor
////////////////////////////////////////////////////////////////////////////////
~SignalTask ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void setup (Scheduler*, EventLoop);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleEvent (EventToken, EventType);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool needsMainEventLoop () const;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief signal event
////////////////////////////////////////////////////////////////////////////////
EventToken watcher[MAX_SIGNALS];
private:
set<int> signals;
basics::Mutex changeLock;
};
}
}
#endif

363
Scheduler/SocketTask.cpp Normal file
View File

@ -0,0 +1,363 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief base class for input-output tasks from sockets
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "SocketTask.h"
#include <errno.h>
#include <Logger/Logger.h>
#include <Basics/MutexLocker.h>
#include <Basics/StringBuffer.h>
#include "Scheduler/Scheduler.h"
using namespace triagens::basics;
namespace triagens {
namespace rest {
static Mutex lockNumWrites;
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
SocketTask::SocketTask (socket_t fd)
: Task("SocketTask"),
readWatcher(0),
writeWatcher(0),
watcher(0),
commSocket(fd),
writeBuffer(0),
ownBuffer(true),
writeLength(0) {
readBuffer = new StringBuffer();
tmpReadBuffer = new char[READ_BLOCK_SIZE];
}
SocketTask::~SocketTask () {
if (commSocket != -1) {
close(commSocket);
}
if (writeBuffer != 0) {
if (ownBuffer) {
writeBuffer->free();
delete writeBuffer;
}
}
readBuffer->free();
delete readBuffer;
delete[] tmpReadBuffer;
}
// -----------------------------------------------------------------------------
// Task methods
// -----------------------------------------------------------------------------
void SocketTask::setup (Scheduler* scheduler, EventLoop loop) {
this->scheduler = scheduler;
this->loop = loop;
watcher = scheduler->installAsyncEvent(loop, this);
readWatcher = scheduler->installSocketEvent(loop, EVENT_SOCKET_READ, this, commSocket);
writeWatcher = scheduler->installSocketEvent(loop, EVENT_SOCKET_WRITE, this, commSocket);
tid = Thread::currentThreadId();
}
void SocketTask::cleanup () {
scheduler->uninstallEvent(watcher);
watcher = 0;
scheduler->uninstallEvent(readWatcher);
readWatcher = 0;
scheduler->uninstallEvent(writeWatcher);
writeWatcher = 0;
}
bool SocketTask::handleEvent (EventToken token, EventType revents) {
bool result = true;
bool closed = false;
if (token == readWatcher && (revents & EVENT_SOCKET_READ)) {
result = handleRead(closed);
}
if (result && ! closed && token == writeWatcher) {
bool noWrite = false;
{
MUTEX_LOCKER(writeBufferLock);
noWrite = (writeBuffer == 0);
}
if (revents & EVENT_SOCKET_WRITE) {
result = handleWrite(closed, noWrite);
}
}
if (result) {
MUTEX_LOCKER(writeBufferLock);
if (writeBuffer == 0) {
scheduler->stopSocketEvents(writeWatcher);
}
else {
scheduler->startSocketEvents(writeWatcher);
}
}
return result;
}
// -----------------------------------------------------------------------------
// protected methods
// -----------------------------------------------------------------------------
bool SocketTask::handleWrite (bool& closed, bool noWrite) {
closed = false;
if (noWrite) {
return true;
}
bool callCompletedWriteBuffer = false;
{
MUTEX_LOCKER(writeBufferLock);
size_t len = writeBuffer->length() - writeLength;
int nr = 0;
if (0 < len) {
nr = write(commSocket, writeBuffer->begin() + writeLength, (int) len);
if (nr < 0) {
if (errno == EINTR) {
return handleWrite(closed, noWrite);
}
else if (errno != EWOULDBLOCK) {
LOGGER_DEBUG << "write failed with " << errno << " (" << strerror(errno) << ")";
return false;
}
else {
nr = 0;
}
}
len -= nr;
}
if (len == 0) {
if (ownBuffer) {
writeBuffer->free();
delete writeBuffer;
}
writeBuffer = 0;
callCompletedWriteBuffer = true;
}
else {
writeLength += nr;
}
}
// we have to release the lock, before calling completedWriteBuffer
if (callCompletedWriteBuffer) {
completedWriteBuffer(closed);
if (closed) {
return false;
}
}
// we might have a new write buffer or none at all
{
MUTEX_LOCKER(writeBufferLock);
if (writeBuffer == 0) {
scheduler->stopSocketEvents(writeWatcher);
}
else {
scheduler->startSocketEvents(writeWatcher);
}
}
return true;
}
bool SocketTask::hasWriteBuffer () const {
MUTEX_LOCKER(writeBufferLock);
return writeBuffer != 0;
}
void SocketTask::setWriteBuffer (StringBuffer* buffer, bool ownBuffer) {
bool callCompletedWriteBuffer = false;
{
MUTEX_LOCKER(writeBufferLock);
writeLength = 0;
if (buffer->empty()) {
if (ownBuffer) {
buffer->free();
delete buffer;
}
writeBuffer = 0;
callCompletedWriteBuffer = true;
}
else {
if (writeBuffer != 0) {
if (this->ownBuffer) {
writeBuffer->free();
delete writeBuffer;
}
}
writeBuffer = buffer;
this->ownBuffer = ownBuffer;
}
}
// we have to release the lock before calling completedWriteBuffer
if (callCompletedWriteBuffer) {
bool closed;
completedWriteBuffer(closed);
}
// we might have a new write buffer or none at all
if (tid == Thread::currentThreadId()) {
MUTEX_LOCKER(writeBufferLock);
if (writeBuffer == 0) {
scheduler->stopSocketEvents(writeWatcher);
}
else {
scheduler->startSocketEvents(writeWatcher);
}
}
else {
scheduler->sendAsync(watcher);
}
}
void SocketTask::appendWriteBuffer (StringBuffer* buffer) {
if (buffer->empty()) {
return;
}
bool newBuffer = false;
{
MUTEX_LOCKER(writeBufferLock);
if (writeBuffer == 0) {
writeLength = 0;
writeBuffer = new StringBuffer;
writeBuffer->initialise();
newBuffer = true;
}
writeBuffer->appendText(*buffer);
}
// we might have a new write buffer
if (newBuffer) {
if (tid == Thread::currentThreadId()) {
MUTEX_LOCKER(writeBufferLock);
scheduler->startSocketEvents(writeWatcher);
}
else {
scheduler->sendAsync(watcher);
}
}
}
bool SocketTask::fillReadBuffer (bool& closed) {
closed = false;
int nr = read(commSocket, tmpReadBuffer, READ_BLOCK_SIZE);
if (nr > 0) {
readBuffer->appendText(tmpReadBuffer, nr);
return true;
}
else if (nr == 0) {
closed = true;
LOGGER_TRACE << "read return 0 with " << errno << " (" << strerror(errno) << ")";
return false;
}
else {
if (errno == EINTR) {
return fillReadBuffer(closed);
}
else if (errno != EWOULDBLOCK) {
LOGGER_TRACE << "read failed with " << errno << " (" << strerror(errno) << ")";
return false;
}
else {
LOGGER_TRACE << "read would block with " << errno << " (" << strerror(errno) << ")";
return true;
}
}
}
}
}

231
Scheduler/SocketTask.h Normal file
View File

@ -0,0 +1,231 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief base class for input-output tasks from sockets
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_SOCKET_TASK_H
#define TRIAGENS_FYN_REST_SOCKET_TASK_H 1
#include "Scheduler/Task.h"
#include <Basics/Mutex.h>
#include <Basics/Thread.h>
namespace triagens {
namespace basics {
class StringBuffer;
}
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief base class for input-output tasks from sockets
////////////////////////////////////////////////////////////////////////////////
class SocketTask : virtual public Task {
private:
SocketTask (SocketTask const&);
SocketTask& operator= (SocketTask const&);
private:
static size_t const READ_BLOCK_SIZE = 10000;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new task with a given socket
////////////////////////////////////////////////////////////////////////////////
explicit
SocketTask (socket_t fd);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief fills the read buffer
///
/// @param closed
/// will be set to true, if the system receives a close on the socket.
///
/// The function should be called by the input task if the scheduler has
/// indicated that new data is available. It will return true, if data could
/// be read and false if the connection has been closed.
////////////////////////////////////////////////////////////////////////////////
virtual bool fillReadBuffer (bool& closed);
////////////////////////////////////////////////////////////////////////////////
/// @brief handles a read
///
/// @param closed
/// will be set to true, if the system receives a close on the socket.
////////////////////////////////////////////////////////////////////////////////
virtual bool handleRead (bool& closed) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief handles a write
///
/// @param closed
/// will be set to true, if the system receives a close on the socket.
///
/// @param noWrite
/// is true if no writeBuffer exists.
////////////////////////////////////////////////////////////////////////////////
virtual bool handleWrite (bool& closed, bool noWrite);
////////////////////////////////////////////////////////////////////////////////
/// @brief called if write buffer has been sent
///
/// This called is called if the current write buffer has been sent
/// completly to the client.
////////////////////////////////////////////////////////////////////////////////
virtual void completedWriteBuffer (bool& closed) = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a socket task
///
/// This method will close the underlying socket.
////////////////////////////////////////////////////////////////////////////////
~SocketTask ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void setup (Scheduler*, EventLoop);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleEvent (EventToken token, EventType);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief sets an active write buffer
////////////////////////////////////////////////////////////////////////////////
void setWriteBuffer (basics::StringBuffer*, bool ownBuffer = true);
////////////////////////////////////////////////////////////////////////////////
/// @brief appends or creates an active write buffer
////////////////////////////////////////////////////////////////////////////////
void appendWriteBuffer (basics::StringBuffer*);
////////////////////////////////////////////////////////////////////////////////
/// @brief checks for presence of an active write buffer
////////////////////////////////////////////////////////////////////////////////
bool hasWriteBuffer () const;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief event for read
////////////////////////////////////////////////////////////////////////////////
EventToken readWatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief event for write
////////////////////////////////////////////////////////////////////////////////
EventToken writeWatcher;
////////////////////////////////////////////////////////////////////////////////
/// @brief event for async
////////////////////////////////////////////////////////////////////////////////
EventToken watcher;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief communication socket
////////////////////////////////////////////////////////////////////////////////
socket_t commSocket;
////////////////////////////////////////////////////////////////////////////////
/// @brief the current write buffer
////////////////////////////////////////////////////////////////////////////////
basics::StringBuffer* writeBuffer;
////////////////////////////////////////////////////////////////////////////////
/// @brief if true, the resource writeBuffer is owned by the write task
///
/// If true, the writeBuffer is deleted as soon as it has been sent to the
/// client. If false, the writeBuffer is keep alive.
////////////////////////////////////////////////////////////////////////////////
bool ownBuffer;
////////////////////////////////////////////////////////////////////////////////
/// @brief number of bytes already written
////////////////////////////////////////////////////////////////////////////////
size_t writeLength;
////////////////////////////////////////////////////////////////////////////////
/// @brief read buffer
///
/// The function fillReadBuffer stores the data in this buffer.
////////////////////////////////////////////////////////////////////////////////
basics::StringBuffer* readBuffer;
////////////////////////////////////////////////////////////////////////////////
/// @brief lock on the write buffer
////////////////////////////////////////////////////////////////////////////////
mutable basics::Mutex writeBufferLock;
private:
TRI_tid_t tid;
char * tmpReadBuffer;
};
}
}
#endif

83
Scheduler/Task.cpp Normal file
View File

@ -0,0 +1,83 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief abstract base class for tasks
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2009-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Task.h"
#include "Scheduler/Scheduler.h"
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// TaskManager
// -----------------------------------------------------------------------------
void TaskManager::deactivateTask (Task* task) {
task->active = 0;
}
void TaskManager::deleteTask (Task* task) {
delete task;
}
void TaskManager::setupTask (Task* task, Scheduler* scheduler, EventLoop loop) {
task->setup(scheduler, loop);
}
void TaskManager::cleanupTask (Task* task) {
task->cleanup();
}
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
Task::Task (string const& name)
: scheduler(0), loop(0), name(name), active(1) {
}
Task::~Task () {
}
// -----------------------------------------------------------------------------
// protected methods
// -----------------------------------------------------------------------------
bool Task::needsMainEventLoop () const {
return false;
}
}
}

215
Scheduler/Task.h Normal file
View File

@ -0,0 +1,215 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief abstract base class for tasks
///
/// @file
/// Tasks are handled by the scheduler. The scheduler calls the task callback
/// as soon as a specific event occurs.
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_TASK_H
#define TRIAGENS_FYN_REST_TASK_H 1
#include <Basics/Common.h>
#include "Scheduler/Scheduler.h"
namespace triagens {
namespace rest {
class Task;
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief abstract base class for tasks
////////////////////////////////////////////////////////////////////////////////
class TaskManager {
private:
TaskManager (TaskManager const&);
TaskManager& operator= (TaskManager const&);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
TaskManager () {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
virtual ~TaskManager () {
}
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief deactivates a task
///
/// Sets the task flag active to 0. This happens when unregisterTask or
/// destroyTask is called.
////////////////////////////////////////////////////////////////////////////////
void deactivateTask (Task* task);
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a task
////////////////////////////////////////////////////////////////////////////////
void deleteTask (Task* task);
////////////////////////////////////////////////////////////////////////////////
/// @brief called to setup the callback information
///
/// Called to setup all the necessary information within the event loop. In case
/// of a multi-threaded scheduler the event loop identifier is supplied.
////////////////////////////////////////////////////////////////////////////////
void setupTask (Task*, Scheduler*, EventLoop loop);
////////////////////////////////////////////////////////////////////////////////
/// @brief called to clear the callback information
////////////////////////////////////////////////////////////////////////////////
void cleanupTask (Task*);
};
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief abstract base class for tasks
////////////////////////////////////////////////////////////////////////////////
class Task {
friend class TaskManager;
private:
Task (Task const&);
Task& operator= (Task const&);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new task
///
/// Note that the constructor has no access to the event loop. The connection
/// is provided in the method setup and any setup with regards to the event
/// loop must be done there. It is not possible to simply delete a tasks. You
/// must use the method destroy to cleanup the task, remove it from the event
/// loop and delete it. The method cleanup itself will not delete task but
/// remove it from the event loop. It is possible to use setup again to reuse
/// the task.
////////////////////////////////////////////////////////////////////////////////
explicit
Task (string const& name);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the task name for debugging
////////////////////////////////////////////////////////////////////////////////
string const& getName () const {
return name;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns true if task is still active
////////////////////////////////////////////////////////////////////////////////
bool isActive () const {
return active != 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief allow thread to run on slave event loop
////////////////////////////////////////////////////////////////////////////////
virtual bool needsMainEventLoop () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief called by scheduler to indicate an event
///
/// The method will only be called from within the scheduler thread, which
/// belongs to the loop parameter.
////////////////////////////////////////////////////////////////////////////////
virtual bool handleEvent (EventToken token, EventType event) = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a task
///
/// The method will only be called from after the task has been cleaned by
/// the method cleanup.
////////////////////////////////////////////////////////////////////////////////
virtual ~Task ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief called to set up the callback information
///
/// The method will only be called from within the scheduler thread, which
/// belongs to the loop parameter.
////////////////////////////////////////////////////////////////////////////////
virtual void setup (Scheduler*, EventLoop) = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief called to clear the callback information
///
/// The method will only be called from within the scheduler thread, which
/// belongs to the loop parameter.
////////////////////////////////////////////////////////////////////////////////
virtual void cleanup () = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief scheduler
////////////////////////////////////////////////////////////////////////////////
Scheduler* scheduler;
////////////////////////////////////////////////////////////////////////////////
/// @brief event loop identifier
////////////////////////////////////////////////////////////////////////////////
EventLoop loop;
private:
string const name;
sig_atomic_t active;
};
}
}
#endif

97
Scheduler/TimerTask.cpp Normal file
View File

@ -0,0 +1,97 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to handle timer events
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "TimerTask.h"
#include <Logger/Logger.h>
#include "Scheduler/Scheduler.h"
using namespace triagens::basics;
using namespace triagens::rest;
namespace triagens {
namespace rest {
// -----------------------------------------------------------------------------
// constructors and destructors
// -----------------------------------------------------------------------------
TimerTask::TimerTask (double seconds)
: Task("TimerTask"),
watcher(0),
seconds(seconds) {
}
TimerTask::~TimerTask () {
}
// -----------------------------------------------------------------------------
// Task methods
// -----------------------------------------------------------------------------
void TimerTask::setup (Scheduler* scheduler, EventLoop loop) {
this->scheduler = scheduler;
this->loop = loop;
if (0.0 < seconds) {
watcher = scheduler->installTimerEvent(loop, this, seconds);
LOGGER_TRACE << "armed TimerTask with " << seconds << " seconds";
}
else {
watcher = 0;
}
}
void TimerTask::cleanup () {
scheduler->uninstallEvent(watcher);
watcher = 0;
}
bool TimerTask::handleEvent (EventToken token, EventType revents) {
bool result = true;
if (token == watcher) {
if (revents & EVENT_TIMER) {
scheduler->uninstallEvent(watcher);
watcher = 0;
result = handleTimeout();
}
}
return result;
}
}
}

107
Scheduler/TimerTask.h Normal file
View File

@ -0,0 +1,107 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief tasks used to handle timer events
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2011 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
/// @author Copyright 2008-2011, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_FYN_REST_TIMER_TASK_H
#define TRIAGENS_FYN_REST_TIMER_TASK_H 1
#include "Scheduler/Task.h"
namespace triagens {
namespace rest {
////////////////////////////////////////////////////////////////////////////////
/// @ingroup Scheduler
/// @brief task used to handle timer events
////////////////////////////////////////////////////////////////////////////////
class TimerTask : virtual public Task {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new task for a timer event
////////////////////////////////////////////////////////////////////////////////
explicit
TimerTask (double seconds);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief called when the timer is reached
////////////////////////////////////////////////////////////////////////////////
virtual bool handleTimeout () = 0;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~TimerTask ();
protected:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void setup (Scheduler*, EventLoop);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void cleanup ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool handleEvent (EventToken token, EventType event);
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief timer event
////////////////////////////////////////////////////////////////////////////////
EventToken watcher;
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief timeout
////////////////////////////////////////////////////////////////////////////////
double seconds;
};
}
}
#endif