From a92b49d46e4bc73d6b8bed1d23bf92c7cdb46368 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Mon, 9 Jan 2017 17:06:33 +0100 Subject: [PATCH 1/2] fix races in VST response handling --- arangod/Scheduler/SocketTask.cpp | 92 +++++++++++++++++++------------- arangod/Scheduler/SocketTask.h | 4 ++ lib/Basics/MutexLocker.cpp | 17 ++++-- lib/Basics/MutexLocker.h | 15 +++++- 4 files changed, 85 insertions(+), 43 deletions(-) diff --git a/arangod/Scheduler/SocketTask.cpp b/arangod/Scheduler/SocketTask.cpp index 3c4116de90..ad6dce3bd7 100644 --- a/arangod/Scheduler/SocketTask.cpp +++ b/arangod/Scheduler/SocketTask.cpp @@ -26,6 +26,7 @@ #include "SocketTask.h" +#include "Basics/MutexLocker.h" #include "Basics/StringBuffer.h" #include "Basics/socket-utils.h" #include "Endpoint/ConnectionInfo.h" @@ -134,12 +135,14 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer, LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "SocketTask::addWriteBuffer - " "Statistics release: nullptr - " - "nothing to realease"; + "nothing to release"; } return; } + MUTEX_LOCKER(locker, _writeLock); + if (_writeBuffer != nullptr) { _writeBuffers.push_back(buffer); _writeBuffersStats.push_back(stat); @@ -156,19 +159,19 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer, } if (_writeBuffer != nullptr) { - boost::system::error_code ec; size_t total = _writeBuffer->length(); size_t written = 0; boost::system::error_code err; do { - ec.assign(boost::system::errc::success, - boost::system::generic_category()); + err.assign(boost::system::errc::success, + boost::system::generic_category()); written = _peer->write(_writeBuffer, err); if(_writeBufferStatistics){ _writeBufferStatistics->_sentBytes += written; } if (written == total) { + locker.unlock(); completedWriteBuffer(); return; } @@ -181,19 +184,27 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer, closeStream(); return; } + + boost::system::error_code ec; + ec.assign(boost::system::errc::success, + boost::system::generic_category()); auto self = shared_from_this(); auto handler = [self, this](const boost::system::error_code& ec, std::size_t transferred) { - if(_writeBufferStatistics){ + MUTEX_LOCKER(locker, _writeLock); + + if (_writeBufferStatistics){ _writeBufferStatistics->_sentBytes += transferred; } + if (ec) { LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::addWriterBuffer(async_write) - write on stream " << " failed with: " << ec.message(); closeStream(); } else { + locker.unlock(); completedWriteBuffer(); } }; @@ -205,44 +216,49 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer, } void SocketTask::completedWriteBuffer() { - delete _writeBuffer; - _writeBuffer = nullptr; + StringBuffer* buffer = nullptr; + TRI_request_statistics_t* statistics = nullptr; + { + MUTEX_LOCKER(locker, _writeLock); - if (_writeBufferStatistics != nullptr) { - _writeBufferStatistics->_writeEnd = TRI_StatisticsTime(); -#ifdef DEBUG_STATISTICS - LOG_TOPIC(TRACE, Logger::REQUESTS) - << "SocketTask::addWriteBuffer - Statistics release: " - << _writeBufferStatistics->to_string(); -#endif - TRI_ReleaseRequestStatistics(_writeBufferStatistics); - _writeBufferStatistics = nullptr; - } else { -#ifdef DEBUG_STATISTICS - LOG_TOPIC(TRACE, Logger::REQUESTS) << "SocketTask::addWriteBuffer - " - "Statistics release: nullptr - " - "nothing to realease"; -#endif - } - - if (_writeBuffers.empty()) { - if (_closeRequested) { - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::" - "completedWriteBuffer - close " - "requested, closing stream"; + delete _writeBuffer; + _writeBuffer = nullptr; - closeStream(); + if (_writeBufferStatistics != nullptr) { + _writeBufferStatistics->_writeEnd = TRI_StatisticsTime(); +#ifdef DEBUG_STATISTICS + LOG_TOPIC(TRACE, Logger::REQUESTS) + << "SocketTask::addWriteBuffer - Statistics release: " + << _writeBufferStatistics->to_string(); +#endif + TRI_ReleaseRequestStatistics(_writeBufferStatistics); + _writeBufferStatistics = nullptr; + } else { +#ifdef DEBUG_STATISTICS + LOG_TOPIC(TRACE, Logger::REQUESTS) << "SocketTask::addWriteBuffer - " + "Statistics release: nullptr - " + "nothing to release"; +#endif } - return; + if (_writeBuffers.empty()) { + if (_closeRequested) { + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::" + "completedWriteBuffer - close " + "requested, closing stream"; + + closeStream(); + } + + return; + } + + buffer = _writeBuffers.front(); + _writeBuffers.pop_front(); + + statistics = _writeBuffersStats.front(); + _writeBuffersStats.pop_front(); } - - StringBuffer* buffer = _writeBuffers.front(); - _writeBuffers.pop_front(); - - TRI_request_statistics_t* statistics = _writeBuffersStats.front(); - _writeBuffersStats.pop_front(); - addWriteBuffer(buffer, statistics); } diff --git a/arangod/Scheduler/SocketTask.h b/arangod/Scheduler/SocketTask.h index 60eb422d66..5aa9937c5c 100644 --- a/arangod/Scheduler/SocketTask.h +++ b/arangod/Scheduler/SocketTask.h @@ -29,6 +29,7 @@ #include +#include "Basics/Mutex.h" #include "Basics/StringBuffer.h" #include "Basics/asio-helper.h" #include "Scheduler/Socket.h" @@ -93,12 +94,15 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent { basics::StringBuffer _readBuffer; + private: + Mutex _writeLock; basics::StringBuffer* _writeBuffer = nullptr; TRI_request_statistics_t* _writeBufferStatistics = nullptr; std::deque _writeBuffers; std::deque _writeBuffersStats; + protected: std::unique_ptr _peer; boost::posix_time::milliseconds _keepAliveTimeout; boost::asio::deadline_timer _keepAliveTimer; diff --git a/lib/Basics/MutexLocker.cpp b/lib/Basics/MutexLocker.cpp index 864284ceba..1894dcb4a1 100644 --- a/lib/Basics/MutexLocker.cpp +++ b/lib/Basics/MutexLocker.cpp @@ -40,7 +40,7 @@ using namespace arangodb::basics; #ifdef TRI_SHOW_LOCK_TIME MutexLocker::MutexLocker(Mutex* mutex, char const* file, int line) - : _mutex(mutex), _file(file), _line(line), _time(0.0) { + : _mutex(mutex), isLocked(true), _file(file), _line(line), _time(0.0) { double t = TRI_microtime(); _mutex->lock(); _time = TRI_microtime() - t; @@ -48,7 +48,9 @@ MutexLocker::MutexLocker(Mutex* mutex, char const* file, int line) #else -MutexLocker::MutexLocker(Mutex* mutex) : _mutex(mutex) { _mutex->lock(); } +MutexLocker::MutexLocker(Mutex* mutex) : _mutex(mutex), _isLocked(true) { + _mutex->lock(); +} #endif @@ -57,7 +59,9 @@ MutexLocker::MutexLocker(Mutex* mutex) : _mutex(mutex) { _mutex->lock(); } //////////////////////////////////////////////////////////////////////////////// MutexLocker::~MutexLocker() { - _mutex->unlock(); + if (_isLocked) { + _mutex->unlock(); + } #ifdef TRI_SHOW_LOCK_TIME if (_time > TRI_SHOW_LOCK_THRESHOLD) { @@ -65,3 +69,10 @@ MutexLocker::~MutexLocker() { } #endif } + +void MutexLocker::unlock() { + if (_isLocked) { + _mutex->unlock(); + _isLocked = false; + } +} diff --git a/lib/Basics/MutexLocker.h b/lib/Basics/MutexLocker.h index 835f2898b9..e5d18fe30a 100644 --- a/lib/Basics/MutexLocker.h +++ b/lib/Basics/MutexLocker.h @@ -79,11 +79,16 @@ class MutexLocker { #endif + + ~MutexLocker(); + + bool isLocked() const { return _isLocked; } + ////////////////////////////////////////////////////////////////////////////// /// @brief releases the lock ////////////////////////////////////////////////////////////////////////////// - - ~MutexLocker(); + + void unlock(); private: ////////////////////////////////////////////////////////////////////////////// @@ -91,6 +96,12 @@ class MutexLocker { ////////////////////////////////////////////////////////////////////////////// Mutex* _mutex; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief whether or not the mutex is locked + ////////////////////////////////////////////////////////////////////////////// + + bool _isLocked; #ifdef TRI_SHOW_LOCK_TIME From 0590b984426736c1beb3f341219bf5b79c2a2ef4 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Mon, 9 Jan 2017 17:08:28 +0100 Subject: [PATCH 2/2] properly initialize variables... --- lib/Basics/MutexLocker.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Basics/MutexLocker.cpp b/lib/Basics/MutexLocker.cpp index 1894dcb4a1..3086ca0ca5 100644 --- a/lib/Basics/MutexLocker.cpp +++ b/lib/Basics/MutexLocker.cpp @@ -40,7 +40,7 @@ using namespace arangodb::basics; #ifdef TRI_SHOW_LOCK_TIME MutexLocker::MutexLocker(Mutex* mutex, char const* file, int line) - : _mutex(mutex), isLocked(true), _file(file), _line(line), _time(0.0) { + : _mutex(mutex), isLocked(true), _isLocked(true), _file(file), _line(line), _time(0.0) { double t = TRI_microtime(); _mutex->lock(); _time = TRI_microtime() - t;