1
0
Fork 0

Merge branch 'obi-async' into devel

* obi-async:
  clean up code
  try to fix a bug in server that surfaced on heavy load
This commit is contained in:
Jan Christoph Uhde 2017-01-12 10:31:32 +01:00
commit 5921a172fc
2 changed files with 90 additions and 85 deletions

View File

@ -80,7 +80,7 @@ SocketTask::~SocketTask() {
if (err) {
LOG_TOPIC(ERR, Logger::COMMUNICATION) << "unable to cancel _keepAliveTimer";
}
}
}
void SocketTask::start() {
@ -143,123 +143,128 @@ void SocketTask::addWriteBuffer(StringBuffer* buffer,
MUTEX_LOCKER(locker, _writeLock);
if (_writeBuffer != nullptr) {
_writeBuffers.push_back(buffer);
_writeBuffersStats.push_back(stat);
return;
//buffer and stats will be NULL when called form async handler
if (buffer) {
if (_writeBuffer != nullptr) {
_writeBuffers.push_back(buffer);
_writeBuffersStats.push_back(stat);
return;
}
_writeBuffer = buffer;
_writeBufferStatistics = stat;
}
_writeBuffer = buffer;
_writeBufferStatistics = stat; // threadsafe? why not pass to
// completedWriteBuffer does this work with
// async?
if(_writeBufferStatistics){
_writeBufferStatistics->_writeStart = TRI_StatisticsTime();
}
if (_writeBuffer != nullptr) {
size_t total = _writeBuffer->length();
size_t written = 0;
boost::system::error_code err;
do {
err.assign(boost::system::errc::success,
boost::system::generic_category());
err.clear();
while (true){
written = _peer->write(_writeBuffer, err);
if(_writeBufferStatistics){
_writeBufferStatistics->_sentBytes += written;
}
if (written == total) {
locker.unlock();
completedWriteBuffer();
if (err || written != total) {
break; //unable to write everything at once
//might be a lot of data
//above code does not update the buffer positon
}
if(! completedWriteBuffer()){
return;
}
} while (err == boost::asio::error::would_block);
if (err != boost::system::errc::success) {
total = _writeBuffer->length();
written = 0;
if(_writeBufferStatistics){
_writeBufferStatistics->_writeStart = TRI_StatisticsTime();
}
}
// write could have blocked which is the only acceptable error
if (err && err != ::boost::asio::error::would_block) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "SocketTask::addWriteBuffer (write_some) - write on stream "
<< " failed with: " << err.message();
closeStream();
return;
}
boost::system::error_code ec;
ec.assign(boost::system::errc::success,
boost::system::generic_category());
// so the code could have blocked at this point or not all data
// was written in one go
auto self = shared_from_this();
auto handler = [self, this](const boost::system::error_code& ec,
std::size_t transferred) {
MUTEX_LOCKER(locker, _writeLock);
//begin writing at offset (written)
_peer->asyncWrite(boost::asio::buffer(_writeBuffer->begin() + written, total - written)
,[self, this](const boost::system::error_code& ec
,std::size_t transferred)
{
MUTEX_LOCKER(locker, _writeLock);
if (_writeBufferStatistics){
_writeBufferStatistics->_sentBytes += transferred;
}
if (_writeBufferStatistics){
_writeBufferStatistics->_sentBytes += transferred;
}
if (ec) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "SocketTask::addWriterBuffer(async_write) - write on stream "
<< " failed with: " << ec.message();
closeStream();
} else {
locker.unlock();
completedWriteBuffer();
}
};
_peer->asyncWrite(
boost::asio::buffer(_writeBuffer->begin() + written, total - written),
handler);
if (ec) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "SocketTask::addWriterBuffer(async_write) - write on stream "
<< " failed with: " << ec.message();
closeStream();
} else {
if (completedWriteBuffer()){
//completedWriteBuffer already advanced _wirteBuffer and _wirteBufferStatistics
locker.unlock(); // avoid recursive locking
addWriteBuffer(nullptr, nullptr);
}
}
}
);
}
}
void SocketTask::completedWriteBuffer() {
StringBuffer* buffer = nullptr;
TRI_request_statistics_t* statistics = nullptr;
{
MUTEX_LOCKER(locker, _writeLock);
bool SocketTask::completedWriteBuffer() {
delete _writeBuffer;
_writeBuffer = nullptr;
delete _writeBuffer;
_writeBuffer = nullptr;
if (_writeBufferStatistics != nullptr) {
_writeBufferStatistics->_writeEnd = TRI_StatisticsTime();
if (_writeBufferStatistics != nullptr) {
_writeBufferStatistics->_writeEnd = TRI_StatisticsTime();
#ifdef DEBUG_STATISTICS
LOG_TOPIC(TRACE, Logger::REQUESTS)
<< "SocketTask::addWriteBuffer - Statistics release: "
<< _writeBufferStatistics->to_string();
LOG_TOPIC(TRACE, Logger::REQUESTS)
<< "SocketTask::addWriteBuffer - Statistics release: "
<< _writeBufferStatistics->to_string();
#endif
TRI_ReleaseRequestStatistics(_writeBufferStatistics);
_writeBufferStatistics = nullptr;
} else {
TRI_ReleaseRequestStatistics(_writeBufferStatistics);
_writeBufferStatistics = nullptr;
} else {
#ifdef DEBUG_STATISTICS
LOG_TOPIC(TRACE, Logger::REQUESTS) << "SocketTask::addWriteBuffer - "
"Statistics release: nullptr - "
"nothing to release";
LOG_TOPIC(TRACE, Logger::REQUESTS) << "SocketTask::addWriteBuffer - "
"Statistics release: nullptr - "
"nothing to release";
#endif
}
if (_writeBuffers.empty()) {
if (_closeRequested) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::"
"completedWriteBuffer - close "
"requested, closing stream";
closeStream();
}
return;
}
buffer = _writeBuffers.front();
_writeBuffers.pop_front();
statistics = _writeBuffersStats.front();
_writeBuffersStats.pop_front();
}
addWriteBuffer(buffer, statistics);
if (_writeBuffers.empty()) {
if (_closeRequested) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "SocketTask::"
"completedWriteBuffer - close "
"requested, closing stream";
closeStream();
}
return false;
}
_writeBuffer = _writeBuffers.front();
_writeBuffers.pop_front();
_writeBufferStatistics = _writeBuffersStats.front();
_writeBuffersStats.pop_front();
return true;
}
void SocketTask::closeStream() {

View File

@ -82,7 +82,6 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent {
void addWriteBuffer(basics::StringBuffer*, TRI_request_statistics_t*);
void completedWriteBuffer();
void closeStream();
@ -95,6 +94,7 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent {
basics::StringBuffer _readBuffer;
private:
bool completedWriteBuffer(); //returns next buffer to write or none
Mutex _writeLock;
basics::StringBuffer* _writeBuffer = nullptr;
TRI_request_statistics_t* _writeBufferStatistics = nullptr;