1
0
Fork 0

fix crash when pressing CTRL+c in arangosh (#6187)

This commit is contained in:
Jan 2018-08-21 11:12:11 +02:00 committed by GitHub
parent 2aa879a53b
commit cb12be3e4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 202 additions and 243 deletions

View File

@ -52,17 +52,6 @@ class Connection : public std::enable_shared_from_this<Connection> {
/// @brief Send a request to the server and wait into a response it received. /// @brief Send a request to the server and wait into a response it received.
std::unique_ptr<Response> sendRequest(std::unique_ptr<Request> r); std::unique_ptr<Response> sendRequest(std::unique_ptr<Request> r);
/// @brief sed request synchronously, only save to use if the
virtual std::unique_ptr<Response> sendRequestSync(std::unique_ptr<Request> r) {
return sendRequest(std::move(r));
}
// Send a request to the server and wait into a response it received.
/*std::unique_ptr<Response> sendRequest(Request const& r) {
std::unique_ptr<Request> copy(new Request(r));
return sendRequest(std::move(copy));
}*/
// Send a request to the server and return immediately. // Send a request to the server and return immediately.
// When a response is received or an error occurs, the corresponding // When a response is received or an error occurs, the corresponding
// callbackis called. The callback is executed on a specific // callbackis called. The callback is executed on a specific
@ -70,13 +59,6 @@ class Connection : public std::enable_shared_from_this<Connection> {
virtual MessageID sendRequest(std::unique_ptr<Request> r, virtual MessageID sendRequest(std::unique_ptr<Request> r,
RequestCallback cb) = 0; RequestCallback cb) = 0;
// Send a request to the server and return immediately.
// When a response is received or an error occurs, the corresponding
// callback is called.
/*MessageID sendRequest(Request const& r, RequestCallback cb) {
std::unique_ptr<Request> copy(new Request(r));
return sendRequest(std::move(copy), cb);
}*/
// Return the number of requests that have not yet finished. // Return the number of requests that have not yet finished.
virtual std::size_t requestsLeft() const = 0; virtual std::size_t requestsLeft() const = 0;
@ -84,9 +66,8 @@ class Connection : public std::enable_shared_from_this<Connection> {
/// @brief connection state /// @brief connection state
virtual State state() const = 0; virtual State state() const = 0;
/// @brief Activate the connection. /// @brief cancel the connection, unusable afterwards
virtual void startConnection() = 0; virtual void cancel() = 0;
virtual void shutdownConnection(const ErrorCondition) = 0;
std::string endpoint() const; std::string endpoint() const;
@ -94,6 +75,9 @@ class Connection : public std::enable_shared_from_this<Connection> {
Connection(detail::ConnectionConfiguration const& conf) Connection(detail::ConnectionConfiguration const& conf)
: _config(conf) {} : _config(conf) {}
/// @brief Activate the connection.
virtual void startConnection() = 0;
// Invoke the configured ConnectionFailureCallback (if any) // Invoke the configured ConnectionFailureCallback (if any)
void onFailure(Error errorCode, const std::string& errorMessage) { void onFailure(Error errorCode, const std::string& errorMessage) {
if (_config._onFailure) { if (_config._onFailure) {

View File

@ -38,8 +38,10 @@ struct Socket<SocketType::Tcp> {
: resolver(ctx), socket(ctx) {} : resolver(ctx), socket(ctx) {}
~Socket() { ~Socket() {
try {
resolver.cancel(); resolver.cancel();
shutdown(); shutdown();
} catch(...) {}
} }
template<typename CallbackT> template<typename CallbackT>
@ -63,10 +65,10 @@ struct Socket<SocketType::Tcp> {
} }
void shutdown() { void shutdown() {
if (socket.is_open()) { if (socket.is_open()) {
asio_ns::error_code error; // prevents exceptions asio_ns::error_code ec; // prevents exceptions
socket.cancel(error); socket.cancel(ec);
socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, error); socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
socket.close(error); socket.close(ec);
} }
} }
@ -81,8 +83,10 @@ struct Socket<fuerte::SocketType::Ssl> {
: resolver(ctx), socket(ctx, loop.sslContext()) {} : resolver(ctx), socket(ctx, loop.sslContext()) {}
~Socket() { ~Socket() {
try {
resolver.cancel(); resolver.cancel();
shutdown(); shutdown();
} catch(...) {}
} }
template<typename CallbackT> template<typename CallbackT>
@ -124,12 +128,10 @@ struct Socket<fuerte::SocketType::Ssl> {
resolver.async_resolve({config._host, config._port}, rcb); resolver.async_resolve({config._host, config._port}, rcb);
} }
void shutdown() { void shutdown() {
//socket.cancel(); if (socket.next_layer().is_open()) {
if (socket.lowest_layer().is_open()) { asio_ns::error_code ec;
asio_ns::error_code error; socket.next_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
socket.shutdown(error); socket.shutdown(ec);
socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, error);
socket.lowest_layer().close(error);
} }
} }

View File

@ -63,7 +63,7 @@ std::shared_ptr<Connection> ConnectionBuilder::connect(EventLoopService& loop) {
#endif #endif
} }
if (!result) { if (!result) {
throw std::logic_error("unsupported socket type"); throw std::logic_error("unsupported socket or protocol type");
} }
// Start the connection implementation // Start the connection implementation

View File

@ -104,7 +104,7 @@ HttpConnection<ST>::HttpConnection(EventLoopService& loop,
_state(Connection::State::Disconnected), _state(Connection::State::Disconnected),
_numQueued(0), _numQueued(0),
_active(false), _active(false),
_queue(1024) { _queue() {
// initialize http parsing code // initialize http parsing code
_parserSettings.on_message_begin = ::on_message_began; _parserSettings.on_message_begin = ::on_message_began;
_parserSettings.on_status = ::on_status; _parserSettings.on_status = ::on_status;
@ -141,7 +141,12 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
RequestCallback cb) { RequestCallback cb) {
static std::atomic<uint64_t> ticketId(1); static std::atomic<uint64_t> ticketId(1);
assert(req); Connection::State state = _state.load(std::memory_order_acquire);
if (state == Connection::State::Failed) {
cb(errorToInt(ErrorCondition::Canceled), std::move(req), nullptr);
return 0;
}
// construct RequestItem // construct RequestItem
std::unique_ptr<RequestItem> item(new RequestItem()); std::unique_ptr<RequestItem> item(new RequestItem());
// requestItem->_response later // requestItem->_response later
@ -153,13 +158,12 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
// Prepare a new request // Prepare a new request
uint64_t id = item->_messageID; uint64_t id = item->_messageID;
if (!_queue.push(item.get())) { if (!_queue.push(item.get())) {
FUERTE_LOG_ERROR << "connection queue capactiy exceeded\n"; FUERTE_LOG_ERROR << "connection queue capacity exceeded\n";
throw std::length_error("connection queue capactiy exceeded"); throw std::length_error("connection queue capacity exceeded");
} }
item.release(); item.release();
_numQueued.fetch_add(1, std::memory_order_relaxed); _numQueued.fetch_add(1, std::memory_order_relaxed);
Connection::State state = _state.load(std::memory_order_acquire);
if (state == Connection::State::Connected) { if (state == Connection::State::Connected) {
FUERTE_LOG_HTTPTRACE << "sendRequest (http): start sending & reading\n"; FUERTE_LOG_HTTPTRACE << "sendRequest (http): start sending & reading\n";
startWriting(); startWriting();
@ -173,7 +177,6 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
// Activate this connection. // Activate this connection.
template <SocketType ST> template <SocketType ST>
void HttpConnection<ST>::startConnection() { void HttpConnection<ST>::startConnection() {
// start connecting only if state is disconnected // start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected; Connection::State exp = Connection::State::Disconnected;
if (!_state.compare_exchange_strong(exp, Connection::State::Connecting)) { if (!_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
@ -195,19 +198,38 @@ void HttpConnection<ST>::startConnection() {
}); });
} }
/// @brief cancel the connection, unusable afterwards
template <SocketType ST>
void HttpConnection<ST>::cancel() {
std::weak_ptr<Connection> self = shared_from_this();
asio_ns::post(*_io_context, [self, this] {
auto s = self.lock();
if (s) {
shutdownConnection(ErrorCondition::Canceled);
_state.store(State::Failed);
}
});
}
// shutdown the connection and cancel all pending messages. // shutdown the connection and cancel all pending messages.
template<SocketType ST> template<SocketType ST>
void HttpConnection<ST>::shutdownConnection(const ErrorCondition ec) { void HttpConnection<ST>::shutdownConnection(const ErrorCondition ec) {
FUERTE_LOG_CALLBACKS << "shutdownConnection\n"; FUERTE_LOG_CALLBACKS << "shutdownConnection\n";
_state.store(State::Disconnected, std::memory_order_release); if (_state.load() != State::Failed) {
_state.store(State::Disconnected);
}
// cancel timeouts
try { try {
_timeout.cancel(); // cancel timeouts _timeout.cancel();
} catch (...) { } catch (...) {
// cancel() may throw, but we are not allowed to throw here // cancel() may throw, but we are not allowed to throw here
// as we may be called from the dtor // as we may be called from the dtor
} }
_protocol.shutdown(); // Close socket
// Close socket
_protocol.shutdown();
_active.store(false); // no IO operations running _active.store(false); // no IO operations running
RequestItem* item = nullptr; RequestItem* item = nullptr;
@ -335,7 +357,7 @@ void HttpConnection<ST>::asyncWriteNextRequest() {
return; return;
} }
// a request got queued in-between last minute // a request got queued in-between last minute
_active.store(true, std::memory_order_release); _active.store(true);
} }
std::shared_ptr<http::RequestItem> item(ptr); std::shared_ptr<http::RequestItem> item(ptr);
_numQueued.fetch_sub(1, std::memory_order_relaxed); _numQueued.fetch_sub(1, std::memory_order_relaxed);
@ -505,115 +527,23 @@ template<SocketType ST>
void HttpConnection<ST>::setTimeout(std::chrono::milliseconds millis) { void HttpConnection<ST>::setTimeout(std::chrono::milliseconds millis) {
if (millis.count() == 0) { if (millis.count() == 0) {
_timeout.cancel(); _timeout.cancel();
return; // do return;
} }
assert(millis.count() > 0); assert(millis.count() > 0);
_timeout.expires_after(millis); _timeout.expires_after(millis);
auto self = shared_from_this();
_timeout.async_wait([this, self] (asio_ns::error_code const& e) {
if (e == asio_ns::error::operation_aborted) {
// timer was canceled
return;
}
if (!e) { // expired std::weak_ptr<Connection> self = shared_from_this();
_timeout.async_wait([self, this] (asio_ns::error_code const& ec) {
if (!ec) {
auto s = self.lock();
if (s) {
FUERTE_LOG_DEBUG << "HTTP-Request timeout\n"; FUERTE_LOG_DEBUG << "HTTP-Request timeout\n";
restartConnection(ErrorCondition::Timeout); restartConnection(ErrorCondition::Timeout);
} }
}
}); });
} }
/// @brief sed request synchronously, only save to use if the
template<SocketType ST>
std::unique_ptr<Response> HttpConnection<ST>::sendRequestSync(std::unique_ptr<Request> req) {
int max = 1024;
Connection::State state = _state.load(std::memory_order_acquire);
while (state != State::Connected && max-- > 0) {
if (state == State::Failed) {
return nullptr;
} else if (state == State::Disconnected) {
startConnection();
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
state = _state.load(std::memory_order_acquire);
}
if (state != State::Connected) {
throw ErrorCondition::CouldNotConnect;
}
RequestItem item;
// requestItem->_response later
item._requestHeader = buildRequestBody(*req);
item._request = std::move(req);
item._response.reset(new Response());
setTimeout(item._request->timeout());
std::vector<asio_ns::const_buffer> buffers(2);
buffers.emplace_back(item._requestHeader.data(),
item._requestHeader.size());
// GET and HEAD have no payload
if (item._request->header.restVerb != RestVerb::Get &&
item._request->header.restVerb != RestVerb::Head) {
buffers.emplace_back(item._request->payload());
}
asio_ns::error_code ec;
asio_ns::write(_protocol.socket, buffers, ec);
if (ec) {
auto err = checkEOFError(ec, ErrorCondition::WriteError);;
shutdownConnection(err);
throw err;
}
http_parser_init(&_parser, HTTP_RESPONSE);
_parser.data = static_cast<void*>(&item);
while (!item.message_complete) {
// reserve 32kB in output buffer
auto mutableBuff = _receiveBuffer.prepare(READ_BLOCK_SIZE);
size_t transferred = _protocol.socket.read_some(mutableBuff, ec);
if (ec) {
auto err = checkEOFError(ec, ErrorCondition::ReadError);;
shutdownConnection(err);
throw err;
}
// Inspect the data we've received so far.
auto cursor = asio_ns::buffer_cast<const char*>(_receiveBuffer.data()); // no copy
/* Start up / continue the parser.
* Note we pass recved==0 to signal that EOF has been received.
*/
size_t nparsed = http_parser_execute(&_parser, &_parserSettings,
cursor, transferred);
if (_parser.upgrade || nparsed != transferred) {
/* Handle error. Usually just close the connection. */
FUERTE_LOG_ERROR << "Invalid HTTP response in parser\n";
shutdownConnection(ErrorCondition::ProtocolError); // will cleanup _inFlight
throw ErrorCondition::ProtocolError;
}
// item.message_complete may have been set by the call to http_parser_execute!
if (item.message_complete) {
//_timeout.cancel(); // got response in time
// Remove consumed data from receive buffer.
_receiveBuffer.consume(nparsed);
// thread-safe access on IO-Thread
if (!item._responseBuffer.empty()) {
item._response->setPayload(std::move(item._responseBuffer), 0);
}
if (!item.should_keep_alive) {
shutdownConnection(ErrorCondition::CloseRequested);
}
return std::move(item._response);
}
}
throw ErrorCondition::Timeout;
}
template class arangodb::fuerte::v1::http::HttpConnection<SocketType::Tcp>; template class arangodb::fuerte::v1::http::HttpConnection<SocketType::Tcp>;
template class arangodb::fuerte::v1::http::HttpConnection<SocketType::Ssl>; template class arangodb::fuerte::v1::http::HttpConnection<SocketType::Ssl>;
#ifdef ASIO_HAS_LOCAL_SOCKETS #ifdef ASIO_HAS_LOCAL_SOCKETS

View File

@ -55,9 +55,6 @@ class HttpConnection final : public fuerte::Connection {
/// Start an asynchronous request. /// Start an asynchronous request.
MessageID sendRequest(std::unique_ptr<Request>, RequestCallback) override; MessageID sendRequest(std::unique_ptr<Request>, RequestCallback) override;
/// @brief sed request synchronously, only save to use if the
std::unique_ptr<Response> sendRequestSync(std::unique_ptr<Request> r) override;
// Return the number of unfinished requests. // Return the number of unfinished requests.
size_t requestsLeft() const override { size_t requestsLeft() const override {
return _numQueued.load(std::memory_order_acquire); return _numQueued.load(std::memory_order_acquire);
@ -71,10 +68,14 @@ class HttpConnection final : public fuerte::Connection {
// Activate this connection // Activate this connection
void startConnection() override; void startConnection() override;
// called on shutdown, always call superclass /// @brief cancel the connection, unusable afterwards
void shutdownConnection(const ErrorCondition) override; void cancel() override;
private: private:
// shutdown connection, cancel async operations
void shutdownConnection(const ErrorCondition);
// restart connection // restart connection
void restartConnection(const ErrorCondition); void restartConnection(const ErrorCondition);
@ -124,7 +125,8 @@ class HttpConnection final : public fuerte::Connection {
std::atomic<bool> _active; std::atomic<bool> _active;
/// elements to send out /// elements to send out
boost::lockfree::queue<fuerte::v1::http::RequestItem*> _queue; boost::lockfree::queue<fuerte::v1::http::RequestItem*,
boost::lockfree::capacity<1024>> _queue;
/// cached authentication header /// cached authentication header
std::string _authHeader; std::string _authHeader;

View File

@ -49,7 +49,7 @@ VstConnection<ST>::VstConnection(
_timeout(*_io_context), _timeout(*_io_context),
_state(Connection::State::Disconnected), _state(Connection::State::Disconnected),
_loopState(0), _loopState(0),
_writeQueue(1024){} _writeQueue() {}
template<SocketType ST> template<SocketType ST>
VstConnection<ST>::~VstConnection() { VstConnection<ST>::~VstConnection() {
@ -62,6 +62,12 @@ static std::atomic<MessageID> vstMessageId(1);
template<SocketType ST> template<SocketType ST>
MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req, MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
RequestCallback cb) { RequestCallback cb) {
Connection::State state = _state.load(std::memory_order_acquire);
if (state == Connection::State::Failed) {
cb(errorToInt(ErrorCondition::Canceled), std::move(req), nullptr);
return 0;
}
// it does not matter if IDs are reused on different connections // it does not matter if IDs are reused on different connections
uint64_t mid = vstMessageId.fetch_add(1, std::memory_order_relaxed); uint64_t mid = vstMessageId.fetch_add(1, std::memory_order_relaxed);
// Create RequestItem from parameters // Create RequestItem from parameters
@ -74,14 +80,13 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
// Add item to send queue // Add item to send queue
if (!_writeQueue.push(item.get())) { if (!_writeQueue.push(item.get())) {
FUERTE_LOG_ERROR << "connection queue capactiy exceeded\n"; FUERTE_LOG_ERROR << "connection queue capacity exceeded\n";
throw std::length_error("connection queue capactiy exceeded"); throw std::length_error("connection queue capacity exceeded");
} }
item.release(); item.release();
// WRITE_LOOP_ACTIVE, READ_LOOP_ACTIVE are synchronized via cmpxchg // WRITE_LOOP_ACTIVE, READ_LOOP_ACTIVE are synchronized via cmpxchg
uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst); uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst);
Connection::State state = _state.load(std::memory_order_acquire);
if (state == Connection::State::Connected) { if (state == Connection::State::Connected) {
FUERTE_LOG_VSTTRACE << "sendRequest (vst): start sending & reading\n"; FUERTE_LOG_VSTTRACE << "sendRequest (vst): start sending & reading\n";
if (!(loop & WRITE_LOOP_ACTIVE)) { if (!(loop & WRITE_LOOP_ACTIVE)) {
@ -103,7 +108,6 @@ void VstConnection<ST>::startConnection() {
FUERTE_LOG_ERROR << "already resolving endpoint\n"; FUERTE_LOG_ERROR << "already resolving endpoint\n";
return; return;
} }
auto self = shared_from_this(); auto self = shared_from_this();
_protocol.connect(_config, [self, this](asio_ns::error_code const& ec) { _protocol.connect(_config, [self, this](asio_ns::error_code const& ec) {
if (ec) { if (ec) {
@ -117,18 +121,34 @@ void VstConnection<ST>::startConnection() {
}); });
} }
/// @brief cancel the connection, unusable afterwards
template <SocketType ST>
void VstConnection<ST>::cancel() {
std::weak_ptr<Connection> self = shared_from_this();
asio_ns::post(*_io_context, [self, this] {
auto s = self.lock();
if (s) {
shutdownConnection(ErrorCondition::Canceled);
_state.store(State::Failed);
}
});
}
// shutdown the connection and cancel all pending messages. // shutdown the connection and cancel all pending messages.
template <SocketType ST> template <SocketType ST>
void VstConnection<ST>::shutdownConnection(const ErrorCondition ec) { void VstConnection<ST>::shutdownConnection(const ErrorCondition ec) {
FUERTE_LOG_CALLBACKS << "shutdownConnection\n"; FUERTE_LOG_CALLBACKS << "shutdownConnection\n";
_state.store(State::Disconnected, std::memory_order_release); if (_state.load() != State::Failed) {
_state.store(State::Disconnected);
}
// cancel timeouts // cancel timeouts
try { try {
_timeout.cancel(); _timeout.cancel();
} catch (...) { } catch (...) {
// cancel() may throw // cancel() may throw, but we are not allowed to throw here
// as we may be called from the dtor
} }
// Close socket // Close socket
@ -199,10 +219,9 @@ void VstConnection<ST>::finishInitialization() {
auto self = shared_from_this(); auto self = shared_from_this();
asio_ns::async_write(_protocol.socket, asio_ns::async_write(_protocol.socket,
asio_ns::buffer(vstHeader, strlen(vstHeader)), asio_ns::buffer(vstHeader, strlen(vstHeader)),
[this, self](asio_ns::error_code const& ec, std::size_t transferred) { [self, this](asio_ns::error_code const& ec, std::size_t transferred) {
if (ec) { if (ec) {
FUERTE_LOG_ERROR << ec.message() << "\n"; FUERTE_LOG_ERROR << ec.message() << "\n";
_state.store(Connection::State::Disconnected, std::memory_order_release);
shutdownConnection(ErrorCondition::CouldNotConnect); shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect), onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"unable to initialize connection: error=" + ec.message()); "unable to initialize connection: error=" + ec.message());
@ -242,7 +261,7 @@ void VstConnection<ST>::sendAuthenticationRequest() {
item->prepareForNetwork(_vstVersion, header, asio_ns::const_buffer(0,0)); item->prepareForNetwork(_vstVersion, header, asio_ns::const_buffer(0,0));
auto self = shared_from_this(); auto self = shared_from_this();
item->_callback = [this, self](Error error, std::unique_ptr<Request>, item->_callback = [self, this](Error error, std::unique_ptr<Request>,
std::unique_ptr<Response> resp) { std::unique_ptr<Response> resp) {
if (error || resp->statusCode() != StatusOK) { if (error || resp->statusCode() != StatusOK) {
_state.store(State::Failed, std::memory_order_release); _state.store(State::Failed, std::memory_order_release);
@ -255,7 +274,7 @@ void VstConnection<ST>::sendAuthenticationRequest() {
// actually send auth request // actually send auth request
asio_ns::post(*_io_context, [this, self, item] { asio_ns::post(*_io_context, [this, self, item] {
auto cb = [this, self, item](asio_ns::error_code const& ec, auto cb = [self, item, this](asio_ns::error_code const& ec,
std::size_t transferred) { std::size_t transferred) {
if (ec) { if (ec) {
asyncWriteCallback(ec, transferred, std::move(item)); // error handling asyncWriteCallback(ec, transferred, std::move(item)); // error handling
@ -286,7 +305,7 @@ void VstConnection<ST>::startWriting() {
std::memory_order_seq_cst)) { std::memory_order_seq_cst)) {
FUERTE_LOG_TRACE << "startWriting (vst): starting write\n"; FUERTE_LOG_TRACE << "startWriting (vst): starting write\n";
auto self = shared_from_this(); // only one thread can get here per connection auto self = shared_from_this(); // only one thread can get here per connection
asio_ns::post(*_io_context, [this, self] { asio_ns::post(*_io_context, [self, this] {
asyncWriteNextRequest(); asyncWriteNextRequest();
}); });
return; return;
@ -328,7 +347,7 @@ void VstConnection<ST>::asyncWriteNextRequest() {
setTimeout(); // prepare request / connection timeouts setTimeout(); // prepare request / connection timeouts
auto self = shared_from_this(); auto self = shared_from_this();
auto cb = [this, self, item](asio_ns::error_code const& ec, std::size_t transferred) { auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) {
asyncWriteCallback(ec, transferred, std::move(item)); asyncWriteCallback(ec, transferred, std::move(item));
}; };
asio_ns::async_write(_protocol.socket, item->_requestBuffers, cb); asio_ns::async_write(_protocol.socket, item->_requestBuffers, cb);
@ -404,7 +423,7 @@ void VstConnection<ST>::startReading() {
while (!(state & READ_LOOP_ACTIVE)) { while (!(state & READ_LOOP_ACTIVE)) {
if (_loopState.compare_exchange_weak(state, state | READ_LOOP_ACTIVE)) { if (_loopState.compare_exchange_weak(state, state | READ_LOOP_ACTIVE)) {
auto self = shared_from_this(); // only one thread can get here per connection auto self = shared_from_this(); // only one thread can get here per connection
asio_ns::post(*_io_context, [this, self] { asio_ns::post(*_io_context, [self, this] {
asyncReadSome(); asyncReadSome();
}); });
return; return;
@ -444,7 +463,7 @@ void VstConnection<ST>::asyncReadSome() {
#endif #endif
auto self = shared_from_this(); auto self = shared_from_this();
auto cb = [this, self](asio_ns::error_code const& ec, size_t transferred) { auto cb = [self, this](asio_ns::error_code const& ec, size_t transferred) {
// received data is "committed" from output sequence to input sequence // received data is "committed" from output sequence to input sequence
_receiveBuffer.commit(transferred); _receiveBuffer.commit(transferred);
asyncReadCallback(ec, transferred); asyncReadCallback(ec, transferred);
@ -613,11 +632,15 @@ void VstConnection<ST>::setTimeout() {
} }
_timeout.expires_at(expires); _timeout.expires_at(expires);
auto self = shared_from_this(); std::weak_ptr<Connection> self = shared_from_this();
_timeout.async_wait([this, self](asio_ns::error_code const& ec) { _timeout.async_wait([self, this](asio_ns::error_code const& ec) {
if (ec) { // was canceled if (ec) { // was canceled
return; return;
} }
auto s = self.lock();
if (!s) {
return;
}
// cancel expired requests // cancel expired requests
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();

View File

@ -72,11 +72,14 @@ class VstConnection final : public Connection {
/// Activate the connection. /// Activate the connection.
void startConnection() override final; void startConnection() override final;
/// called on shutdown, always call superclass /// @brief cancel the connection, unusable afterwards
void shutdownConnection(const ErrorCondition) override; void cancel() override final;
private: private:
/// shutdown connection, cancel async operations
void shutdownConnection(const ErrorCondition);
void restartConnection(const ErrorCondition); void restartConnection(const ErrorCondition);
void finishInitialization(); void finishInitialization();
@ -151,7 +154,8 @@ class VstConnection final : public Connection {
static_assert((WRITE_LOOP_ACTIVE & READ_LOOP_ACTIVE) == 0, ""); static_assert((WRITE_LOOP_ACTIVE & READ_LOOP_ACTIVE) == 0, "");
/// elements to send out /// elements to send out
boost::lockfree::queue<vst::RequestItem*> _writeQueue; boost::lockfree::queue<vst::RequestItem*,
boost::lockfree::capacity<1024>> _writeQueue;
}; };
}}}} // namespace arangodb::fuerte::v1::vst }}}} // namespace arangodb::fuerte::v1::vst

View File

@ -192,6 +192,7 @@ GeneralCommTask::RequestFlow GeneralCommTask::prepareExecution(
break; // continue with auth check break; // continue with auth check
} }
// intentional fallthrough // intentional fallthrough
[[gnu::fallthrough]];
} }
case ServerState::Mode::TRYAGAIN: { case ServerState::Mode::TRYAGAIN: {
if (path.find("/_admin/shutdown") == std::string::npos && if (path.find("/_admin/shutdown") == std::string::npos &&

View File

@ -62,6 +62,10 @@ V8ClientConnection::V8ClientConnection()
_vpackOptions(VPackOptions::Defaults) { _vpackOptions(VPackOptions::Defaults) {
_vpackOptions.buildUnindexedObjects = true; _vpackOptions.buildUnindexedObjects = true;
_vpackOptions.buildUnindexedArrays = true; _vpackOptions.buildUnindexedArrays = true;
_builder.onFailure([this](int error, std::string const& msg) {
_lastHttpReturnCode = 503;
_lastErrorMessage = msg;
});
} }
V8ClientConnection::~V8ClientConnection() { V8ClientConnection::~V8ClientConnection() {
@ -69,34 +73,15 @@ V8ClientConnection::~V8ClientConnection() {
_loop.forceStop(); _loop.forceStop();
} }
void V8ClientConnection::init(ClientFeature* client) { void V8ClientConnection::createConnection() {
_requestTimeout = std::chrono::duration<double>(client->requestTimeout());
_username = client->username();
_password = client->password();
_databaseName = client->databaseName();
fuerte::ConnectionBuilder builder; _connection = _builder.connect(_loop);
builder.endpoint(client->endpoint());
if (!client->username().empty()) {
builder.user(client->username()).password(client->password());
builder.authenticationType(fuerte::AuthenticationType::Basic);
} else if (!client->jwtSecret().empty()) {
builder.jwtToken(fuerte::jwt::generateInternalToken(client->jwtSecret(), "arangosh"));
builder.authenticationType(fuerte::AuthenticationType::Jwt);
}
builder.onFailure([this](int error, std::string const& msg) {
_lastHttpReturnCode = 505;
_lastErrorMessage = msg;
});
shutdownConnection();
_connection = builder.connect(_loop);
fuerte::StringMap params{{"details","true"}}; fuerte::StringMap params{{"details","true"}};
auto req = fuerte::createRequest(fuerte::RestVerb::Get, "/_api/version", params); auto req = fuerte::createRequest(fuerte::RestVerb::Get, "/_api/version", params);
req->header.database = _databaseName; req->header.database = _databaseName;
try { try {
auto res = _connection->sendRequestSync(std::move(req)); auto res = _connection->sendRequest(std::move(req));
_lastHttpReturnCode = res->statusCode(); _lastHttpReturnCode = res->statusCode();
if (_lastHttpReturnCode == 200) { if (_lastHttpReturnCode == 200) {
@ -140,19 +125,16 @@ void V8ClientConnection::init(ClientFeature* client) {
} }
} catch (fuerte::ErrorCondition const& e) { // connection error } catch (fuerte::ErrorCondition const& e) { // connection error
_lastErrorMessage = fuerte::to_string(e); _lastErrorMessage = fuerte::to_string(e);
_lastHttpReturnCode = 505; _lastHttpReturnCode = 503;
} }
} }
void V8ClientConnection::setInterrupted(bool interrupted) { void V8ClientConnection::setInterrupted(bool interrupted) {
if (_connection) { if (interrupted && _connection.get() != nullptr) {
if (interrupted) { _connection->cancel();
shutdownConnection(); _connection.reset();
} else { } else if (!interrupted && _connection.get() == nullptr) {
if (_connection->state() == fuerte::Connection::State::Disconnected) { createConnection();
_connection->startConnection();
}
}
} }
} }
@ -168,12 +150,34 @@ std::string V8ClientConnection::endpointSpecification() const {
} }
void V8ClientConnection::connect(ClientFeature* client) { void V8ClientConnection::connect(ClientFeature* client) {
this->init(client); TRI_ASSERT(client);
_requestTimeout = std::chrono::duration<double>(client->requestTimeout());
_databaseName = client->databaseName();
_builder.endpoint(client->endpoint());
if (!client->username().empty()) {
_builder.user(client->username()).password(client->password());
_builder.authenticationType(fuerte::AuthenticationType::Basic);
} else if (!client->jwtSecret().empty()) {
_builder.jwtToken(fuerte::jwt::generateInternalToken(client->jwtSecret(), "arangosh"));
_builder.authenticationType(fuerte::AuthenticationType::Jwt);
}
createConnection();
} }
void V8ClientConnection::reconnect(ClientFeature* client) { void V8ClientConnection::reconnect(ClientFeature* client) {
_requestTimeout = std::chrono::duration<double>(client->requestTimeout());
_databaseName = client->databaseName();
_builder.endpoint(client->endpoint());
if (!client->username().empty()) {
_builder.user(client->username()).password(client->password());
_builder.authenticationType(fuerte::AuthenticationType::Basic);
} else if (!client->jwtSecret().empty()) {
_builder.jwtToken(fuerte::jwt::generateInternalToken(client->jwtSecret(), "arangosh"));
_builder.authenticationType(fuerte::AuthenticationType::Jwt);
}
try { try {
init(client); createConnection();
} catch (...) { } catch (...) {
std::string errorMessage = "error in '" + client->endpoint() + "'"; std::string errorMessage = "error in '" + client->endpoint() + "'";
throw errorMessage; throw errorMessage;
@ -186,7 +190,7 @@ void V8ClientConnection::reconnect(ClientFeature* client) {
<< "'" << endpointSpecification() << "', " << "'" << endpointSpecification() << "', "
<< "version " << _version << " [" << _mode << "], " << "version " << _version << " [" << _mode << "], "
<< "database '" << _databaseName << "', " << "database '" << _databaseName << "', "
<< "username: '" << _username << "'"; << "username: '" << client->username() << "'";
} else { } else {
if (client->getWarnConnect()) { if (client->getWarnConnect()) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) LOG_TOPIC(ERR, arangodb::Logger::FIXME)
@ -1353,7 +1357,7 @@ v8::Local<v8::Value> V8ClientConnection::requestData(
_lastHttpReturnCode = 0; _lastHttpReturnCode = 0;
if (!_connection) { if (!_connection) {
_lastErrorMessage = "not connected"; _lastErrorMessage = "not connected";
_lastHttpReturnCode = 505; _lastHttpReturnCode = 503;
return v8::Null(isolate); return v8::Null(isolate);
} }
@ -1403,7 +1407,7 @@ v8::Local<v8::Value> V8ClientConnection::requestData(
std::unique_ptr<fuerte::Response> response; std::unique_ptr<fuerte::Response> response;
try { try {
response = _connection->sendRequestSync(std::move(req)); response = _connection->sendRequest(std::move(req));
} catch (fuerte::ErrorCondition const& ec) { } catch (fuerte::ErrorCondition const& ec) {
return handleResult(isolate, nullptr, ec); return handleResult(isolate, nullptr, ec);
} }
@ -1455,10 +1459,10 @@ v8::Local<v8::Value> V8ClientConnection::requestDataRaw(
std::unique_ptr<fuerte::Response> response; std::unique_ptr<fuerte::Response> response;
try { try {
response = _connection->sendRequestSync(std::move(req)); response = _connection->sendRequest(std::move(req));
} catch (fuerte::ErrorCondition const& e) { } catch (fuerte::ErrorCondition const& e) {
_lastErrorMessage.assign(fuerte::to_string(e)); _lastErrorMessage.assign(fuerte::to_string(e));
_lastHttpReturnCode = 505; _lastHttpReturnCode = 503;
} }
v8::Local<v8::Object> result = v8::Object::New(isolate); v8::Local<v8::Object> result = v8::Object::New(isolate);
@ -1756,7 +1760,7 @@ void V8ClientConnection::initServer(v8::Isolate* isolate,
void V8ClientConnection::shutdownConnection() { void V8ClientConnection::shutdownConnection() {
if (_connection) { if (_connection) {
_connection->shutdownConnection(fuerte::ErrorCondition::Canceled); _connection->cancel();
_connection.reset(); _connection.reset();
} }
} }

View File

@ -28,6 +28,7 @@
#include "Basics/Common.h" #include "Basics/Common.h"
#include "Basics/StringRef.h" #include "Basics/StringRef.h"
#include <fuerte/connection.h>
#include <fuerte/loop.h> #include <fuerte/loop.h>
#include <fuerte/types.h> #include <fuerte/types.h>
#include <v8.h> #include <v8.h>
@ -58,7 +59,7 @@ class V8ClientConnection {
~V8ClientConnection(); ~V8ClientConnection();
public: public:
void setInterrupted(bool value); void setInterrupted(bool interrupted);
bool isConnected(); bool isConnected();
void connect(ClientFeature*); void connect(ClientFeature*);
@ -66,8 +67,8 @@ class V8ClientConnection {
std::string const& databaseName() const { return _databaseName; } std::string const& databaseName() const { return _databaseName; }
void setDatabaseName(std::string const& value) { _databaseName = value; } void setDatabaseName(std::string const& value) { _databaseName = value; }
std::string const& username() const { return _username; } std::string username() const { return _builder.user(); }
std::string const& password() const { return _password; } std::string password() const { return _builder.password(); }
int lastHttpReturnCode() const { return _lastHttpReturnCode; } int lastHttpReturnCode() const { return _lastHttpReturnCode; }
std::string lastErrorMessage() const { return _lastErrorMessage; } std::string lastErrorMessage() const { return _lastErrorMessage; }
std::string const& version() const { return _version; } std::string const& version() const { return _version; }
@ -107,7 +108,8 @@ class V8ClientConnection {
ClientFeature*); ClientFeature*);
private: private:
void init(ClientFeature*);
void createConnection();
v8::Local<v8::Value> requestData( v8::Local<v8::Value> requestData(
v8::Isolate* isolate, fuerte::RestVerb verb, v8::Isolate* isolate, fuerte::RestVerb verb,
@ -130,8 +132,6 @@ class V8ClientConnection {
private: private:
std::string _databaseName; std::string _databaseName;
std::string _username;
std::string _password;
std::chrono::duration<double> _requestTimeout; std::chrono::duration<double> _requestTimeout;
int _lastHttpReturnCode; int _lastHttpReturnCode;
@ -140,8 +140,9 @@ class V8ClientConnection {
std::string _version; std::string _version;
std::string _mode; std::string _mode;
std::shared_ptr<fuerte::Connection> _connection;
fuerte::EventLoopService _loop; fuerte::EventLoopService _loop;
fuerte::ConnectionBuilder _builder;
std::shared_ptr<fuerte::Connection> _connection;
velocypack::Options _vpackOptions; velocypack::Options _vpackOptions;
}; };
} // namespace arangodb } // namespace arangodb

View File

@ -282,10 +282,10 @@ bool V8ShellFeature::printHello(V8ClientConnection* v8connection) {
} }
// the result is wrapped in a Javascript variable SYS_ARANGO // the result is wrapped in a Javascript variable SYS_ARANGO
std::unique_ptr<V8ClientConnection> V8ShellFeature::setup( std::shared_ptr<V8ClientConnection> V8ShellFeature::setup(
v8::Local<v8::Context>& context, bool createConnection, v8::Local<v8::Context>& context, bool createConnection,
std::vector<std::string> const& positionals, bool* promptError) { std::vector<std::string> const& positionals, bool* promptError) {
std::unique_ptr<V8ClientConnection> v8connection; std::shared_ptr<V8ClientConnection> v8connection;
ClientFeature* client = nullptr; ClientFeature* client = nullptr;
@ -293,10 +293,6 @@ std::unique_ptr<V8ClientConnection> V8ShellFeature::setup(
client = server()->getFeature<ClientFeature>("Client"); client = server()->getFeature<ClientFeature>("Client");
if (client != nullptr && client->isEnabled()) { if (client != nullptr && client->isEnabled()) {
/*auto jwtSecret = client->jwtSecret();
if (!jwtSecret.empty()) {
V8ClientConnection::setJwtSecret(jwtSecret);
}*/
v8connection = std::make_unique<V8ClientConnection>(); v8connection = std::make_unique<V8ClientConnection>();
v8connection->connect(client); v8connection->connect(client);
} else { } else {
@ -338,7 +334,7 @@ int V8ShellFeature::runShell(std::vector<std::string> const& positionals) {
if (v8connection != nullptr) { if (v8connection != nullptr) {
v8LineEditor.setSignalFunction( v8LineEditor.setSignalFunction(
[&v8connection]() { v8connection->setInterrupted(true); }); [v8connection]() { v8connection->setInterrupted(true); });
} }
v8LineEditor.open(_console->autoComplete()); v8LineEditor.open(_console->autoComplete());
@ -910,8 +906,8 @@ void V8ShellFeature::initGlobals() {
auto ctx = ArangoGlobalContext::CONTEXT; auto ctx = ArangoGlobalContext::CONTEXT;
if (ctx == nullptr) { if (ctx == nullptr) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "failed to get global context. "; << "failed to get global context";
FATAL_ERROR_EXIT(); FATAL_ERROR_EXIT();
} }

View File

@ -76,7 +76,7 @@ class V8ShellFeature final : public application_features::ApplicationFeature {
void initGlobals(); void initGlobals();
void initMode(ShellFeature::RunMode, std::vector<std::string> const&); void initMode(ShellFeature::RunMode, std::vector<std::string> const&);
void loadModules(ShellFeature::RunMode); void loadModules(ShellFeature::RunMode);
std::unique_ptr<V8ClientConnection> setup(v8::Local<v8::Context>& context, bool, std::shared_ptr<V8ClientConnection> setup(v8::Local<v8::Context>& context, bool,
std::vector<std::string> const&, std::vector<std::string> const&,
bool* promptError = nullptr); bool* promptError = nullptr);

View File

@ -23,6 +23,8 @@
#include "V8LineEditor.h" #include "V8LineEditor.h"
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Basics/StringUtils.h" #include "Basics/StringUtils.h"
#include "Basics/tri-strings.h" #include "Basics/tri-strings.h"
#include "Logger/Logger.h" #include "Logger/Logger.h"
@ -31,14 +33,17 @@
#include "V8/v8-utils.h" #include "V8/v8-utils.h"
using namespace arangodb; using namespace arangodb;
using namespace arangodb;
namespace {
static arangodb::Mutex singletonMutex;
static arangodb::V8LineEditor* singleton = nullptr;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief the active instance of the editor /// @brief the active instance of the editor
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static std::atomic<V8LineEditor*> SINGLETON(nullptr);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief signal handler for CTRL-C /// @brief signal handler for CTRL-C
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -53,7 +58,8 @@ static bool SignalHandler(DWORD eventType) {
case CTRL_LOGOFF_EVENT: case CTRL_LOGOFF_EVENT:
case CTRL_SHUTDOWN_EVENT: { case CTRL_SHUTDOWN_EVENT: {
// get the instance of the console // get the instance of the console
auto instance = SINGLETON.load(); MUTEX_LOCKER(mutex, ::singletonMutex);
auto instance = ::singleton;
if (instance != nullptr) { if (instance != nullptr) {
if (instance->isExecutingCommand()) { if (instance->isExecutingCommand()) {
@ -77,7 +83,8 @@ static bool SignalHandler(DWORD eventType) {
static void SignalHandler(int /*signal*/) { static void SignalHandler(int /*signal*/) {
// get the instance of the console // get the instance of the console
auto instance = SINGLETON.load(); MUTEX_LOCKER(mutex, ::singletonMutex);
auto instance = ::singleton;
if (instance != nullptr) { if (instance != nullptr) {
if (instance->isExecutingCommand()) { if (instance->isExecutingCommand()) {
@ -375,8 +382,12 @@ V8LineEditor::V8LineEditor(v8::Isolate* isolate,
_context(context), _context(context),
_executingCommand(false) { _executingCommand(false) {
// register global instance // register global instance
TRI_ASSERT(SINGLETON.load() == nullptr);
SINGLETON.store(this); {
MUTEX_LOCKER(mutex, ::singletonMutex);
TRI_ASSERT(::singleton == nullptr);
::singleton = this;
}
// create shell // create shell
_shell = ShellBase::buildShell(history, new V8Completer()); _shell = ShellBase::buildShell(history, new V8Completer());
@ -409,6 +420,7 @@ V8LineEditor::V8LineEditor(v8::Isolate* isolate,
V8LineEditor::~V8LineEditor() { V8LineEditor::~V8LineEditor() {
// unregister global instance // unregister global instance
TRI_ASSERT(SINGLETON.load() != nullptr); MUTEX_LOCKER(mutex, ::singletonMutex);
SINGLETON.store(nullptr); TRI_ASSERT(::singleton != nullptr);
::singleton = nullptr;
} }