1
0
Fork 0

small cleanup

This commit is contained in:
Jan Steemann 2015-08-12 15:01:33 +02:00
parent bbb22ef005
commit f1c5480e97
12 changed files with 77 additions and 117 deletions

View File

@ -219,7 +219,7 @@ namespace RandomHelper {
if (r == 0) {
LOG_FATAL_AND_EXIT("read on random device failed: nothing read");
}
else if (errno == EWOULDBLOCK) {
else if (errno == EWOULDBLOCK || errno == EAGAIN) {
LOG_INFO("not enough entropy (got %d), switching to pseudo-random", (int) (sizeof(buffer) - n));
break;
}

View File

@ -191,9 +191,10 @@ void HttpServer::stopListening () {
////////////////////////////////////////////////////////////////////////////////
void HttpServer::registerChunkedTask (HttpCommTask* task, ssize_t n) {
auto id = task->taskId();
MUTEX_LOCKER(HttpCommTaskMapLock);
HttpCommTaskMap[task->taskId()] = task;
HttpCommTaskMap[id] = task;
}
////////////////////////////////////////////////////////////////////////////////
@ -201,9 +202,10 @@ void HttpServer::registerChunkedTask (HttpCommTask* task, ssize_t n) {
////////////////////////////////////////////////////////////////////////////////
void HttpServer::unregisterChunkedTask (HttpCommTask* task) {
auto id = task->taskId();
MUTEX_LOCKER(HttpCommTaskMapLock);
HttpCommTaskMap.erase(task->taskId());
HttpCommTaskMap.erase(id);
}
////////////////////////////////////////////////////////////////////////////////
@ -237,14 +239,14 @@ void HttpServer::stop () {
void HttpServer::handleConnected (TRI_socket_t s, const ConnectionInfo& info) {
HttpCommTask* task = createCommTask(s, info);
{
try {
GENERAL_SERVER_LOCKER(_commTasksLock);
try {
_commTasks.emplace(task);
}
catch (...) {
throw;
}
_commTasks.emplace(task);
}
catch (...) {
// destroy the task to prevent a leak
deleteTask(task);
throw;
}
// registers the task and get the number of the scheduler thread

View File

@ -97,7 +97,6 @@ bool ListenTask::setup (Scheduler* scheduler, EventLoop loop) {
return true;
}
#ifdef _WIN32
// ..........................................................................
@ -166,7 +165,7 @@ bool ListenTask::handleEvent (EventToken token, EventType revents) {
"expect sockaddr size to be less or equal to the v6 version");
sockaddr_in6 addrmem;
sockaddr_in *addr = (sockaddr_in *)&addrmem;
sockaddr_in* addr = (sockaddr_in*) &addrmem;
socklen_t len = sizeof(sockaddr_in6);
memset(addr, 0, sizeof(sockaddr_in6));
@ -193,7 +192,7 @@ bool ListenTask::handleEvent (EventToken token, EventType revents) {
acceptFailures = 0;
struct sockaddr_in6 addr_out_mem;
struct sockaddr_in *addr_out = (sockaddr_in*) &addr_out_mem;;
struct sockaddr_in* addr_out = (sockaddr_in*) &addr_out_mem;;
socklen_t len_out = sizeof(addr_out_mem);
int res = TRI_getsockname(connectionSocket, (sockaddr*) addr_out, &len_out);
@ -234,9 +233,8 @@ bool ListenTask::handleEvent (EventToken token, EventType revents) {
else {
Endpoint::DomainType type = _endpoint->getDomainType();
if (type == Endpoint::DOMAIN_IPV4) {
const char *p;
char buf[INET_ADDRSTRLEN + 1];
p = inet_ntop(AF_INET, &addr->sin_addr, buf, sizeof(buf) - 1);
char const* p = inet_ntop(AF_INET, &addr->sin_addr, buf, sizeof(buf) - 1);
buf[INET_ADDRSTRLEN] = '\0';
if (p != nullptr) {
info.clientAddress = p;
@ -244,9 +242,8 @@ bool ListenTask::handleEvent (EventToken token, EventType revents) {
info.clientPort = addr->sin_port;
}
else if (type == Endpoint::DOMAIN_IPV6) {
const char *p;
char buf[INET6_ADDRSTRLEN + 1];
p = inet_ntop(AF_INET6, &addrmem.sin6_addr, buf, sizeof(buf) - 1);
char const* p = inet_ntop(AF_INET6, &addrmem.sin6_addr, buf, sizeof(buf) - 1);
buf[INET6_ADDRSTRLEN] = '\0';
if (p != nullptr) {
info.clientAddress = p;

View File

@ -447,18 +447,26 @@ void Scheduler::setProcessorAffinity (size_t i, size_t c) {
////////////////////////////////////////////////////////////////////////////////
int Scheduler::registerTask (Task* task, ssize_t* got, ssize_t want) {
SchedulerThread* thread = nullptr;
if (task->isUserDefined() && task->id().empty()) {
// user-defined task without id is invalid
return TRI_ERROR_TASK_INVALID_ID;
}
string const& name = task->name();
std::string const& name = task->name();
LOG_TRACE("registerTask for task %p (%s)", (void*) task, name.c_str());
// determine thread
SchedulerThread* thread = nullptr;
size_t n = 0;
if (0 <= want) {
n = want;
if (nrThreads <= n) {
return TRI_ERROR_INTERNAL;
}
}
{
size_t n = 0;
MUTEX_LOCKER(schedulerLock);
int res = checkInsertTask(task);
@ -467,28 +475,21 @@ int Scheduler::registerTask (Task* task, ssize_t* got, ssize_t want) {
return res;
}
if (0 <= want) {
n = want;
if (nrThreads <= n) {
return TRI_ERROR_INTERNAL;
}
}
else {
if (0 > want) {
if (multiThreading && ! task->needsMainEventLoop()) {
n = (++nextLoop) % nrThreads;
}
}
if (nullptr != got) {
*got = static_cast<ssize_t>(n);
}
thread = threads[n];
task2thread[task] = thread;
taskRegistered.emplace(task);
}
if (nullptr != got) {
*got = static_cast<ssize_t>(n);
}
if (! thread->registerTask(this, task)) {
return TRI_ERROR_INTERNAL;
@ -507,7 +508,7 @@ int Scheduler::checkInsertTask (Task const* task) {
// this is a user-defined task
// now check if there is an id conflict
string const& id = task->id();
std::string const& id = task->id();
for (auto& it : task2thread) {
auto const* t = it.first;

View File

@ -83,7 +83,7 @@ namespace {
AsyncWatcher* watcher = (AsyncWatcher*) w; // cast from C type to C++ class
Task* task = watcher->task;
if (task != nullptr && (revents & EV_ASYNC) && task->isActive()) {
if (task != nullptr && (revents & EV_ASYNC)) {
task->handleEvent(watcher, EVENT_ASYNC);
}
}
@ -119,7 +119,7 @@ namespace {
SocketWatcher* watcher = (SocketWatcher*) w; // cast from C type to C++ class
Task* task = watcher->task;
if (task != nullptr && task->isActive()) {
if (task != nullptr) {
if (revents & EV_READ) {
if (revents & EV_WRITE) {
task->handleEvent(watcher, EVENT_SOCKET_READ | EVENT_SOCKET_WRITE);
@ -157,7 +157,7 @@ namespace {
PeriodicWatcher* watcher = (PeriodicWatcher*) w; // cast from C type to C++ class
Task* task = watcher->task;
if (task != nullptr && (revents & EV_PERIODIC) && task->isActive()) {
if (task != nullptr && (revents & EV_PERIODIC)) {
task->handleEvent(watcher, EVENT_PERIODIC);
}
}
@ -185,7 +185,7 @@ namespace {
SignalWatcher* watcher = (SignalWatcher*) w; // cast from C type to C++ class
Task* task = watcher->task;
if (task != nullptr && (revents & EV_SIGNAL) && task->isActive()) {
if (task != nullptr && (revents & EV_SIGNAL)) {
task->handleEvent(watcher, EVENT_SIGNAL);
}
}
@ -213,7 +213,7 @@ namespace {
TimerWatcher* watcher = (TimerWatcher*) w; // cast from C type to C++ class
Task* task = watcher->task;
if (task != nullptr && (revents & EV_TIMER) && task->isActive()) {
if (task != nullptr && (revents & EV_TIMER)) {
task->handleEvent(watcher, EVENT_TIMER);
}
}

View File

@ -153,8 +153,6 @@ bool SchedulerThread::registerTask (Scheduler* scheduler, Task* task) {
////////////////////////////////////////////////////////////////////////////////
void SchedulerThread::unregisterTask (Task* task) {
deactivateTask(task);
// thread has already been stopped
if (_stopped) {
// do nothing
@ -184,8 +182,6 @@ void SchedulerThread::unregisterTask (Task* task) {
////////////////////////////////////////////////////////////////////////////////
void SchedulerThread::destroyTask (Task* task) {
deactivateTask(task);
// thread has already been stopped
if (_stopped) {
deleteTask(task);

View File

@ -134,7 +134,8 @@ bool SocketTask::fillReadBuffer () {
_readBuffer->increaseLength(nr);
return true;
}
else if (nr == 0) {
if (nr == 0) {
_clientClosed = true;
LOG_TRACE("read returned 0");
@ -142,17 +143,26 @@ bool SocketTask::fillReadBuffer () {
return false;
}
if (errno == EINTR) {
int myerrno = errno;
if (myerrno == EINTR) {
return fillReadBuffer();
}
if (errno != EWOULDBLOCK) {
LOG_TRACE("read failed with %d: %s", (int) errno, strerror(errno));
if (myerrno != EWOULDBLOCK && myerrno != EAGAIN) {
LOG_TRACE("read failed with %d: %s", (int) myerrno, strerror(myerrno));
return false;
}
LOG_TRACE("read would block with %d: %s", (int) errno, strerror(errno));
TRI_ASSERT(myerrno == EWOULDBLOCK || myerrno == EAGAIN);
// from man(2) read:
// The file descriptor fd refers to a socket and has been marked nonblocking (O_NONBLOCK),
// and the read would block. POSIX.1-2001 allows
// either error to be returned for this case, and does not require these constants to have the same value,
// so a portable application should check for both possibilities.
LOG_TRACE("read would block with %d: %s", (int) myerrno, strerror(myerrno));
return true;
}
@ -162,9 +172,6 @@ bool SocketTask::fillReadBuffer () {
////////////////////////////////////////////////////////////////////////////////
bool SocketTask::handleWrite () {
bool callCompletedWriteBuffer = false;
// size_t is unsigned, should never get < 0
size_t len = 0;
if (nullptr != _writeBuffer) {
@ -178,19 +185,23 @@ bool SocketTask::handleWrite () {
nr = TRI_WRITE_SOCKET(_commSocket, _writeBuffer->begin() + _writeLength, (int) len, 0);
if (nr < 0) {
if (errno == EINTR) {
int myerrno = errno;
if (myerrno == EINTR) {
return handleWrite();
}
else if (errno != EWOULDBLOCK) {
LOG_DEBUG("write failed with %d: %s", (int) errno, strerror(errno));
if (myerrno != EWOULDBLOCK || myerrno != EAGAIN) {
LOG_DEBUG("write failed with %d: %s", (int) myerrno, strerror(myerrno));
return false;
}
else {
nr = 0;
}
nr = 0;
}
TRI_ASSERT(nr >= 0);
len -= nr;
}
@ -199,19 +210,15 @@ bool SocketTask::handleWrite () {
delete _writeBuffer;
}
callCompletedWriteBuffer = true;
}
else {
_writeLength += nr;
}
if (callCompletedWriteBuffer) {
completedWriteBuffer();
// rearm timer for keep-alive timeout
// TODO: do we need some lock before we modify the scheduler?
setKeepAliveTimeout(_keepAliveTimeout);
}
else {
_writeLength += nr;
}
if (_clientClosed) {
return false;
@ -237,10 +244,7 @@ bool SocketTask::handleWrite () {
////////////////////////////////////////////////////////////////////////////////
void SocketTask::setWriteBuffer (StringBuffer* buffer,
TRI_request_statistics_t* statistics,
bool ownBuffer) {
bool callCompletedWriteBuffer = false;
TRI_request_statistics_t* statistics) {
_writeBufferStatistics = statistics;
if (_writeBufferStatistics != nullptr) {
@ -251,11 +255,9 @@ void SocketTask::setWriteBuffer (StringBuffer* buffer,
_writeLength = 0;
if (buffer->empty()) {
if (ownBuffer) {
delete buffer;
}
delete buffer;
callCompletedWriteBuffer = true;
completedWriteBuffer();
}
else {
if (_writeBuffer != nullptr) {
@ -265,12 +267,7 @@ void SocketTask::setWriteBuffer (StringBuffer* buffer,
}
_writeBuffer = buffer;
_ownBuffer = ownBuffer;
}
if (callCompletedWriteBuffer) {
completedWriteBuffer();
_ownBuffer = true;
}
if (_clientClosed) {

View File

@ -162,8 +162,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
void setWriteBuffer (basics::StringBuffer*,
TRI_request_statistics_t*,
bool ownBuffer = true);
TRI_request_statistics_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief checks for presence of an active write buffer

View File

@ -50,21 +50,15 @@ namespace {
Task::Task (string const& id,
string const& name)
: _scheduler(0),
: _scheduler(nullptr),
_loop(0),
_taskId(NEXT_TASK_ID.fetch_add(1, memory_order_relaxed)),
_taskId(NEXT_TASK_ID.fetch_add(1, memory_order_seq_cst)),
_id(id),
_name(name),
_active(1) {
_name(name) {
}
Task::Task (string const& name)
: _scheduler(0),
_loop(0),
_taskId(NEXT_TASK_ID.fetch_add(1, memory_order_relaxed)),
_id(),
_name(name),
_active(1) {
: Task("", name) {
}
Task::~Task () {

View File

@ -94,14 +94,6 @@ namespace triagens {
return _id;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns true if task is still active
////////////////////////////////////////////////////////////////////////////////
bool isActive () const {
return _active != 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the internal task identifier
////////////////////////////////////////////////////////////////////////////////
@ -208,11 +200,6 @@ namespace triagens {
std::string const _name;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the task is active
////////////////////////////////////////////////////////////////////////////////
volatile sig_atomic_t _active;
};
}
}

View File

@ -38,10 +38,6 @@ using namespace triagens::rest;
// TaskManager
// -----------------------------------------------------------------------------
void TaskManager::deactivateTask (Task* task) {
task->_active = 0;
}
void TaskManager::deleteTask (Task* task) {
delete task;
}

View File

@ -70,15 +70,6 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief deactivates a task
///
/// Sets the task flag active to 0. This happens when unregisterTask or
/// destroyTask is called.
////////////////////////////////////////////////////////////////////////////////
void deactivateTask (Task* task);
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a task
////////////////////////////////////////////////////////////////////////////////