1
0
Fork 0

Bug fix/fix ssl vst (#6547)

This commit is contained in:
Jan 2018-09-19 23:21:46 +02:00 committed by GitHub
parent 203b141a6f
commit 8b26c9db3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 117 additions and 77 deletions

View File

@ -23,7 +23,12 @@ class HttpCommTask final : public GeneralCommTask {
arangodb::Endpoint::TransportType transportType() override { arangodb::Endpoint::TransportType transportType() override {
return arangodb::Endpoint::TransportType::HTTP; return arangodb::Endpoint::TransportType::HTTP;
} }
// whether or not this task can mix sync and async I/O
// this is always true for the HTTPCommTask, because we are not
// multiplexing I/O
bool canUseMixedIO() const override { return true; }
private: private:
bool processRead(double startTime) override; bool processRead(double startTime) override;
void compactify() override; void compactify() override;

View File

@ -95,6 +95,13 @@ VstCommTask::VstCommTask(Scheduler* scheduler, GeneralServer* server,
ServerFeature>("Server") ServerFeature>("Server")
->vstMaxSize(); ->vstMaxSize();
} }
// whether or not this task can mix sync and async I/O
bool VstCommTask::canUseMixedIO() const {
// in case SSL is used, we cannot use a combination of sync and async I/O
// because that will make TLS fall apart
return !_peer->isEncrypted();
}
/// @brief send simple response including response body /// @brief send simple response including response body
void VstCommTask::addSimpleResponse(rest::ResponseCode code, rest::ContentType respType, void VstCommTask::addSimpleResponse(rest::ResponseCode code, rest::ContentType respType,

View File

@ -46,6 +46,9 @@ class VstCommTask final : public GeneralCommTask {
return arangodb::Endpoint::TransportType::VST; return arangodb::Endpoint::TransportType::VST;
} }
// whether or not this task can mix sync and async I/O
bool canUseMixedIO() const override;
protected: protected:
// read data check if chunk and message are complete // read data check if chunk and message are complete
// if message is complete execute a request // if message is complete execute a request

View File

@ -119,7 +119,7 @@ bool SocketTask::start() {
if (_closeRequested.load(std::memory_order_acquire)) { if (_closeRequested.load(std::memory_order_acquire)) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
<< "cannot start, close alread in progress"; << "cannot start, close already in progress";
return false; return false;
} }
@ -191,6 +191,7 @@ void SocketTask::closeStream() {
if (_abandoned.load(std::memory_order_acquire)) { if (_abandoned.load(std::memory_order_acquire)) {
return; return;
} }
// strand::dispatch may execute this immediately if this // strand::dispatch may execute this immediately if this
// is called on a thread inside the same strand // is called on a thread inside the same strand
auto self = shared_from_this(); auto self = shared_from_this();
@ -204,7 +205,7 @@ void SocketTask::closeStream() {
void SocketTask::closeStreamNoLock() { void SocketTask::closeStreamNoLock() {
TRI_ASSERT(_peer != nullptr); TRI_ASSERT(_peer != nullptr);
TRI_ASSERT(_peer->runningInThisThread()); TRI_ASSERT(_peer->runningInThisThread());
bool mustCloseSend = !_closedSend.load(std::memory_order_acquire); bool mustCloseSend = !_closedSend.load(std::memory_order_acquire);
bool mustCloseReceive = !_closedReceive.load(std::memory_order_acquire); bool mustCloseReceive = !_closedReceive.load(std::memory_order_acquire);
@ -291,6 +292,7 @@ bool SocketTask::trySyncRead() {
asio_ns::error_code err; asio_ns::error_code err;
TRI_ASSERT(_peer != nullptr); TRI_ASSERT(_peer != nullptr);
if (0 == _peer->available(err)) { if (0 == _peer->available(err)) {
return false; return false;
} }
@ -315,17 +317,15 @@ bool SocketTask::trySyncRead() {
_readBuffer.increaseLength(bytesRead); _readBuffer.increaseLength(bytesRead);
if (err) { if (!err) {
if (err == asio_ns::error::would_block) { return true;
return false;
} else {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "trySyncRead failed with: "
<< err.message();
return false;
}
} }
return true; if (err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "trySyncRead failed with: " << err.message();
}
return false;
} }
// caller must hold the _lock // caller must hold the _lock
@ -379,43 +379,49 @@ void SocketTask::asyncReadSome() {
TRI_ASSERT(_peer != nullptr); TRI_ASSERT(_peer != nullptr);
TRI_ASSERT(_peer->runningInThisThread()); TRI_ASSERT(_peer->runningInThisThread());
try { if (this->canUseMixedIO()) {
size_t const MAX_DIRECT_TRIES = 2; // try some direct read only for non-SSL mode
size_t n = 0; // in SSL mode it will fall apart when mixing direct reads and async
// reads later
try {
size_t const MAX_DIRECT_TRIES = 2;
size_t n = 0;
while (++n <= MAX_DIRECT_TRIES && while (++n <= MAX_DIRECT_TRIES &&
!_abandoned.load(std::memory_order_acquire)) { !_abandoned.load(std::memory_order_acquire)) {
if (!trySyncRead()) { if (!trySyncRead()) {
if (n < MAX_DIRECT_TRIES) { if (n < MAX_DIRECT_TRIES) {
std::this_thread::yield(); std::this_thread::yield();
}
continue;
} }
continue;
}
if (_abandoned.load(std::memory_order_acquire)) { if (_abandoned.load(std::memory_order_acquire)) {
return; return;
} }
// ignore the result of processAll, try to read more bytes down below // ignore the result of processAll, try to read more bytes down below
processAll(); processAll();
compactify(); compactify();
}
} catch (asio_ns::system_error const& err) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync read failed with: "
<< err.what();
closeStreamNoLock();
return;
} catch (...) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
closeStreamNoLock();
return;
} }
} catch (asio_ns::system_error const& err) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync read failed with: "
<< err.what();
closeStreamNoLock();
return;
} catch (...) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
closeStreamNoLock();
return;
} }
// try to read more bytes // try to read more bytes
if (_abandoned.load(std::memory_order_acquire)) { if (_abandoned.load(std::memory_order_acquire)) {
return; return;
} else if (!reserveMemory()) { }
if (!reserveMemory()) {
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory"; LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory";
return; return;
} }
@ -460,54 +466,72 @@ void SocketTask::asyncWriteSome() {
if (_writeBuffer.empty()) { if (_writeBuffer.empty()) {
return; return;
} }
TRI_ASSERT(_writeBuffer._buffer != nullptr);
size_t total = _writeBuffer._buffer->length(); size_t total = _writeBuffer._buffer->length();
size_t written = 0; size_t written = 0;
TRI_ASSERT(!_abandoned); TRI_ASSERT(!_abandoned);
TRI_ASSERT(_peer != nullptr);
asio_ns::error_code err; asio_ns::error_code err;
err.clear();
while (true) { if (this->canUseMixedIO()) {
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics); // try some direct writes only for non-SSL mode
written = _peer->writeSome(_writeBuffer._buffer, err); // in SSL mode it will fall apart when mixing direct writes and async
// writes later
while (true) {
TRI_ASSERT(_writeBuffer._buffer != nullptr);
if (err) { // we can directly skip sending empty buffers
break; if (_writeBuffer._buffer->length() > 0) {
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
written = _peer->writeSome(_writeBuffer._buffer, err);
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
if (err || written != total) {
// unable to write everything at once, might be a lot of data
// above code does not update the buffer positon
break;
}
TRI_ASSERT(written > 0);
}
if (!completedWriteBuffer()) {
return;
}
// try to send next buffer
TRI_ASSERT(_writeBuffer._buffer != nullptr);
total = _writeBuffer._buffer->length();
} }
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written); // write could have blocked which is the only acceptable error
if (err && err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
if (written != total) { LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync write on stream failed with: "
// unable to write everything at once, might be a lot of data << err.message();
// above code does not update the buffer positon closeStreamNoLock();
break;
}
if (!completedWriteBuffer()) {
return; return;
} }
} // !_peer->isEncrypted
// try to send next buffer // we will be getting here in the following cases
total = _writeBuffer._buffer->length(); // - encrypted mode (SSL)
written = 0; // - we send only parts of the write buffer, but have more to send
} // - we got the error would_block/try_again when sending data
// in this case we dispatch an async write
// write could have blocked which is the only acceptable error
if (err && err != ::asio_ns::error::would_block) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync write on failed with: "
<< err.message();
closeStreamNoLock();
return;
}
if (_abandoned.load(std::memory_order_acquire)) { if (_abandoned.load(std::memory_order_acquire)) {
return; return;
} }
TRI_ASSERT(_writeBuffer._buffer != nullptr);
// so the code could have blocked at this point or not all data // so the code could have blocked at this point or not all data
// was written in one go, begin writing at offset (written) // was written in one go, begin writing at offset (written)
auto self = shared_from_this(); auto self = shared_from_this();
_peer->asyncWrite( _peer->asyncWrite(
asio_ns::buffer(_writeBuffer._buffer->begin() + written, total - written), asio_ns::buffer(_writeBuffer._buffer->begin() + written, total - written),
[self, this](const asio_ns::error_code& ec, std::size_t transferred) { [self, this](const asio_ns::error_code& ec, std::size_t transferred) {
@ -516,7 +540,8 @@ void SocketTask::asyncWriteSome() {
if (_abandoned.load(std::memory_order_acquire)) { if (_abandoned.load(std::memory_order_acquire)) {
return; return;
} else if (ec) { }
if (ec) {
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on failed with: " LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on failed with: "
<< ec.message(); << ec.message();
closeStream(); closeStream();
@ -527,11 +552,9 @@ void SocketTask::asyncWriteSome() {
transferred); transferred);
if (completedWriteBuffer()) { if (completedWriteBuffer()) {
_peer->post([self, this] { if (!_abandoned.load(std::memory_order_acquire)) {
if (!_abandoned.load(std::memory_order_acquire)) { asyncWriteSome();
asyncWriteSome(); }
}
});
} }
}); });
} }

View File

@ -55,6 +55,9 @@ class SocketTask : virtual public Task {
public: public:
bool start(); bool start();
// whether or not this task can mix sync and async I/O
virtual bool canUseMixedIO() const = 0;
protected: protected:
// caller will hold the _lock // caller will hold the _lock
@ -144,11 +147,10 @@ class SocketTask : virtual public Task {
// method returns true. Used for VST upgrade // method returns true. Used for VST upgrade
bool abandon() { return !(_abandoned.exchange(true)); } bool abandon() { return !(_abandoned.exchange(true)); }
/// lease a string buffer from pool // lease a string buffer from pool
basics::StringBuffer* leaseStringBuffer(size_t length); basics::StringBuffer* leaseStringBuffer(size_t length);
void returnStringBuffer(basics::StringBuffer*); void returnStringBuffer(basics::StringBuffer*);
protected:
bool processAll(); bool processAll();
void triggerProcessAll(); void triggerProcessAll();