1
0
Fork 0

unstrand SSL (#2667)

This commit is contained in:
Jan 2017-06-27 00:00:34 +02:00 committed by Frank Celler
parent 6db52fe25f
commit a1a9334b67
4 changed files with 81 additions and 85 deletions

View File

@ -7,6 +7,8 @@ devel
traversal. `LIMIT x` or `LIMIT 0, x` were not affected, but `LIMIT s, x`
may have returned too many results
* fix races in SSL communication code
* fix invalid locking in JWT authentication cache, which could have
crashed the server
@ -20,9 +22,6 @@ devel
* fixed issue #2613: Reduce log level when Foxx manager tries to self heal missing database
* removed `exception` field from transaction error result; users should throw
explicit `Error` instances to return custom exceptions (addresses issue #2561)
v3.2.beta1 (2017-06-12)
-----------------------

View File

@ -135,6 +135,8 @@ class Socket {
virtual void setNonBlocking(bool) = 0;
virtual std::string peerAddress() = 0;
virtual int peerPort() = 0;
bool isEncrypted() const { return _encrypted; }
bool handshake();
virtual size_t write(basics::StringBuffer* buffer,
boost::system::error_code& ec) = 0;

View File

@ -36,6 +36,8 @@
#include "Statistics/ConnectionStatistics.h"
#include "Statistics/StatisticsFeature.h"
#include <thread>
using namespace arangodb::basics;
using namespace arangodb::rest;
@ -87,9 +89,7 @@ SocketTask::~SocketTask() {
LOG_TOPIC(ERR, Logger::COMMUNICATION) << "unable to cancel _keepAliveTimer";
}
if (_peer) {
_peer->close(err);
}
_peer->close(err);
}
// -----------------------------------------------------------------------------
@ -162,40 +162,42 @@ void SocketTask::writeWriteBuffer() {
size_t total = _writeBuffer._buffer->length();
size_t written = 0;
boost::system::error_code err;
err.clear();
if (!_peer->isEncrypted()) {
boost::system::error_code err;
err.clear();
while (true) {
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
written = _peer->write(_writeBuffer._buffer, err);
while (true) {
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
written = _peer->write(_writeBuffer._buffer, err);
if (err) {
break;
if (err) {
break;
}
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
if (written != total) {
// unable to write everything at once, might be a lot of data
// above code does not update the buffer positon
break;
}
if (!completedWriteBuffer()) {
return;
}
// try to send next buffer
total = _writeBuffer._buffer->length();
written = 0;
}
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
if (written != total) {
// unable to write everything at once, might be a lot of data
// above code does not update the buffer positon
break;
}
if (!completedWriteBuffer()) {
// write could have blocked which is the only acceptable error
if (err && err != ::boost::asio::error::would_block) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on stream failed with: "
<< err.message();
closeStreamNoLock();
return;
}
// try to send next buffer
total = _writeBuffer._buffer->length();
written = 0;
}
// write could have blocked which is the only acceptable error
if (err && err != ::boost::asio::error::would_block) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on stream failed with: "
<< err.message();
closeStreamNoLock();
return;
}
// so the code could have blocked at this point or not all data
@ -337,11 +339,6 @@ bool SocketTask::trySyncRead() {
return false;
}
if (!_peer) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "peer disappeared";
return false;
}
if (0 == _peer->available(err)) {
return false;
}
@ -352,10 +349,7 @@ bool SocketTask::trySyncRead() {
return false;
}
size_t bytesRead = 0;
bytesRead =
_peer->read(boost::asio::buffer(_readBuffer.end(), READ_BLOCK_SIZE), err);
size_t bytesRead = _peer->read(boost::asio::buffer(_readBuffer.end(), READ_BLOCK_SIZE), err);
if (0 == bytesRead) {
return false; // should not happen
@ -403,49 +397,49 @@ bool SocketTask::processAll() {
void SocketTask::asyncReadSome() {
MUTEX_LOCKER(locker, _readLock);
try {
if (_abandoned) {
return;
}
size_t const MAX_DIRECT_TRIES = 2;
size_t n = 0;
while (++n <= MAX_DIRECT_TRIES) {
if (!reserveMemory()) {
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory";
return;
}
if (!trySyncRead()) {
if (n < MAX_DIRECT_TRIES) {
#ifdef TRI_HAVE_SCHED_H
sched_yield();
#endif
}
continue;
}
// ignore the result of processAll, try to read more bytes down below
processAll();
compactify();
}
} catch (boost::system::system_error& err) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "i/o stream failed with: "
<< err.what();
closeStream();
return;
} catch (...) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
closeStream();
if (_abandoned) {
return;
}
if (!_peer->isEncrypted()) {
try {
size_t const MAX_DIRECT_TRIES = 2;
size_t n = 0;
while (++n <= MAX_DIRECT_TRIES) {
if (!reserveMemory()) {
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory";
return;
}
if (!trySyncRead()) {
if (n < MAX_DIRECT_TRIES) {
std::this_thread::yield();
}
continue;
}
// ignore the result of processAll, try to read more bytes down below
processAll();
compactify();
}
} catch (boost::system::system_error& err) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "i/o stream failed with: "
<< err.what();
closeStream();
return;
} catch (...) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
closeStream();
return;
}
}
// try to read more bytes
if (!_abandoned && _peer) {
if (!_abandoned) {
if (!reserveMemory()) {
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory";
return;

View File

@ -139,10 +139,11 @@ class SocketTask : virtual public Task {
protected:
ConnectionStatistics* _connectionStatistics;
ConnectionInfo _connectionInfo;
Mutex _readLock;
basics::StringBuffer _readBuffer; // needs _readLock
private:
Mutex _readLock;
private:
// caller must hold the _writeLock
void closeStreamNoLock();