diff --git a/arangod/Scheduler/SocketTask.cpp b/arangod/Scheduler/SocketTask.cpp index ad6dce3bd7..3008d362a3 100644 --- a/arangod/Scheduler/SocketTask.cpp +++ b/arangod/Scheduler/SocketTask.cpp @@ -80,7 +80,7 @@ SocketTask::~SocketTask() { if (err) { LOG_TOPIC(ERR, Logger::COMMUNICATION) << "unable to cancel _keepAliveTimer"; - } + } } void SocketTask::start() { @@ -143,123 +143,128 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer, MUTEX_LOCKER(locker, _writeLock); - if (_writeBuffer != nullptr) { - _writeBuffers.push_back(buffer); - _writeBuffersStats.push_back(stat); - return; + //buffer and stats will be NULL when called form async handler + if (buffer) { + if (_writeBuffer != nullptr) { + _writeBuffers.push_back(buffer); + _writeBuffersStats.push_back(stat); + return; + } + + _writeBuffer = buffer; + _writeBufferStatistics = stat; } - _writeBuffer = buffer; - _writeBufferStatistics = stat; // threadsafe? why not pass to - // completedWriteBuffer does this work with - // async? - - if(_writeBufferStatistics){ - _writeBufferStatistics->_writeStart = TRI_StatisticsTime(); - } if (_writeBuffer != nullptr) { size_t total = _writeBuffer->length(); size_t written = 0; boost::system::error_code err; - do { - err.assign(boost::system::errc::success, - boost::system::generic_category()); + err.clear(); + + while (true){ written = _peer->write(_writeBuffer, err); if(_writeBufferStatistics){ _writeBufferStatistics->_sentBytes += written; } - if (written == total) { - locker.unlock(); - completedWriteBuffer(); + if (err || written != total) { + break; //unable to write everything at once + //might be a lot of data + //above code does not update the buffer positon + } + + if(! completedWriteBuffer()){ return; } - } while (err == boost::asio::error::would_block); - if (err != boost::system::errc::success) { + total = _writeBuffer->length(); + written = 0; + if(_writeBufferStatistics){ + _writeBufferStatistics->_writeStart = TRI_StatisticsTime(); + } + } + + // write could have blocked which is the only acceptable error + if (err && err != ::boost::asio::error::would_block) { LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::addWriteBuffer (write_some) - write on stream " << " failed with: " << err.message(); closeStream(); return; } - - boost::system::error_code ec; - ec.assign(boost::system::errc::success, - boost::system::generic_category()); + + // so the code could have blocked at this point or not all data + // was written in one go auto self = shared_from_this(); - auto handler = [self, this](const boost::system::error_code& ec, - std::size_t transferred) { - MUTEX_LOCKER(locker, _writeLock); + //begin writing at offset (written) + _peer->asyncWrite(boost::asio::buffer(_writeBuffer->begin() + written, total - written) + ,[self, this](const boost::system::error_code& ec + ,std::size_t transferred) + { + MUTEX_LOCKER(locker, _writeLock); - if (_writeBufferStatistics){ - _writeBufferStatistics->_sentBytes += transferred; - } + if (_writeBufferStatistics){ + _writeBufferStatistics->_sentBytes += transferred; + } - if (ec) { - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) - << "SocketTask::addWriterBuffer(async_write) - write on stream " - << " failed with: " << ec.message(); - closeStream(); - } else { - locker.unlock(); - completedWriteBuffer(); - } - }; - - _peer->asyncWrite( - boost::asio::buffer(_writeBuffer->begin() + written, total - written), - handler); + if (ec) { + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) + << "SocketTask::addWriterBuffer(async_write) - write on stream " + << " failed with: " << ec.message(); + closeStream(); + } else { + if (completedWriteBuffer()){ + //completedWriteBuffer already advanced _wirteBuffer and _wirteBufferStatistics + locker.unlock(); // avoid recursive locking + addWriteBuffer(nullptr, nullptr); + } + } + } + ); } } -void SocketTask::completedWriteBuffer() { - StringBuffer* buffer = nullptr; - TRI_request_statistics_t* statistics = nullptr; - { - MUTEX_LOCKER(locker, _writeLock); +bool SocketTask::completedWriteBuffer() { + delete _writeBuffer; + _writeBuffer = nullptr; - delete _writeBuffer; - _writeBuffer = nullptr; - - if (_writeBufferStatistics != nullptr) { - _writeBufferStatistics->_writeEnd = TRI_StatisticsTime(); + if (_writeBufferStatistics != nullptr) { + _writeBufferStatistics->_writeEnd = TRI_StatisticsTime(); #ifdef DEBUG_STATISTICS - LOG_TOPIC(TRACE, Logger::REQUESTS) - << "SocketTask::addWriteBuffer - Statistics release: " - << _writeBufferStatistics->to_string(); + LOG_TOPIC(TRACE, Logger::REQUESTS) + << "SocketTask::addWriteBuffer - Statistics release: " + << _writeBufferStatistics->to_string(); #endif - TRI_ReleaseRequestStatistics(_writeBufferStatistics); - _writeBufferStatistics = nullptr; - } else { + TRI_ReleaseRequestStatistics(_writeBufferStatistics); + _writeBufferStatistics = nullptr; + } else { #ifdef DEBUG_STATISTICS - LOG_TOPIC(TRACE, Logger::REQUESTS) << "SocketTask::addWriteBuffer - " - "Statistics release: nullptr - " - "nothing to release"; + LOG_TOPIC(TRACE, Logger::REQUESTS) << "SocketTask::addWriteBuffer - " + "Statistics release: nullptr - " + "nothing to release"; #endif - } - - if (_writeBuffers.empty()) { - if (_closeRequested) { - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::" - "completedWriteBuffer - close " - "requested, closing stream"; - - closeStream(); - } - - return; - } - - buffer = _writeBuffers.front(); - _writeBuffers.pop_front(); - - statistics = _writeBuffersStats.front(); - _writeBuffersStats.pop_front(); } - addWriteBuffer(buffer, statistics); + + if (_writeBuffers.empty()) { + if (_closeRequested) { + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::" + "completedWriteBuffer - close " + "requested, closing stream"; + + closeStream(); + } + + return false; + } + + _writeBuffer = _writeBuffers.front(); + _writeBuffers.pop_front(); + + _writeBufferStatistics = _writeBuffersStats.front(); + _writeBuffersStats.pop_front(); + return true; } void SocketTask::closeStream() { diff --git a/arangod/Scheduler/SocketTask.h b/arangod/Scheduler/SocketTask.h index 5aa9937c5c..42718b6cf2 100644 --- a/arangod/Scheduler/SocketTask.h +++ b/arangod/Scheduler/SocketTask.h @@ -82,7 +82,6 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent { void addWriteBuffer(basics::StringBuffer*, TRI_request_statistics_t*); - void completedWriteBuffer(); void closeStream(); @@ -95,6 +94,7 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent { basics::StringBuffer _readBuffer; private: + bool completedWriteBuffer(); //returns next buffer to write or none Mutex _writeLock; basics::StringBuffer* _writeBuffer = nullptr; TRI_request_statistics_t* _writeBufferStatistics = nullptr;