1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Kaveh Vahedipour 2017-01-09 17:30:58 +01:00
commit 11b83b762b
4 changed files with 85 additions and 43 deletions

View File

@ -26,6 +26,7 @@
#include "SocketTask.h" #include "SocketTask.h"
#include "Basics/MutexLocker.h"
#include "Basics/StringBuffer.h" #include "Basics/StringBuffer.h"
#include "Basics/socket-utils.h" #include "Basics/socket-utils.h"
#include "Endpoint/ConnectionInfo.h" #include "Endpoint/ConnectionInfo.h"
@ -134,12 +135,14 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
LOG_TOPIC(TRACE, Logger::COMMUNICATION) LOG_TOPIC(TRACE, Logger::COMMUNICATION)
<< "SocketTask::addWriteBuffer - " << "SocketTask::addWriteBuffer - "
"Statistics release: nullptr - " "Statistics release: nullptr - "
"nothing to realease"; "nothing to release";
} }
return; return;
} }
MUTEX_LOCKER(locker, _writeLock);
if (_writeBuffer != nullptr) { if (_writeBuffer != nullptr) {
_writeBuffers.push_back(buffer); _writeBuffers.push_back(buffer);
_writeBuffersStats.push_back(stat); _writeBuffersStats.push_back(stat);
@ -156,19 +159,19 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
} }
if (_writeBuffer != nullptr) { if (_writeBuffer != nullptr) {
boost::system::error_code ec;
size_t total = _writeBuffer->length(); size_t total = _writeBuffer->length();
size_t written = 0; size_t written = 0;
boost::system::error_code err; boost::system::error_code err;
do { do {
ec.assign(boost::system::errc::success, err.assign(boost::system::errc::success,
boost::system::generic_category()); boost::system::generic_category());
written = _peer->write(_writeBuffer, err); written = _peer->write(_writeBuffer, err);
if(_writeBufferStatistics){ if(_writeBufferStatistics){
_writeBufferStatistics->_sentBytes += written; _writeBufferStatistics->_sentBytes += written;
} }
if (written == total) { if (written == total) {
locker.unlock();
completedWriteBuffer(); completedWriteBuffer();
return; return;
} }
@ -181,19 +184,27 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
closeStream(); closeStream();
return; return;
} }
boost::system::error_code ec;
ec.assign(boost::system::errc::success,
boost::system::generic_category());
auto self = shared_from_this(); auto self = shared_from_this();
auto handler = [self, this](const boost::system::error_code& ec, auto handler = [self, this](const boost::system::error_code& ec,
std::size_t transferred) { std::size_t transferred) {
if(_writeBufferStatistics){ MUTEX_LOCKER(locker, _writeLock);
if (_writeBufferStatistics){
_writeBufferStatistics->_sentBytes += transferred; _writeBufferStatistics->_sentBytes += transferred;
} }
if (ec) { if (ec) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "SocketTask::addWriterBuffer(async_write) - write on stream " << "SocketTask::addWriterBuffer(async_write) - write on stream "
<< " failed with: " << ec.message(); << " failed with: " << ec.message();
closeStream(); closeStream();
} else { } else {
locker.unlock();
completedWriteBuffer(); completedWriteBuffer();
} }
}; };
@ -205,44 +216,49 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
} }
void SocketTask::completedWriteBuffer() { void SocketTask::completedWriteBuffer() {
delete _writeBuffer; StringBuffer* buffer = nullptr;
_writeBuffer = nullptr; TRI_request_statistics_t* statistics = nullptr;
{
MUTEX_LOCKER(locker, _writeLock);
if (_writeBufferStatistics != nullptr) { delete _writeBuffer;
_writeBufferStatistics->_writeEnd = TRI_StatisticsTime(); _writeBuffer = nullptr;
#ifdef DEBUG_STATISTICS
LOG_TOPIC(TRACE, Logger::REQUESTS)
<< "SocketTask::addWriteBuffer - Statistics release: "
<< _writeBufferStatistics->to_string();
#endif
TRI_ReleaseRequestStatistics(_writeBufferStatistics);
_writeBufferStatistics = nullptr;
} else {
#ifdef DEBUG_STATISTICS
LOG_TOPIC(TRACE, Logger::REQUESTS) << "SocketTask::addWriteBuffer - "
"Statistics release: nullptr - "
"nothing to realease";
#endif
}
if (_writeBuffers.empty()) {
if (_closeRequested) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::"
"completedWriteBuffer - close "
"requested, closing stream";
closeStream(); if (_writeBufferStatistics != nullptr) {
_writeBufferStatistics->_writeEnd = TRI_StatisticsTime();
#ifdef DEBUG_STATISTICS
LOG_TOPIC(TRACE, Logger::REQUESTS)
<< "SocketTask::addWriteBuffer - Statistics release: "
<< _writeBufferStatistics->to_string();
#endif
TRI_ReleaseRequestStatistics(_writeBufferStatistics);
_writeBufferStatistics = nullptr;
} else {
#ifdef DEBUG_STATISTICS
LOG_TOPIC(TRACE, Logger::REQUESTS) << "SocketTask::addWriteBuffer - "
"Statistics release: nullptr - "
"nothing to release";
#endif
} }
return; if (_writeBuffers.empty()) {
if (_closeRequested) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::"
"completedWriteBuffer - close "
"requested, closing stream";
closeStream();
}
return;
}
buffer = _writeBuffers.front();
_writeBuffers.pop_front();
statistics = _writeBuffersStats.front();
_writeBuffersStats.pop_front();
} }
StringBuffer* buffer = _writeBuffers.front();
_writeBuffers.pop_front();
TRI_request_statistics_t* statistics = _writeBuffersStats.front();
_writeBuffersStats.pop_front();
addWriteBuffer(buffer, statistics); addWriteBuffer(buffer, statistics);
} }

View File

@ -29,6 +29,7 @@
#include <boost/asio/ssl.hpp> #include <boost/asio/ssl.hpp>
#include "Basics/Mutex.h"
#include "Basics/StringBuffer.h" #include "Basics/StringBuffer.h"
#include "Basics/asio-helper.h" #include "Basics/asio-helper.h"
#include "Scheduler/Socket.h" #include "Scheduler/Socket.h"
@ -93,12 +94,15 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent {
basics::StringBuffer _readBuffer; basics::StringBuffer _readBuffer;
private:
Mutex _writeLock;
basics::StringBuffer* _writeBuffer = nullptr; basics::StringBuffer* _writeBuffer = nullptr;
TRI_request_statistics_t* _writeBufferStatistics = nullptr; TRI_request_statistics_t* _writeBufferStatistics = nullptr;
std::deque<basics::StringBuffer*> _writeBuffers; std::deque<basics::StringBuffer*> _writeBuffers;
std::deque<TRI_request_statistics_t*> _writeBuffersStats; std::deque<TRI_request_statistics_t*> _writeBuffersStats;
protected:
std::unique_ptr<Socket> _peer; std::unique_ptr<Socket> _peer;
boost::posix_time::milliseconds _keepAliveTimeout; boost::posix_time::milliseconds _keepAliveTimeout;
boost::asio::deadline_timer _keepAliveTimer; boost::asio::deadline_timer _keepAliveTimer;

View File

@ -40,7 +40,7 @@ using namespace arangodb::basics;
#ifdef TRI_SHOW_LOCK_TIME #ifdef TRI_SHOW_LOCK_TIME
MutexLocker::MutexLocker(Mutex* mutex, char const* file, int line) MutexLocker::MutexLocker(Mutex* mutex, char const* file, int line)
: _mutex(mutex), _file(file), _line(line), _time(0.0) { : _mutex(mutex), isLocked(true), _isLocked(true), _file(file), _line(line), _time(0.0) {
double t = TRI_microtime(); double t = TRI_microtime();
_mutex->lock(); _mutex->lock();
_time = TRI_microtime() - t; _time = TRI_microtime() - t;
@ -48,7 +48,9 @@ MutexLocker::MutexLocker(Mutex* mutex, char const* file, int line)
#else #else
MutexLocker::MutexLocker(Mutex* mutex) : _mutex(mutex) { _mutex->lock(); } MutexLocker::MutexLocker(Mutex* mutex) : _mutex(mutex), _isLocked(true) {
_mutex->lock();
}
#endif #endif
@ -57,7 +59,9 @@ MutexLocker::MutexLocker(Mutex* mutex) : _mutex(mutex) { _mutex->lock(); }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
MutexLocker::~MutexLocker() { MutexLocker::~MutexLocker() {
_mutex->unlock(); if (_isLocked) {
_mutex->unlock();
}
#ifdef TRI_SHOW_LOCK_TIME #ifdef TRI_SHOW_LOCK_TIME
if (_time > TRI_SHOW_LOCK_THRESHOLD) { if (_time > TRI_SHOW_LOCK_THRESHOLD) {
@ -65,3 +69,10 @@ MutexLocker::~MutexLocker() {
} }
#endif #endif
} }
void MutexLocker::unlock() {
if (_isLocked) {
_mutex->unlock();
_isLocked = false;
}
}

View File

@ -79,11 +79,16 @@ class MutexLocker {
#endif #endif
~MutexLocker();
bool isLocked() const { return _isLocked; }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief releases the lock /// @brief releases the lock
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
~MutexLocker(); void unlock();
private: private:
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -91,6 +96,12 @@ class MutexLocker {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
Mutex* _mutex; Mutex* _mutex;
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the mutex is locked
//////////////////////////////////////////////////////////////////////////////
bool _isLocked;
#ifdef TRI_SHOW_LOCK_TIME #ifdef TRI_SHOW_LOCK_TIME