From a1a9334b67a9c38798b3a5753e28be90f1cf423c Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 27 Jun 2017 00:00:34 +0200 Subject: [PATCH] unstrand SSL (#2667) --- CHANGELOG | 5 +- arangod/Scheduler/Socket.h | 2 + arangod/Scheduler/SocketTask.cpp | 154 +++++++++++++++---------------- arangod/Scheduler/SocketTask.h | 5 +- 4 files changed, 81 insertions(+), 85 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 415b5d7c46..d72fc6045e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -7,6 +7,8 @@ devel traversal. `LIMIT x` or `LIMIT 0, x` were not affected, but `LIMIT s, x` may have returned too many results +* fix races in SSL communication code + * fix invalid locking in JWT authentication cache, which could have crashed the server @@ -20,9 +22,6 @@ devel * fixed issue #2613: Reduce log level when Foxx manager tries to self heal missing database -* removed `exception` field from transaction error result; users should throw - explicit `Error` instances to return custom exceptions (addresses issue #2561) - v3.2.beta1 (2017-06-12) ----------------------- diff --git a/arangod/Scheduler/Socket.h b/arangod/Scheduler/Socket.h index 584a9f895a..7fb8813e8f 100644 --- a/arangod/Scheduler/Socket.h +++ b/arangod/Scheduler/Socket.h @@ -135,6 +135,8 @@ class Socket { virtual void setNonBlocking(bool) = 0; virtual std::string peerAddress() = 0; virtual int peerPort() = 0; + + bool isEncrypted() const { return _encrypted; } bool handshake(); virtual size_t write(basics::StringBuffer* buffer, boost::system::error_code& ec) = 0; diff --git a/arangod/Scheduler/SocketTask.cpp b/arangod/Scheduler/SocketTask.cpp index 9d03412048..8a6e3240ff 100644 --- a/arangod/Scheduler/SocketTask.cpp +++ b/arangod/Scheduler/SocketTask.cpp @@ -36,6 +36,8 @@ #include "Statistics/ConnectionStatistics.h" #include "Statistics/StatisticsFeature.h" +#include + using namespace arangodb::basics; using namespace arangodb::rest; @@ -87,9 +89,7 @@ SocketTask::~SocketTask() { LOG_TOPIC(ERR, Logger::COMMUNICATION) << "unable to cancel _keepAliveTimer"; } - if (_peer) { - _peer->close(err); - } + _peer->close(err); } // ----------------------------------------------------------------------------- @@ -162,40 +162,42 @@ void SocketTask::writeWriteBuffer() { size_t total = _writeBuffer._buffer->length(); size_t written = 0; - boost::system::error_code err; - err.clear(); + if (!_peer->isEncrypted()) { + boost::system::error_code err; + err.clear(); - while (true) { - RequestStatistics::SET_WRITE_START(_writeBuffer._statistics); - written = _peer->write(_writeBuffer._buffer, err); + while (true) { + RequestStatistics::SET_WRITE_START(_writeBuffer._statistics); + written = _peer->write(_writeBuffer._buffer, err); - if (err) { - break; + if (err) { + break; + } + + RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written); + + if (written != total) { + // unable to write everything at once, might be a lot of data + // above code does not update the buffer positon + break; + } + + if (!completedWriteBuffer()) { + return; + } + + // try to send next buffer + total = _writeBuffer._buffer->length(); + written = 0; } - RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written); - - if (written != total) { - // unable to write everything at once, might be a lot of data - // above code does not update the buffer positon - break; - } - - if (!completedWriteBuffer()) { + // write could have blocked which is the only acceptable error + if (err && err != ::boost::asio::error::would_block) { + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on stream failed with: " + << err.message(); + closeStreamNoLock(); return; } - - // try to send next buffer - total = _writeBuffer._buffer->length(); - written = 0; - } - - // write could have blocked which is the only acceptable error - if (err && err != ::boost::asio::error::would_block) { - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on stream failed with: " - << err.message(); - closeStreamNoLock(); - return; } // so the code could have blocked at this point or not all data @@ -337,11 +339,6 @@ bool SocketTask::trySyncRead() { return false; } - if (!_peer) { - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "peer disappeared"; - return false; - } - if (0 == _peer->available(err)) { return false; } @@ -352,10 +349,7 @@ bool SocketTask::trySyncRead() { return false; } - size_t bytesRead = 0; - - bytesRead = - _peer->read(boost::asio::buffer(_readBuffer.end(), READ_BLOCK_SIZE), err); + size_t bytesRead = _peer->read(boost::asio::buffer(_readBuffer.end(), READ_BLOCK_SIZE), err); if (0 == bytesRead) { return false; // should not happen @@ -402,50 +396,50 @@ bool SocketTask::processAll() { // will acquire the _readLock void SocketTask::asyncReadSome() { MUTEX_LOCKER(locker, _readLock); - - try { - if (_abandoned) { - return; - } - - size_t const MAX_DIRECT_TRIES = 2; - size_t n = 0; - - while (++n <= MAX_DIRECT_TRIES) { - if (!reserveMemory()) { - LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory"; - return; - } - - if (!trySyncRead()) { - if (n < MAX_DIRECT_TRIES) { -#ifdef TRI_HAVE_SCHED_H - sched_yield(); -#endif - } - - continue; - } - - // ignore the result of processAll, try to read more bytes down below - processAll(); - compactify(); - } - } catch (boost::system::system_error& err) { - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "i/o stream failed with: " - << err.what(); - - closeStream(); - return; - } catch (...) { - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream"; - - closeStream(); + + if (_abandoned) { return; } + if (!_peer->isEncrypted()) { + try { + size_t const MAX_DIRECT_TRIES = 2; + size_t n = 0; + + while (++n <= MAX_DIRECT_TRIES) { + if (!reserveMemory()) { + LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory"; + return; + } + + if (!trySyncRead()) { + if (n < MAX_DIRECT_TRIES) { + std::this_thread::yield(); + } + + continue; + } + + // ignore the result of processAll, try to read more bytes down below + processAll(); + compactify(); + } + } catch (boost::system::system_error& err) { + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "i/o stream failed with: " + << err.what(); + + closeStream(); + return; + } catch (...) { + LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream"; + + closeStream(); + return; + } + } + // try to read more bytes - if (!_abandoned && _peer) { + if (!_abandoned) { if (!reserveMemory()) { LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory"; return; diff --git a/arangod/Scheduler/SocketTask.h b/arangod/Scheduler/SocketTask.h index f2fd168fa8..ae2a59a9dd 100644 --- a/arangod/Scheduler/SocketTask.h +++ b/arangod/Scheduler/SocketTask.h @@ -139,10 +139,11 @@ class SocketTask : virtual public Task { protected: ConnectionStatistics* _connectionStatistics; ConnectionInfo _connectionInfo; - - Mutex _readLock; basics::StringBuffer _readBuffer; // needs _readLock + private: + Mutex _readLock; + private: // caller must hold the _writeLock void closeStreamNoLock();