mirror of https://gitee.com/bigwinds/arangodb
Revert/ssl fix (#2662)
* Revert "Bug fix/task locking (#2618)" This reverts commit0e0bf7aae3
. * Revert "fix races in SSL communication (#2591)" This reverts commitb32db87b67
.
This commit is contained in:
parent
cfbd8ed93c
commit
91d08645f7
|
@ -7,8 +7,6 @@ devel
|
|||
traversal. `LIMIT x` or `LIMIT 0, x` were not affected, but `LIMIT s, x`
|
||||
may have returned too many results
|
||||
|
||||
* fix races in SSL communication code
|
||||
|
||||
* fix invalid locking in JWT authentication cache, which could have
|
||||
crashed the server
|
||||
|
||||
|
@ -22,6 +20,10 @@ devel
|
|||
|
||||
* fixed issue #2613: Reduce log level when Foxx manager tries to self heal missing database
|
||||
|
||||
* removed `exception` field from transaction error result; users should throw
|
||||
explicit `Error` instances to return custom exceptions (addresses issue #2561)
|
||||
|
||||
|
||||
v3.2.beta1 (2017-06-12)
|
||||
-----------------------
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
#include "GeneralCommTask.h"
|
||||
|
||||
#include "Basics/HybridLogicalClock.h"
|
||||
#include "Basics/Locking.h"
|
||||
#include "Basics/MutexLocker.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
|
@ -176,12 +175,10 @@ void GeneralCommTask::executeRequest(
|
|||
}
|
||||
|
||||
void GeneralCommTask::processResponse(GeneralResponse* response) {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
if (response == nullptr) {
|
||||
LOG_TOPIC(WARN, Logger::COMMUNICATION)
|
||||
<< "processResponse received a nullptr, closing connection";
|
||||
closeStreamNoLock();
|
||||
closeStream();
|
||||
} else {
|
||||
addResponse(response, nullptr);
|
||||
}
|
||||
|
@ -251,7 +248,7 @@ bool GeneralCommTask::handleRequest(std::shared_ptr<RestHandler> handler) {
|
|||
}
|
||||
|
||||
if (isDirect) {
|
||||
handleRequestDirectly(basics::ConditionalLocking::DoNotLock, std::move(handler));
|
||||
handleRequestDirectly(std::move(handler));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -259,9 +256,7 @@ bool GeneralCommTask::handleRequest(std::shared_ptr<RestHandler> handler) {
|
|||
|
||||
if (isPrio) {
|
||||
SchedulerFeature::SCHEDULER->post(
|
||||
[self, this, handler]() {
|
||||
handleRequestDirectly(basics::ConditionalLocking::DoLock, std::move(handler));
|
||||
});
|
||||
[self, this, handler]() { handleRequestDirectly(std::move(handler)); });
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -273,7 +268,7 @@ bool GeneralCommTask::handleRequest(std::shared_ptr<RestHandler> handler) {
|
|||
std::unique_ptr<Job> job(
|
||||
new Job(_server, std::move(handler),
|
||||
[self, this](std::shared_ptr<RestHandler> h) {
|
||||
handleRequestDirectly(basics::ConditionalLocking::DoLock, h);
|
||||
handleRequestDirectly(h);
|
||||
}));
|
||||
|
||||
bool ok = SchedulerFeature::SCHEDULER->queue(std::move(job));
|
||||
|
@ -287,18 +282,11 @@ bool GeneralCommTask::handleRequest(std::shared_ptr<RestHandler> handler) {
|
|||
return ok;
|
||||
}
|
||||
|
||||
void GeneralCommTask::handleRequestDirectly(bool doLock, std::shared_ptr<RestHandler> handler) {
|
||||
if (!doLock) {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
}
|
||||
|
||||
void GeneralCommTask::handleRequestDirectly(
|
||||
std::shared_ptr<RestHandler> handler) {
|
||||
auto self = shared_from_this();
|
||||
handler->initEngine(_loop, [self, this, doLock](RestHandler* h) {
|
||||
handler->initEngine(_loop, [self, this](RestHandler* h) {
|
||||
RequestStatistics* stat = h->stealStatistics();
|
||||
|
||||
CONDITIONAL_MUTEX_LOCKER(locker, _lock, doLock);
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
addResponse(h->response(), stat);
|
||||
});
|
||||
|
||||
|
|
|
@ -130,7 +130,7 @@ class GeneralCommTask : public SocketTask {
|
|||
|
||||
private:
|
||||
bool handleRequest(std::shared_ptr<RestHandler>);
|
||||
void handleRequestDirectly(bool doLock, std::shared_ptr<RestHandler>);
|
||||
void handleRequestDirectly(std::shared_ptr<RestHandler>);
|
||||
bool handleRequestAsync(std::shared_ptr<RestHandler>,
|
||||
uint64_t* jobId = nullptr);
|
||||
};
|
||||
|
|
|
@ -105,8 +105,6 @@ void HttpCommTask::handleSimpleError(rest::ResponseCode code, GeneralRequest con
|
|||
|
||||
void HttpCommTask::addResponse(HttpResponse* response,
|
||||
RequestStatistics* stat) {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
resetKeepAlive();
|
||||
|
||||
// response has been queued, allow further requests
|
||||
|
@ -205,7 +203,6 @@ void HttpCommTask::addResponse(HttpResponse* response,
|
|||
}
|
||||
|
||||
// reads data from the socket
|
||||
// caller must hold the _lock
|
||||
bool HttpCommTask::processRead(double startTime) {
|
||||
cancelKeepAlive();
|
||||
|
||||
|
@ -281,21 +278,16 @@ bool HttpCommTask::processRead(double startTime) {
|
|||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "switching from HTTP to VST";
|
||||
ProtocolVersion protocolVersion = _readBuffer.c_str()[6] == '0'
|
||||
? ProtocolVersion::VST_1_0 : ProtocolVersion::VST_1_1;
|
||||
|
||||
if (!abandon()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "task is already abandoned");
|
||||
}
|
||||
|
||||
std::shared_ptr<GeneralCommTask> commTask = std::make_shared<VstCommTask>(
|
||||
_abandoned = true;
|
||||
cancelKeepAlive();
|
||||
std::shared_ptr<GeneralCommTask> commTask;
|
||||
commTask = std::make_shared<VstCommTask>(
|
||||
_loop, _server, std::move(_peer), std::move(_connectionInfo),
|
||||
GeneralServerFeature::keepAliveTimeout(),
|
||||
protocolVersion, /*skipSocketInit*/ true);
|
||||
commTask->addToReadBuffer(_readBuffer.c_str() + 11,
|
||||
_readBuffer.length() - 11);
|
||||
{
|
||||
MUTEX_LOCKER(locker, commTask->_lock);
|
||||
commTask->processRead(startTime);
|
||||
}
|
||||
commTask->processRead(startTime);
|
||||
commTask->start();
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
#ifndef ARANGOD_GENERAL_SERVER_HTTP_COMM_TASK_H
|
||||
#define ARANGOD_GENERAL_SERVER_HTTP_COMM_TASK_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "GeneralServer/GeneralCommTask.h"
|
||||
|
||||
#include "Rest/HttpResponse.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <velocypack/Exception.h>
|
||||
|
||||
#include "Basics/StringUtils.h"
|
||||
#include "GeneralServer/GeneralCommTask.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "Rest/GeneralRequest.h"
|
||||
#include "Statistics/RequestStatistics.h"
|
||||
|
|
|
@ -23,8 +23,13 @@
|
|||
|
||||
#include "VstCommTask.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <stdexcept>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
#include "Basics/HybridLogicalClock.h"
|
||||
#include "Basics/MutexUnlocker.h"
|
||||
#include "Basics/StringBuffer.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "GeneralServer/AuthenticationFeature.h"
|
||||
|
@ -41,10 +46,6 @@
|
|||
#include "Utils/Events.h"
|
||||
#include "VocBase/ticks.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <stdexcept>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::basics;
|
||||
using namespace arangodb::rest;
|
||||
|
@ -103,8 +104,6 @@ VstCommTask::VstCommTask(EventLoop loop, GeneralServer* server,
|
|||
}
|
||||
|
||||
void VstCommTask::addResponse(VstResponse* response, RequestStatistics* stat) {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
VPackMessageNoOwnBuffer response_message = response->prepareForNetwork();
|
||||
uint64_t const id = response_message._id;
|
||||
|
||||
|
@ -285,8 +284,6 @@ void VstCommTask::handleAuthentication(VPackSlice const& header,
|
|||
|
||||
// reads data from the socket
|
||||
bool VstCommTask::processRead(double startTime) {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
auto& prv = _processReadVariables;
|
||||
|
||||
auto chunkBegin = _readBuffer.begin() + prv._readBufferOffset;
|
||||
|
@ -398,9 +395,6 @@ bool VstCommTask::processRead(double startTime) {
|
|||
} else {
|
||||
request->setClientTaskId(_taskId);
|
||||
|
||||
// temporarily release the mutex
|
||||
MUTEX_UNLOCKER(locker, _lock);
|
||||
|
||||
std::unique_ptr<VstResponse> response(new VstResponse(
|
||||
rest::ResponseCode::SERVER_ERROR, chunkHeader._messageID));
|
||||
response->setContentTypeRequested(request->contentTypeResponse());
|
||||
|
|
|
@ -24,21 +24,21 @@
|
|||
#ifndef ARANGOD_GENERAL_SERVER_VST_COMM_TASK_H
|
||||
#define ARANGOD_GENERAL_SERVER_VST_COMM_TASK_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "GeneralServer/GeneralCommTask.h"
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
#include "lib/Rest/VstMessage.h"
|
||||
#include "lib/Rest/VstRequest.h"
|
||||
#include "lib/Rest/VstResponse.h"
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class AuthenticationFeature;
|
||||
|
||||
namespace rest {
|
||||
|
||||
class VstCommTask final : public GeneralCommTask {
|
||||
class VstCommTask : public GeneralCommTask {
|
||||
public:
|
||||
VstCommTask(EventLoop, GeneralServer*, std::unique_ptr<Socket> socket,
|
||||
ConnectionInfo&&, double timeout, ProtocolVersion protocolVersion,
|
||||
|
@ -54,11 +54,11 @@ class VstCommTask final : public GeneralCommTask {
|
|||
}
|
||||
|
||||
addResponse(vstResponse, stat);
|
||||
}
|
||||
};
|
||||
|
||||
arangodb::Endpoint::TransportType transportType() override {
|
||||
return arangodb::Endpoint::TransportType::VST;
|
||||
}
|
||||
};
|
||||
|
||||
protected:
|
||||
// read data check if chunk and message are complete
|
||||
|
|
|
@ -135,8 +135,6 @@ class Socket {
|
|||
virtual void setNonBlocking(bool) = 0;
|
||||
virtual std::string peerAddress() = 0;
|
||||
virtual int peerPort() = 0;
|
||||
|
||||
bool isEncrypted() const { return _encrypted; }
|
||||
bool handshake();
|
||||
virtual size_t write(basics::StringBuffer* buffer,
|
||||
boost::system::error_code& ec) = 0;
|
||||
|
|
|
@ -36,8 +36,6 @@
|
|||
#include "Statistics/ConnectionStatistics.h"
|
||||
#include "Statistics/StatisticsFeature.h"
|
||||
|
||||
#include <thread>
|
||||
|
||||
using namespace arangodb::basics;
|
||||
using namespace arangodb::rest;
|
||||
|
||||
|
@ -54,7 +52,6 @@ SocketTask::SocketTask(arangodb::EventLoop loop,
|
|||
_connectionInfo(std::move(connectionInfo)),
|
||||
_readBuffer(TRI_UNKNOWN_MEM_ZONE, READ_BLOCK_SIZE + 1, false),
|
||||
_writeBuffer(nullptr, nullptr),
|
||||
_strand(socket->_ioService),
|
||||
_peer(std::move(socket)),
|
||||
_keepAliveTimeout(static_cast<long>(keepAliveTimeout * 1000)),
|
||||
_keepAliveTimer(_peer->_ioService, _keepAliveTimeout),
|
||||
|
@ -80,9 +77,8 @@ SocketTask::~SocketTask() {
|
|||
_connectionStatistics = nullptr;
|
||||
}
|
||||
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
boost::system::error_code err;
|
||||
|
||||
|
||||
if (_keepAliveTimerActive) {
|
||||
_keepAliveTimer.cancel(err);
|
||||
}
|
||||
|
@ -127,11 +123,9 @@ void SocketTask::start() {
|
|||
// --SECTION-- protected methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
// caller must hold the _lock
|
||||
// will acquire the _writeLock
|
||||
void SocketTask::addWriteBuffer(WriteBuffer& buffer) {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
if (_closedSend || _abandoned) {
|
||||
if (_closedSend) {
|
||||
buffer.release();
|
||||
return;
|
||||
}
|
||||
|
@ -140,11 +134,13 @@ void SocketTask::addWriteBuffer(WriteBuffer& buffer) {
|
|||
auto self = shared_from_this();
|
||||
|
||||
_loop._scheduler->post([self, this]() {
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
MUTEX_LOCKER(locker, _readLock);
|
||||
processAll();
|
||||
});
|
||||
}
|
||||
|
||||
MUTEX_LOCKER(locker, _writeLock);
|
||||
|
||||
if (!buffer.empty()) {
|
||||
if (!_writeBuffer.empty()) {
|
||||
_writeBuffers.emplace_back(std::move(buffer));
|
||||
|
@ -157,10 +153,8 @@ void SocketTask::addWriteBuffer(WriteBuffer& buffer) {
|
|||
writeWriteBuffer();
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// caller must hold the _writeLock
|
||||
void SocketTask::writeWriteBuffer() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
if (_writeBuffer.empty()) {
|
||||
return;
|
||||
}
|
||||
|
@ -168,42 +162,40 @@ void SocketTask::writeWriteBuffer() {
|
|||
size_t total = _writeBuffer._buffer->length();
|
||||
size_t written = 0;
|
||||
|
||||
if (!_peer->isEncrypted()) {
|
||||
boost::system::error_code err;
|
||||
err.clear();
|
||||
boost::system::error_code err;
|
||||
err.clear();
|
||||
|
||||
while (true) {
|
||||
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
|
||||
written = _peer->write(_writeBuffer._buffer, err);
|
||||
while (true) {
|
||||
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
|
||||
written = _peer->write(_writeBuffer._buffer, err);
|
||||
|
||||
if (err) {
|
||||
break;
|
||||
}
|
||||
|
||||
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
|
||||
|
||||
if (written != total) {
|
||||
// unable to write everything at once, might be a lot of data
|
||||
// above code does not update the buffer positon
|
||||
break;
|
||||
}
|
||||
|
||||
if (!completedWriteBuffer()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// try to send next buffer
|
||||
total = _writeBuffer._buffer->length();
|
||||
written = 0;
|
||||
if (err) {
|
||||
break;
|
||||
}
|
||||
|
||||
// write could have blocked which is the only acceptable error
|
||||
if (err && err != ::boost::asio::error::would_block) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on stream failed with: "
|
||||
<< err.message();
|
||||
closeStreamNoLock();
|
||||
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
|
||||
|
||||
if (written != total) {
|
||||
// unable to write everything at once, might be a lot of data
|
||||
// above code does not update the buffer positon
|
||||
break;
|
||||
}
|
||||
|
||||
if (!completedWriteBuffer()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// try to send next buffer
|
||||
total = _writeBuffer._buffer->length();
|
||||
written = 0;
|
||||
}
|
||||
|
||||
// write could have blocked which is the only acceptable error
|
||||
if (err && err != ::boost::asio::error::would_block) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on stream failed with: "
|
||||
<< err.message();
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
}
|
||||
|
||||
// so the code could have blocked at this point or not all data
|
||||
|
@ -211,13 +203,9 @@ void SocketTask::writeWriteBuffer() {
|
|||
auto self = shared_from_this();
|
||||
_peer->asyncWrite(boost::asio::buffer(_writeBuffer._buffer->begin() + written,
|
||||
total - written),
|
||||
_strand.wrap([self, this](const boost::system::error_code& ec,
|
||||
[self, this](const boost::system::error_code& ec,
|
||||
std::size_t transferred) {
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
|
||||
if (_abandoned) {
|
||||
return;
|
||||
}
|
||||
MUTEX_LOCKER(locker, _writeLock);
|
||||
|
||||
RequestStatistics::ADD_SENT_BYTES(
|
||||
_writeBuffer._statistics, transferred);
|
||||
|
@ -229,18 +217,16 @@ void SocketTask::writeWriteBuffer() {
|
|||
} else {
|
||||
if (completedWriteBuffer()) {
|
||||
_loop._scheduler->post([self, this]() {
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
MUTEX_LOCKER(locker, _writeLock);
|
||||
writeWriteBuffer();
|
||||
});
|
||||
}
|
||||
}
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// caller must hold the _writeLock
|
||||
bool SocketTask::completedWriteBuffer() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
RequestStatistics::SET_WRITE_END(_writeBuffer._statistics);
|
||||
_writeBuffer.release();
|
||||
|
||||
|
@ -258,16 +244,14 @@ bool SocketTask::completedWriteBuffer() {
|
|||
return true;
|
||||
}
|
||||
|
||||
// caller must not hold the _lock
|
||||
// caller must not hold the _writeLock
|
||||
void SocketTask::closeStream() {
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
MUTEX_LOCKER(locker, _writeLock);
|
||||
closeStreamNoLock();
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// caller must hold the _writeLock
|
||||
void SocketTask::closeStreamNoLock() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
boost::system::error_code err;
|
||||
|
||||
bool closeSend = !_closedSend;
|
||||
|
@ -286,17 +270,17 @@ void SocketTask::closeStreamNoLock() {
|
|||
// --SECTION-- private methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
// will acquire the _lock
|
||||
// will acquire the _readLock
|
||||
void SocketTask::addToReadBuffer(char const* data, std::size_t len) {
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
MUTEX_LOCKER(locker, _readLock);
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << std::string(data, len);
|
||||
_readBuffer.appendText(data, len);
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// will acquire the _writeLock
|
||||
void SocketTask::resetKeepAlive() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
MUTEX_LOCKER(locker, _writeLock);
|
||||
|
||||
if (_useKeepAliveTimer) {
|
||||
boost::system::error_code err;
|
||||
|
@ -323,21 +307,9 @@ void SocketTask::resetKeepAlive() {
|
|||
}
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// abandon the task. if the task was already abandoned, this
|
||||
// method returns false. if abandoing was successful, this
|
||||
// method returns true
|
||||
bool SocketTask::abandon() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
bool old = _abandoned;
|
||||
_abandoned = true;
|
||||
return !old;
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// will acquire the _writeLock
|
||||
void SocketTask::cancelKeepAlive() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
MUTEX_LOCKER(locker, _writeLock);
|
||||
|
||||
if (_useKeepAliveTimer && _keepAliveTimerActive) {
|
||||
boost::system::error_code err;
|
||||
|
@ -346,29 +318,30 @@ void SocketTask::cancelKeepAlive() {
|
|||
}
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// caller must hold the _readLock
|
||||
bool SocketTask::reserveMemory() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
if (_readBuffer.reserve(READ_BLOCK_SIZE + 1) == TRI_ERROR_OUT_OF_MEMORY) {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "out of memory while reading from client";
|
||||
closeStreamNoLock();
|
||||
closeStream();
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// caller must hold the _readLock
|
||||
bool SocketTask::trySyncRead() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
|
||||
boost::system::error_code err;
|
||||
|
||||
if (_abandoned) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!_peer) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "peer disappeared";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (0 == _peer->available(err)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -379,7 +352,10 @@ bool SocketTask::trySyncRead() {
|
|||
return false;
|
||||
}
|
||||
|
||||
size_t bytesRead = _peer->read(boost::asio::buffer(_readBuffer.end(), READ_BLOCK_SIZE), err);
|
||||
size_t bytesRead = 0;
|
||||
|
||||
bytesRead =
|
||||
_peer->read(boost::asio::buffer(_readBuffer.end(), READ_BLOCK_SIZE), err);
|
||||
|
||||
if (0 == bytesRead) {
|
||||
return false; // should not happen
|
||||
|
@ -400,13 +376,11 @@ bool SocketTask::trySyncRead() {
|
|||
return true;
|
||||
}
|
||||
|
||||
// caller must hold the _lock
|
||||
// caller must hold the _readLock
|
||||
bool SocketTask::processAll() {
|
||||
_lock.assertLockedByCurrentThread();
|
||||
double start_time = StatisticsFeature::time();
|
||||
|
||||
double startTime = StatisticsFeature::time();
|
||||
|
||||
while (processRead(startTime)) {
|
||||
while (processRead(start_time)) {
|
||||
if (_abandoned) {
|
||||
return false;
|
||||
}
|
||||
|
@ -425,53 +399,53 @@ bool SocketTask::processAll() {
|
|||
return true;
|
||||
}
|
||||
|
||||
// will acquire the _lock
|
||||
// will acquire the _readLock
|
||||
void SocketTask::asyncReadSome() {
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
|
||||
if (_abandoned) {
|
||||
MUTEX_LOCKER(locker, _readLock);
|
||||
|
||||
try {
|
||||
if (_abandoned) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t const MAX_DIRECT_TRIES = 2;
|
||||
size_t n = 0;
|
||||
|
||||
while (++n <= MAX_DIRECT_TRIES) {
|
||||
if (!reserveMemory()) {
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory";
|
||||
return;
|
||||
}
|
||||
|
||||
if (!trySyncRead()) {
|
||||
if (n < MAX_DIRECT_TRIES) {
|
||||
#ifdef TRI_HAVE_SCHED_H
|
||||
sched_yield();
|
||||
#endif
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// ignore the result of processAll, try to read more bytes down below
|
||||
processAll();
|
||||
compactify();
|
||||
}
|
||||
} catch (boost::system::system_error& err) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "i/o stream failed with: "
|
||||
<< err.what();
|
||||
|
||||
closeStream();
|
||||
return;
|
||||
} catch (...) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
|
||||
|
||||
closeStream();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!_peer->isEncrypted()) {
|
||||
try {
|
||||
size_t const MAX_DIRECT_TRIES = 2;
|
||||
size_t n = 0;
|
||||
|
||||
while (++n <= MAX_DIRECT_TRIES) {
|
||||
if (!reserveMemory()) {
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory";
|
||||
return;
|
||||
}
|
||||
|
||||
if (!trySyncRead()) {
|
||||
if (n < MAX_DIRECT_TRIES) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// ignore the result of processAll, try to read more bytes down below
|
||||
processAll();
|
||||
compactify();
|
||||
}
|
||||
} catch (boost::system::system_error& err) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "i/o stream failed with: "
|
||||
<< err.what();
|
||||
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
} catch (...) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
|
||||
|
||||
closeStreamNoLock();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// try to read more bytes
|
||||
if (!_abandoned) {
|
||||
if (!_abandoned && _peer) {
|
||||
if (!reserveMemory()) {
|
||||
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory";
|
||||
return;
|
||||
|
@ -485,21 +459,17 @@ void SocketTask::asyncReadSome() {
|
|||
|
||||
_peer->asyncRead(
|
||||
boost::asio::buffer(_readBuffer.end(), READ_BLOCK_SIZE),
|
||||
_strand.wrap([self, this](const boost::system::error_code& ec,
|
||||
[self, this](const boost::system::error_code& ec,
|
||||
std::size_t transferred) {
|
||||
JobGuard guard(_loop);
|
||||
guard.work();
|
||||
|
||||
MUTEX_LOCKER(locker, _lock);
|
||||
|
||||
if (_abandoned) {
|
||||
return;
|
||||
}
|
||||
MUTEX_LOCKER(locker, _readLock);
|
||||
|
||||
if (ec) {
|
||||
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
|
||||
<< "read on stream failed with: " << ec.message();
|
||||
closeStreamNoLock();
|
||||
closeStream();
|
||||
} else {
|
||||
_readBuffer.increaseLength(transferred);
|
||||
|
||||
|
@ -509,6 +479,6 @@ void SocketTask::asyncReadSome() {
|
|||
|
||||
compactify();
|
||||
}
|
||||
}));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,6 @@ class ConnectionStatistics;
|
|||
namespace rest {
|
||||
class SocketTask : virtual public Task {
|
||||
friend class HttpCommTask;
|
||||
|
||||
explicit SocketTask(SocketTask const&) = delete;
|
||||
SocketTask& operator=(SocketTask const&) = delete;
|
||||
|
||||
|
@ -60,8 +59,8 @@ class SocketTask : virtual public Task {
|
|||
void start();
|
||||
|
||||
protected:
|
||||
// caller will hold the _lock
|
||||
virtual bool processRead(double startTime) = 0;
|
||||
// caller will hold the _readLock
|
||||
virtual bool processRead(double start_time) = 0;
|
||||
virtual void compactify() {}
|
||||
|
||||
// This function is used during the protocol switch from http
|
||||
|
@ -125,28 +124,29 @@ class SocketTask : virtual public Task {
|
|||
}
|
||||
};
|
||||
|
||||
// will acquire the _lock
|
||||
// will acquire the _writeLock
|
||||
void addWriteBuffer(WriteBuffer&);
|
||||
|
||||
// will acquire the _lock
|
||||
// will acquire the _writeLock
|
||||
void closeStream();
|
||||
|
||||
// caller must hold the _lock
|
||||
void closeStreamNoLock();
|
||||
|
||||
// caller must hold the _lock
|
||||
// will acquire the _writeLock
|
||||
void resetKeepAlive();
|
||||
|
||||
// caller must hold the _lock
|
||||
// will acquire the _writeLock
|
||||
void cancelKeepAlive();
|
||||
|
||||
|
||||
protected:
|
||||
Mutex _lock;
|
||||
ConnectionStatistics* _connectionStatistics;
|
||||
ConnectionInfo _connectionInfo;
|
||||
basics::StringBuffer _readBuffer; // needs _lock
|
||||
|
||||
Mutex _readLock;
|
||||
basics::StringBuffer _readBuffer; // needs _readLock
|
||||
|
||||
private:
|
||||
// caller must hold the _writeLock
|
||||
void closeStreamNoLock();
|
||||
|
||||
void writeWriteBuffer();
|
||||
bool completedWriteBuffer();
|
||||
|
||||
|
@ -154,21 +154,19 @@ class SocketTask : virtual public Task {
|
|||
bool trySyncRead();
|
||||
bool processAll();
|
||||
void asyncReadSome();
|
||||
bool abandon();
|
||||
|
||||
private:
|
||||
Mutex _writeLock;
|
||||
WriteBuffer _writeBuffer;
|
||||
std::list<WriteBuffer> _writeBuffers;
|
||||
|
||||
boost::asio::io_service::strand _strand;
|
||||
|
||||
std::unique_ptr<Socket> _peer;
|
||||
boost::posix_time::milliseconds _keepAliveTimeout;
|
||||
boost::asio::deadline_timer _keepAliveTimer;
|
||||
bool const _useKeepAliveTimer;
|
||||
bool _keepAliveTimerActive;
|
||||
bool _closeRequested;
|
||||
std::atomic<bool> _abandoned;
|
||||
std::atomic_bool _abandoned;
|
||||
|
||||
bool _closedSend = false;
|
||||
bool _closedReceive = false;
|
||||
|
|
|
@ -23,34 +23,18 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "Mutex.h"
|
||||
#include "Logger/Logger.h"
|
||||
|
||||
#include <limits>
|
||||
#include "Logger/Logger.h"
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief constructs a mutex
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_HAVE_POSIX_THREADS
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
// initialize _holder to "maximum" thread id. this will work if the type of _holder
|
||||
// is numeric, but will not work if its type is more complex.
|
||||
Mutex::Mutex() : _mutex(), _holder((std::numeric_limits<decltype(_holder)>::max)()) {
|
||||
#else
|
||||
Mutex::Mutex() : _mutex() {
|
||||
#endif
|
||||
pthread_mutexattr_init(&_attributes);
|
||||
|
||||
#ifdef __linux__
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
// use an error checking mutex if available (only for LinuxThread) and only
|
||||
// in maintainer mode
|
||||
pthread_mutexattr_settype(&_attributes, PTHREAD_MUTEX_ERRORCHECK_NP);
|
||||
#endif
|
||||
#endif
|
||||
|
||||
pthread_mutex_init(&_mutex, &_attributes);
|
||||
}
|
||||
Mutex::Mutex() : _mutex() { pthread_mutex_init(&_mutex, nullptr); }
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -60,13 +44,13 @@ Mutex::Mutex() : _mutex() { InitializeSRWLock(&_mutex); }
|
|||
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief deletes the mutex
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_HAVE_POSIX_THREADS
|
||||
|
||||
Mutex::~Mutex() {
|
||||
pthread_mutex_destroy(&_mutex);
|
||||
pthread_mutexattr_destroy(&_attributes);
|
||||
}
|
||||
Mutex::~Mutex() { pthread_mutex_destroy(&_mutex); }
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -75,15 +59,13 @@ Mutex::~Mutex() {
|
|||
Mutex::~Mutex() {}
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief acquires the lock
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_HAVE_POSIX_THREADS
|
||||
|
||||
void Mutex::lock() {
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
// we must not hold the lock ourselves here
|
||||
TRI_ASSERT(_holder != Thread::currentThreadId());
|
||||
#endif
|
||||
|
||||
int rc = pthread_mutex_lock(&_mutex);
|
||||
|
||||
if (rc != 0) {
|
||||
|
@ -94,18 +76,9 @@ void Mutex::lock() {
|
|||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "could not lock the mutex object: " << strerror(rc);
|
||||
FATAL_ERROR_ABORT();
|
||||
}
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
_holder = Thread::currentThreadId();
|
||||
#endif
|
||||
}
|
||||
|
||||
bool Mutex::tryLock() {
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
// we must not hold the lock ourselves here
|
||||
TRI_ASSERT(_holder != Thread::currentThreadId());
|
||||
#endif
|
||||
|
||||
int rc = pthread_mutex_trylock(&_mutex);
|
||||
|
||||
if (rc != 0) {
|
||||
|
@ -118,10 +91,6 @@ bool Mutex::tryLock() {
|
|||
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "could not lock the mutex object: " << strerror(rc);
|
||||
FATAL_ERROR_ABORT();
|
||||
}
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
_holder = Thread::currentThreadId();
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -135,14 +104,13 @@ bool Mutex::tryLock() { return TryAcquireSRWLockExclusive(&_mutex) != 0; }
|
|||
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief releases the lock
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef TRI_HAVE_POSIX_THREADS
|
||||
|
||||
void Mutex::unlock() {
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
TRI_ASSERT(_holder == Thread::currentThreadId());
|
||||
_holder = 0;
|
||||
#endif
|
||||
int rc = pthread_mutex_unlock(&_mutex);
|
||||
|
||||
if (rc != 0) {
|
||||
|
@ -158,11 +126,3 @@ void Mutex::unlock() {
|
|||
void Mutex::unlock() { ReleaseSRWLockExclusive(&_mutex); }
|
||||
|
||||
#endif
|
||||
|
||||
/// @brief assert that the mutex is locked by the current thread. will do
|
||||
/// nothing in non-maintainer mode and will do nothing for non-posix locks
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
void Mutex::assertLockedByCurrentThread() {
|
||||
TRI_ASSERT(_holder == Thread::currentThreadId());
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -27,61 +27,66 @@
|
|||
|
||||
#include "Basics/Common.h"
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
#include "Basics/Thread.h"
|
||||
#endif
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief mutex
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class Mutex {
|
||||
private:
|
||||
Mutex(Mutex const&) = delete;
|
||||
Mutex& operator=(Mutex const&) = delete;
|
||||
|
||||
public:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief constructs a mutex
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Mutex();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief deletes the mutex
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
~Mutex();
|
||||
|
||||
public:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief acquires the lock
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void lock();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief tries to acquire the lock
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool tryLock();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief releases the lock
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void unlock();
|
||||
|
||||
/// @brief assert that the mutex is locked by the current thread. will do
|
||||
/// nothing in non-maintainer mode and will do nothing for non-posix locks
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
void assertLockedByCurrentThread();
|
||||
#else
|
||||
inline void assertLockedByCurrentThread() {}
|
||||
#endif
|
||||
|
||||
private:
|
||||
#ifdef TRI_HAVE_POSIX_THREADS
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief pthread mutex
|
||||
pthread_mutex_t _mutex;
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/// @brief pthread mutex attributes
|
||||
pthread_mutexattr_t _attributes;
|
||||
pthread_mutex_t _mutex;
|
||||
#endif
|
||||
|
||||
#ifdef TRI_HAVE_WIN32_THREADS
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief SRWLocks
|
||||
///
|
||||
/// as of VS2013, exclusive SRWLocks tend to be faster than native mutexes
|
||||
SRWLOCK _mutex;
|
||||
#endif
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
TRI_tid_t _holder;
|
||||
SRWLOCK _mutex;
|
||||
#endif
|
||||
};
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ class MutexLocker {
|
|||
|
||||
public:
|
||||
/// @brief acquires a mutex
|
||||
/// The constructor acquires the mutex, the destructor unlocks the mutex.
|
||||
/// The constructor acquires a read lock, the destructor unlocks the mutex.
|
||||
MutexLocker(LockType* mutex, LockerType type, bool condition, char const* file, int line)
|
||||
: _mutex(mutex), _file(file), _line(line),
|
||||
#ifdef TRI_SHOW_LOCK_TIME
|
||||
|
|
|
@ -1,102 +0,0 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Dr. Frank Celler
|
||||
/// @author Achim Brandt
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGODB_BASICS_MUTEX_UNLOCKER_H
|
||||
#define ARANGODB_BASICS_MUTEX_UNLOCKER_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "Basics/Locking.h"
|
||||
|
||||
#ifdef TRI_SHOW_LOCK_TIME
|
||||
#include "Logger/Logger.h"
|
||||
#endif
|
||||
|
||||
#include <thread>
|
||||
|
||||
#define MUTEX_UNLOCKER(obj, lock) \
|
||||
arangodb::basics::MutexUnlocker<typename std::decay<decltype (lock)>::type> obj(&(lock), __FILE__, __LINE__)
|
||||
|
||||
namespace arangodb {
|
||||
namespace basics {
|
||||
|
||||
/// @brief mutex locker
|
||||
/// A MutexUnlocker unlocks a mutex during its lifetime und locks the mutex
|
||||
/// when it is destroyed.
|
||||
template<class LockType>
|
||||
class MutexUnlocker {
|
||||
MutexUnlocker(MutexUnlocker const&) = delete;
|
||||
MutexUnlocker& operator=(MutexUnlocker const&) = delete;
|
||||
|
||||
public:
|
||||
/// The constructor unlocks the mutex, the destructor locks the mutex.
|
||||
MutexUnlocker(LockType* mutex, char const* file, int line)
|
||||
: _mutex(mutex), _file(file), _line(line), _isLocked(true) {
|
||||
|
||||
unlock();
|
||||
}
|
||||
|
||||
/// @brief releases the read-lock
|
||||
~MutexUnlocker() {
|
||||
if (!_isLocked) {
|
||||
lock();
|
||||
}
|
||||
}
|
||||
|
||||
bool isLocked() const { return _isLocked; }
|
||||
|
||||
/// @brief acquire the mutex, blocking
|
||||
void lock() {
|
||||
TRI_ASSERT(!_isLocked);
|
||||
_mutex->lock();
|
||||
_isLocked = true;
|
||||
}
|
||||
|
||||
/// @brief unlocks the mutex if we own it
|
||||
bool unlock() {
|
||||
if (_isLocked) {
|
||||
_mutex->unlock();
|
||||
_isLocked = false;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private:
|
||||
/// @brief the mutex
|
||||
LockType* _mutex;
|
||||
|
||||
/// @brief file
|
||||
char const* _file;
|
||||
|
||||
/// @brief line number
|
||||
int _line;
|
||||
|
||||
/// @brief whether or not the mutex is locked
|
||||
bool _isLocked;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue