mirror of https://gitee.com/bigwinds/arangodb
fixed some TLS errors when combining protocols SSL/TLS and VST (#6551)
This commit is contained in:
parent
fbd49fe51a
commit
b25ffd9dae
10
CHANGELOG
10
CHANGELOG
|
@ -1,3 +1,13 @@
|
|||
v3.3.17 (XXXX-XX-XX)
|
||||
--------------------
|
||||
|
||||
* fix some TLS errors that occurred when combining HTTPS/TLS transport with the
|
||||
VelocyStream protocol (VST)
|
||||
|
||||
That combination could have led to spurious errors such as "TLS padding error"
|
||||
or "Tag mismatch" and connections being closed.
|
||||
|
||||
|
||||
v3.3.16 (2018-09-19)
|
||||
--------------------
|
||||
|
||||
|
|
|
@ -70,6 +70,13 @@ HttpCommTask::HttpCommTask(EventLoop loop, GeneralServer* server,
|
|||
ConnectionStatistics::SET_HTTP(_connectionStatistics);
|
||||
}
|
||||
|
||||
// whether or not this task can mix sync and async I/O
|
||||
bool HttpCommTask::canUseMixedIO() const {
|
||||
// in case SSL is used, we cannot use a combination of sync and async I/O
|
||||
// because that will make TLS fall apart
|
||||
return !_peer->isEncrypted();
|
||||
}
|
||||
|
||||
/// @brief send error response including response body
|
||||
void HttpCommTask::addSimpleResponse(rest::ResponseCode code, rest::ContentType respType,
|
||||
uint64_t /*messageId*/, velocypack::Buffer<uint8_t>&& buffer) {
|
||||
|
|
|
@ -23,7 +23,10 @@ class HttpCommTask final : public GeneralCommTask {
|
|||
arangodb::Endpoint::TransportType transportType() override {
|
||||
return arangodb::Endpoint::TransportType::HTTP;
|
||||
}
|
||||
|
||||
|
||||
// whether or not this task can mix sync and async I/O
|
||||
bool canUseMixedIO() const override;
|
||||
|
||||
private:
|
||||
bool processRead(double startTime) override;
|
||||
void compactify() override;
|
||||
|
|
|
@ -99,6 +99,13 @@ VstCommTask::VstCommTask(EventLoop loop, GeneralServer* server,
|
|||
ServerFeature>("Server")
|
||||
->vstMaxSize();
|
||||
}
|
||||
|
||||
// whether or not this task can mix sync and async I/O
|
||||
bool VstCommTask::canUseMixedIO() const {
|
||||
// in case SSL is used, we cannot use a combination of sync and async I/O
|
||||
// because that will make TLS fall apart
|
||||
return !_peer->isEncrypted();
|
||||
}
|
||||
|
||||
/// @brief send simple response including response body
|
||||
void VstCommTask::addSimpleResponse(rest::ResponseCode code, rest::ContentType respType,
|
||||
|
|
|
@ -46,6 +46,9 @@ class VstCommTask final : public GeneralCommTask {
|
|||
return arangodb::Endpoint::TransportType::VST;
|
||||
}
|
||||
|
||||
// whether or not this task can mix sync and async I/O
|
||||
bool canUseMixedIO() const override;
|
||||
|
||||
protected:
|
||||
// read data check if chunk and message are complete
|
||||
// if message is complete execute a request
|
||||
|
|
|
@ -120,7 +120,7 @@ bool SocketTask::start() {
|
|||
|
||||
if (_closeRequested.load(std::memory_order_acquire)) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "cannot start, close alread in progress";
|
||||
<< "cannot start, close already in progress";
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -192,6 +192,7 @@ void SocketTask::closeStream() {
|
|||
if (_abandoned.load(std::memory_order_acquire)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// strand::dispatch may execute this immediately if this
|
||||
// is called on a thread inside the same strand
|
||||
auto self = shared_from_this();
|
||||
|
@ -291,6 +292,7 @@ bool SocketTask::trySyncRead() {
|
|||
|
||||
asio_ns::error_code err;
|
||||
TRI_ASSERT(_peer != nullptr);
|
||||
|
||||
if (0 == _peer->available(err)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -315,17 +317,15 @@ bool SocketTask::trySyncRead() {
|
|||
|
||||
_readBuffer.increaseLength(bytesRead);
|
||||
|
||||
if (err) {
|
||||
if (err == asio_ns::error::would_block) {
|
||||
return false;
|
||||
} else {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "trySyncRead failed with: "
|
||||
<< err.message();
|
||||
return false;
|
||||
}
|
||||
if (!err) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return true;
|
||||
if (err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "trySyncRead failed with: " << err.message();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// must run in strand
|
||||
|
@ -379,39 +379,44 @@ void SocketTask::asyncReadSome() {
|
|||
TRI_ASSERT(_peer != nullptr);
|
||||
TRI_ASSERT(_peer->strand.running_in_this_thread());
|
||||
|
||||
try {
|
||||
size_t const MAX_DIRECT_TRIES = 2;
|
||||
size_t n = 0;
|
||||
if (this->canUseMixedIO()) {
|
||||
// try some direct reads only for non-SSL mode
|
||||
// in SSL mode it will fall apart when mixing direct reads and async
|
||||
// reads later
|
||||
try {
|
||||
size_t const MAX_DIRECT_TRIES = 2;
|
||||
size_t n = 0;
|
||||
|
||||
while (++n <= MAX_DIRECT_TRIES &&
|
||||
!_abandoned.load(std::memory_order_acquire)) {
|
||||
if (!trySyncRead()) {
|
||||
if (n < MAX_DIRECT_TRIES) {
|
||||
std::this_thread::yield();
|
||||
while (++n <= MAX_DIRECT_TRIES &&
|
||||
!_abandoned.load(std::memory_order_acquire)) {
|
||||
if (!trySyncRead()) {
|
||||
if (n < MAX_DIRECT_TRIES) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (_abandoned.load(std::memory_order_acquire)) {
|
||||
return;
|
||||
}
|
||||
if (_abandoned.load(std::memory_order_acquire)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// ignore the result of processAll, try to read more bytes down below
|
||||
processAll();
|
||||
compactify();
|
||||
// ignore the result of processAll, try to read more bytes down below
|
||||
processAll();
|
||||
compactify();
|
||||
}
|
||||
} catch (asio_ns::system_error const& err) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync read failed with: "
|
||||
<< err.what();
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
} catch (...) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
|
||||
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
}
|
||||
} catch (asio_ns::system_error const& err) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync read failed with: "
|
||||
<< err.what();
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
} catch (...) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
|
||||
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// try to read more bytes
|
||||
if (_abandoned.load(std::memory_order_acquire)) {
|
||||
return;
|
||||
|
@ -461,54 +466,72 @@ void SocketTask::asyncWriteSome() {
|
|||
if (_writeBuffer.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
TRI_ASSERT(_writeBuffer._buffer != nullptr);
|
||||
size_t total = _writeBuffer._buffer->length();
|
||||
size_t written = 0;
|
||||
|
||||
TRI_ASSERT(!_abandoned);
|
||||
TRI_ASSERT(_peer != nullptr);
|
||||
|
||||
asio_ns::error_code err;
|
||||
err.clear();
|
||||
while (true) {
|
||||
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
|
||||
written = _peer->writeSome(_writeBuffer._buffer, err);
|
||||
|
||||
if (this->canUseMixedIO()) {
|
||||
// try some direct writes only for non-SSL mode
|
||||
// in SSL mode it will fall apart when mixing direct writes and async
|
||||
// writes later
|
||||
while (true) {
|
||||
TRI_ASSERT(_writeBuffer._buffer != nullptr);
|
||||
|
||||
if (err) {
|
||||
break;
|
||||
// we can directly skip sending empty buffers
|
||||
if (_writeBuffer._buffer->length() > 0) {
|
||||
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
|
||||
written = _peer->writeSome(_writeBuffer._buffer, err);
|
||||
|
||||
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
|
||||
|
||||
if (err || written != total) {
|
||||
// unable to write everything at once, might be a lot of data
|
||||
// above code does not update the buffer positon
|
||||
break;
|
||||
}
|
||||
|
||||
TRI_ASSERT(written > 0);
|
||||
}
|
||||
|
||||
if (!completedWriteBuffer()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// try to send next buffer
|
||||
TRI_ASSERT(_writeBuffer._buffer != nullptr);
|
||||
total = _writeBuffer._buffer->length();
|
||||
}
|
||||
|
||||
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
|
||||
|
||||
if (written != total) {
|
||||
// unable to write everything at once, might be a lot of data
|
||||
// above code does not update the buffer positon
|
||||
break;
|
||||
}
|
||||
|
||||
if (!completedWriteBuffer()) {
|
||||
// write could have blocked which is the only acceptable error
|
||||
if (err && err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync write on stream failed with: "
|
||||
<< err.message();
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
}
|
||||
} // !_peer->isEncrypted
|
||||
|
||||
// try to send next buffer
|
||||
total = _writeBuffer._buffer->length();
|
||||
written = 0;
|
||||
}
|
||||
|
||||
// write could have blocked which is the only acceptable error
|
||||
if (err && err != ::asio_ns::error::would_block) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync write on failed with: "
|
||||
<< err.message();
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
}
|
||||
// we will be getting here in the following cases
|
||||
// - encrypted mode (SSL)
|
||||
// - we send only parts of the write buffer, but have more to send
|
||||
// - we got the error would_block/try_again when sending data
|
||||
// in this case we dispatch an async write
|
||||
|
||||
if (_abandoned.load(std::memory_order_acquire)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
TRI_ASSERT(_writeBuffer._buffer != nullptr);
|
||||
|
||||
// so the code could have blocked at this point or not all data
|
||||
// was written in one go, begin writing at offset (written)
|
||||
auto self = shared_from_this();
|
||||
|
||||
_peer->asyncWrite(
|
||||
asio_ns::buffer(_writeBuffer._buffer->begin() + written, total - written),
|
||||
[self, this](const asio_ns::error_code& ec, std::size_t transferred) {
|
||||
|
@ -517,7 +540,8 @@ void SocketTask::asyncWriteSome() {
|
|||
|
||||
if (_abandoned.load(std::memory_order_acquire)) {
|
||||
return;
|
||||
} else if (ec) {
|
||||
}
|
||||
if (ec) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on failed with: "
|
||||
<< ec.message();
|
||||
closeStream();
|
||||
|
|
|
@ -55,6 +55,9 @@ class SocketTask : virtual public Task {
|
|||
|
||||
public:
|
||||
bool start();
|
||||
|
||||
// whether or not this task can mix sync and async I/O
|
||||
virtual bool canUseMixedIO() const = 0;
|
||||
|
||||
protected:
|
||||
// caller will hold the _lock
|
||||
|
@ -144,11 +147,10 @@ class SocketTask : virtual public Task {
|
|||
// method returns true. Used for VST upgrade
|
||||
bool abandon() { return !(_abandoned.exchange(true)); }
|
||||
|
||||
/// lease a string buffer from pool
|
||||
// lease a string buffer from pool
|
||||
basics::StringBuffer* leaseStringBuffer(size_t length);
|
||||
void returnStringBuffer(basics::StringBuffer*);
|
||||
|
||||
protected:
|
||||
bool processAll();
|
||||
void triggerProcessAll();
|
||||
|
||||
|
|
Loading…
Reference in New Issue