1
0
Fork 0
arangodb/arangod/Network/Methods.cpp

385 lines
14 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018-2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "Methods.h"
#include "Agency/AgencyFeature.h"
#include "Agency/Agent.h"
#include "Basics/Common.h"
#include "Basics/HybridLogicalClock.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ServerState.h"
#include "Futures/Utilities.h"
#include "Logger/LogMacros.h"
#include "Network/ConnectionPool.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include <fuerte/connection.h>
#include <fuerte/requests.h>
#include <fuerte/types.h>
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
namespace arangodb {
namespace network {
using namespace arangodb::fuerte;
using PromiseRes = arangodb::futures::Promise<network::Response>;
/// @brief shardId or empty
std::string Response::destinationShard() const {
if (this->destination.size() > 6 && this->destination.compare(0, 6, "shard:", 6) == 0) {
return this->destination.substr(6);
}
return StaticStrings::Empty;
}
std::string Response::serverId() const {
if (this->destination.size() > 7 && this->destination.compare(0, 7, "server:", 7) == 0) {
return this->destination.substr(7);
}
return StaticStrings::Empty;
}
auto prepareRequest(RestVerb type, std::string path, VPackBufferUInt8 payload,
RequestOptions const& options, Headers headers) {
auto req = fuerte::createRequest(type, path, options.parameters, std::move(payload));
req->header.database = options.database;
req->header.setMeta(std::move(headers));
if (!options.contentType.empty()) {
req->header.contentType(options.contentType);
}
if (!options.acceptType.empty()) {
req->header.acceptType(options.acceptType);
}
TRI_voc_tick_t timeStamp = TRI_HybridLogicalClock();
req->header.addMeta(StaticStrings::HLCHeader,
arangodb::basics::HybridLogicalClock::encodeTimeStamp(timeStamp));
req->timeout(std::chrono::duration_cast<std::chrono::milliseconds>(options.timeout));
auto state = ServerState::instance();
if (state->isCoordinator() || state->isDBServer()) {
req->header.addMeta(StaticStrings::ClusterCommSource, state->getId());
} else if (state->isAgent()) {
auto agent = AgencyFeature::AGENT;
if (agent != nullptr) {
req->header.addMeta(StaticStrings::ClusterCommSource, "AGENT-" + agent->id());
}
}
return req;
}
/// @brief send a request to a given destination
FutureRes sendRequest(ConnectionPool* pool, DestinationId dest, RestVerb type,
std::string path, velocypack::Buffer<uint8_t> payload,
RequestOptions const& options, Headers headers) {
// FIXME build future.reset(..)
if (!pool || !pool->config().clusterInfo) {
LOG_TOPIC("59b95", ERR, Logger::COMMUNICATION)
<< "connection pool unavailable";
return futures::makeFuture(Response{std::move(dest), Error::Canceled, nullptr});
}
LOG_TOPIC("2713a", DEBUG, Logger::COMMUNICATION)
<< "request to '" << dest
<< "' '" << fuerte::to_string(type) << " " << path << "'";
arangodb::network::EndpointSpec spec;
int res = resolveDestination(*pool->config().clusterInfo, dest, spec);
if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?!
return futures::makeFuture(Response{std::move(dest), Error::Canceled, nullptr});
}
TRI_ASSERT(!spec.endpoint.empty());
auto req = prepareRequest(type, std::move(path), std::move(payload),
options, std::move(headers));
struct Pack {
DestinationId dest;
futures::Promise<network::Response> promise;
std::unique_ptr<fuerte::Response> tmp;
bool skipScheduler;
Pack(DestinationId&& dest, bool skip)
: dest(std::move(dest)), promise(), skipScheduler(skip) {}
};
// fits in SSO of std::function
static_assert(sizeof(std::shared_ptr<Pack>) <= 2 * sizeof(void*), "");
auto conn = pool->leaseConnection(spec.endpoint);
auto p = std::make_shared<Pack>(std::move(dest), options.skipScheduler);
FutureRes f = p->promise.getFuture();
conn->sendRequest(std::move(req), [p(std::move(p))](fuerte::Error err,
std::unique_ptr<fuerte::Request> req,
std::unique_ptr<fuerte::Response> res) {
Scheduler* sch = SchedulerFeature::SCHEDULER;
if (p->skipScheduler || sch == nullptr) {
p->promise.setValue(network::Response{std::move(p->dest), err, std::move(res)});
return;
}
p->tmp = std::move(res);
bool queued = sch->queue(RequestLane::CLUSTER_INTERNAL, [p, err]() {
p->promise.setValue(Response{std::move(p->dest), err, std::move(p->tmp)});
});
if (ADB_UNLIKELY(!queued)) {
p->promise.setValue(Response{std::move(p->dest), fuerte::Error::Canceled, nullptr});
}
});
return f;
}
/// Handler class with enough information to keep retrying
/// a request until an overall timeout is hit (or the request succeeds)
class RequestsState final : public std::enable_shared_from_this<RequestsState> {
public:
RequestsState(ConnectionPool* pool, DestinationId&& destination, RestVerb type,
std::string&& path, velocypack::Buffer<uint8_t>&& payload,
Headers&& headers, RequestOptions const& options)
: _payload(std::move(payload)),
_destination(std::move(destination)),
_path(std::move(path)),
_headers(std::move(headers)),
_options(options),
_pool(pool),
_type(type),
_workItem(nullptr),
_response(nullptr),
_promise(),
_startTime(std::chrono::steady_clock::now()),
_endTime(_startTime + std::chrono::duration_cast<std::chrono::steady_clock::duration>(
options.timeout)) {}
~RequestsState() = default;
private:
velocypack::Buffer<uint8_t> _payload;
DestinationId _destination;
std::string _path;
Headers _headers;
RequestOptions const _options;
ConnectionPool* _pool;
RestVerb _type;
std::shared_ptr<arangodb::Scheduler::WorkItem> _workItem;
std::unique_ptr<fuerte::Response> _response; /// temporary response
futures::Promise<network::Response> _promise; /// promise called
std::chrono::steady_clock::time_point const _startTime;
std::chrono::steady_clock::time_point const _endTime;
public:
FutureRes future() { return _promise.getFuture(); }
// scheduler requests that are due
void startRequest() {
auto now = std::chrono::steady_clock::now();
if (now > _endTime || _pool->config().clusterInfo->server().isStopping()) {
callResponse(Error::Timeout, nullptr);
return; // we are done
}
arangodb::network::EndpointSpec spec;
int res = resolveDestination(*_pool->config().clusterInfo, _destination, spec);
if (res != TRI_ERROR_NO_ERROR) { // ClusterInfo did not work
callResponse(Error::Canceled, nullptr);
return;
}
if (!_pool) {
LOG_TOPIC("5949f", ERR, Logger::COMMUNICATION)
<< "connection pool unavailable";
callResponse(Error::Canceled, nullptr);
return;
}
auto localOptions = _options;
localOptions.timeout =
std::chrono::duration_cast<std::chrono::milliseconds>(_endTime - now);
TRI_ASSERT(localOptions.timeout.count() > 0);
auto conn = _pool->leaseConnection(spec.endpoint);
auto req = prepareRequest(_type, _path, _payload, localOptions, _headers);
conn->sendRequest(std::move(req),
[self = shared_from_this()](fuerte::Error err,
std::unique_ptr<fuerte::Request> req,
std::unique_ptr<fuerte::Response> res) {
self->handleResponse(err, std::move(req), std::move(res));
});
}
private:
void handleResponse(fuerte::Error err, std::unique_ptr<fuerte::Request> req,
std::unique_ptr<fuerte::Response> res) {
switch (err) {
case fuerte::Error::NoError: {
TRI_ASSERT(res);
if (checkResponse(err, req, res)) {
break;
}
[[fallthrough]];
}
case fuerte::Error::CouldNotConnect:
case fuerte::Error::ConnectionClosed:
case fuerte::Error::Timeout:
case fuerte::Error::Canceled: {
// Note that this case includes the refusal of a leader to accept
// the operation, in which case we have to flush ClusterInfo:
auto const now = std::chrono::steady_clock::now();
auto tryAgainAfter = now - _startTime;
if (tryAgainAfter < std::chrono::milliseconds(200)) {
tryAgainAfter = std::chrono::milliseconds(200);
} else if (tryAgainAfter > std::chrono::seconds(3)) {
tryAgainAfter = std::chrono::seconds(3);
}
if ((now + tryAgainAfter) >= _endTime) { // cancel out
callResponse(err, std::move(res));
} else {
retryLater(tryAgainAfter);
}
break;
}
default: // a "proper error" which has to be returned to the client
callResponse(err, std::move(res));
break;
}
}
bool checkResponse(fuerte::Error err,
std::unique_ptr<fuerte::Request>& req,
std::unique_ptr<fuerte::Response>& res) {
switch (res->statusCode()) {
case fuerte::StatusOK:
case fuerte::StatusCreated:
case fuerte::StatusAccepted:
case fuerte::StatusNoContent:
callResponse(Error::NoError, std::move(res));
return true; // done
case fuerte::StatusUnavailable:
return false; // goto retry
case fuerte::StatusNotFound:
if (_options.retryNotFound &&
TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND == network::errorCodeFromBody(res->slice())) {
return false; // goto retry
}
[[fallthrough]];
default: // a "proper error" which has to be returned to the client
callResponse(err, std::move(res));
return true; // done
}
}
/// @broef schedule calling the response promise
void callResponse(Error err, std::unique_ptr<fuerte::Response> res) {
LOG_TOPIC_IF("2713d", DEBUG, Logger::COMMUNICATION, err != fuerte::Error::NoError)
<< "error on request to '" << _destination
<< "' '" << fuerte::to_string(_type) << " " << _path
<< "' '" << fuerte::to_string(err) << "'";
Scheduler* sch = SchedulerFeature::SCHEDULER;
if (_options.skipScheduler || sch == nullptr) {
_promise.setValue(Response{std::move(_destination), err, std::move(res)});
return;
}
_response = std::move(res);
bool queued =
sch->queue(RequestLane::CLUSTER_INTERNAL, [self = shared_from_this(), err]() {
self->_promise.setValue(Response{std::move(self->_destination), err,
std::move(self->_response)});
});
if (ADB_UNLIKELY(!queued)) {
_promise.setValue(Response{std::move(_destination), fuerte::Error::QueueCapacityExceeded, nullptr});
}
}
void retryLater(std::chrono::steady_clock::duration tryAgainAfter) {
LOG_TOPIC("2713e", DEBUG, Logger::COMMUNICATION)
<< "retry request to '" << _destination
<< "' '" << fuerte::to_string(_type) << " " << _path << "'";
auto* sch = SchedulerFeature::SCHEDULER;
if (ADB_UNLIKELY(sch == nullptr)) {
_promise.setValue(Response{std::move(_destination), fuerte::Error::Canceled, nullptr});
return;
}
bool queued;
std::tie(queued, _workItem) =
sch->queueDelay(RequestLane::CLUSTER_INTERNAL, tryAgainAfter,
[self = shared_from_this()](bool canceled) {
if (canceled) {
self->_promise.setValue(Response{self->_destination, Error::Canceled, nullptr});
} else {
self->startRequest();
}
});
if (ADB_UNLIKELY(!queued)) {
// scheduler queue is full, cannot requeue
_promise.setValue(Response{std::move(_destination), Error::QueueCapacityExceeded, nullptr});
}
}
};
/// @brief send a request to a given destination, retry until timeout is exceeded
FutureRes sendRequestRetry(ConnectionPool* pool, DestinationId destination,
arangodb::fuerte::RestVerb type, std::string path,
velocypack::Buffer<uint8_t> payload,
RequestOptions const& options,
Headers headers) {
if (!pool || !pool->config().clusterInfo) {
LOG_TOPIC("59b96", ERR, Logger::COMMUNICATION)
<< "connection pool unavailable";
return futures::makeFuture(Response{destination, Error::Canceled, nullptr});
}
LOG_TOPIC("2713b", DEBUG, Logger::COMMUNICATION)
<< "request to '" << destination
<< "' '" << fuerte::to_string(type) << " " << path << "'";
// auto req = prepareRequest(type, path, std::move(payload), timeout, headers);
auto rs = std::make_shared<RequestsState>(pool, std::move(destination),
type, std::move(path),
std::move(payload),
std::move(headers), options);
rs->startRequest(); // will auto reference itself
return rs->future();
}
} // namespace network
} // namespace arangodb