
try to fix a bug in server that surfaced on heavy load

Jan Christoph Uhde 2017-01-11 15:27:53 +01:00
parent 62eda019c2
commit 4858469bfa
2 changed files with 96 additions and 85 deletions

SocketTask.cpp

@@ -143,6 +143,8 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
   MUTEX_LOCKER(locker, _writeLock);
+  //buffer and stats will be NULL when called form async handler
+  if (buffer) {
   if (_writeBuffer != nullptr) {
     _writeBuffers.push_back(buffer);
     _writeBuffersStats.push_back(stat);
@@ -150,34 +152,45 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
   }
   _writeBuffer = buffer;
-  _writeBufferStatistics = stat; // threadsafe? why not pass to
-                                 // completedWriteBuffer does this work with
-                                 // async?
+  _writeBufferStatistics = stat;
+  if(_writeBufferStatistics){
+    _writeBufferStatistics->_writeStart = TRI_StatisticsTime();
   }
   if (_writeBuffer != nullptr) {
     size_t total = _writeBuffer->length();
     size_t written = 0;
     boost::system::error_code err;
-    do {
-      err.assign(boost::system::errc::success,
-                 boost::system::generic_category());
+    err.clear();
+    bool moreWork = true;
+    while (!err && moreWork){
       written = _peer->write(_writeBuffer, err);
       if(_writeBufferStatistics){
         _writeBufferStatistics->_sentBytes += written;
       }
       if (written == total) {
-        locker.unlock();
-        completedWriteBuffer();
+        moreWork = completedWriteBuffer();
+        if(_writeBuffer){
+          total = _writeBuffer->length();
+          written = 0;
+          if(_writeBufferStatistics){
+            _writeBufferStatistics->_writeStart = TRI_StatisticsTime();
+          }
+        }
+      } else {
+        break; //unable to write everything at once
+               //might be a lot of data
+               //above code does not update the buffer positon
+      }
+    }
+    if(!moreWork || _writeBuffer == nullptr){
       return;
     }
-    } while (err == boost::asio::error::would_block);
-    if (err != boost::system::errc::success) {
+    // write could have blocked which is the only acceptable error
+    if (err && err != ::boost::asio::error::would_block) {
       LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
           << "SocketTask::addWriteBuffer (write_some) - write on stream "
           << " failed with: " << err.message();
@@ -185,13 +198,17 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
       return;
     }
-    boost::system::error_code ec;
-    ec.assign(boost::system::errc::success,
-              boost::system::generic_category());
+    // so the code could have blocked at this point or not all data
+    // was written in one go
+    err.clear(); //reset error state
     auto self = shared_from_this();
-    auto handler = [self, this](const boost::system::error_code& ec,
-                                std::size_t transferred) {
+    //begin writing at offset (written)
+    _peer->asyncWrite(boost::asio::buffer(_writeBuffer->begin() + written, total - written)
+        ,[self, this](const boost::system::error_code& ec
+                     ,std::size_t transferred)
+    {
       MUTEX_LOCKER(locker, _writeLock);
       if (_writeBufferStatistics){
@@ -204,23 +221,18 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
             << " failed with: " << ec.message();
         closeStream();
       } else {
-        locker.unlock();
-        completedWriteBuffer();
+        if (completedWriteBuffer()){
+          //completedWriteBuffer already advanced _wirteBuffer and _wirteBufferStatistics
+          locker.unlock(); // avoid recursive locking
+          addWriteBuffer(nullptr, nullptr);
        }
-      };
-    _peer->asyncWrite(
-        boost::asio::buffer(_writeBuffer->begin() + written, total - written),
-        handler);
+      }
+    }
+    );
   }
 }
-void SocketTask::completedWriteBuffer() {
-  StringBuffer* buffer = nullptr;
-  TRI_request_statistics_t* statistics = nullptr;
-  {
-    MUTEX_LOCKER(locker, _writeLock);
+bool SocketTask::completedWriteBuffer() {
   delete _writeBuffer;
   _writeBuffer = nullptr;
@@ -250,16 +262,15 @@ void SocketTask::completedWriteBuffer() {
       closeStream();
     }
-    return;
+    return false;
   }
-  buffer = _writeBuffers.front();
+  _writeBuffer = _writeBuffers.front();
   _writeBuffers.pop_front();
-  statistics = _writeBuffersStats.front();
+  _writeBufferStatistics = _writeBuffersStats.front();
   _writeBuffersStats.pop_front();
-  }
-  addWriteBuffer(buffer, statistics);
+  return true;
 }
 void SocketTask::closeStream() {

SocketTask.h

@@ -82,7 +82,6 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent {
   void addWriteBuffer(basics::StringBuffer*, TRI_request_statistics_t*);
-  void completedWriteBuffer();
   void closeStream();
@@ -95,6 +94,7 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent {
   basics::StringBuffer _readBuffer;
  private:
+  bool completedWriteBuffer(); //returns next buffer to write or none
   Mutex _writeLock;
   basics::StringBuffer* _writeBuffer = nullptr;
   TRI_request_statistics_t* _writeBufferStatistics = nullptr;
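
Read as a whole, the patch turns the write path into a loop: as long as a synchronous write sends the whole buffer, completedWriteBuffer() now advances _writeBuffer/_writeBufferStatistics in place and reports as a bool whether another queued buffer remains; a partial or would-block write falls back to asyncWrite() starting at the already-written offset, and the completion handler re-enters addWriteBuffer(nullptr, nullptr) to drain the rest. The self-contained sketch below illustrates only that control flow; the names (FakePeer, WriteQueue, drain) are placeholders, and the locking, statistics, and real boost::asio error handling of the actual SocketTask are deliberately omitted.

// Simplified sketch of the reworked write path; hypothetical names, no
// locking or error handling, and a fake peer that can only send a few
// bytes per synchronous write.
#include <algorithm>
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <string>

struct FakePeer {
  // Pretend the socket accepts at most 4 bytes per synchronous write().
  std::size_t write(const std::string& buf, std::size_t offset) {
    return std::min<std::size_t>(4, buf.size() - offset);
  }
  // Stand-in for asyncWrite(): runs the completion handler immediately.
  void asyncWrite(std::function<void()> handler) { handler(); }
};

class WriteQueue {
 public:
  void add(std::string buffer) {
    if (!_current.empty()) {            // a write is already in flight: queue it
      _pending.push_back(std::move(buffer));
      return;
    }
    _current = std::move(buffer);
    drain();
  }

 private:
  // Mirrors the new bool completedWriteBuffer(): advance to the next queued
  // buffer and report whether there is more work to do.
  bool completedWriteBuffer() {
    _current.clear();
    if (_pending.empty()) {
      return false;
    }
    _current = std::move(_pending.front());
    _pending.pop_front();
    return true;
  }

  void drain() {
    std::size_t written = 0;
    bool moreWork = true;
    while (moreWork) {
      written += _peer.write(_current, written);
      if (written == _current.size()) {  // whole buffer sent synchronously
        std::cout << "sent: " << _current << "\n";
        moreWork = completedWriteBuffer();
        written = 0;
      } else {
        break;                           // partial write: hand off to async I/O
      }
    }
    if (!moreWork) {
      return;
    }
    // Continue asynchronously from the already-written offset; when done,
    // pick up the next queued buffer (addWriteBuffer(nullptr, nullptr) in the patch).
    _peer.asyncWrite([this, written] {
      std::cout << "async sent rest: " << _current.substr(written) << "\n";
      if (completedWriteBuffer()) {
        drain();
      }
    });
  }

  FakePeer _peer;
  std::string _current;
  std::deque<std::string> _pending;
};

int main() {
  WriteQueue q;
  q.add("ok");                      // fits into one synchronous write
  q.add("a longer response body");  // forces the partial-write/async path
}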