1
0
Fork 0

Added collectAll, updated fuerte (#7949)

This commit is contained in:
Simon 2019-01-16 11:31:08 +01:00 committed by Jan
parent afa0889dbd
commit f748aee240
23 changed files with 247 additions and 232 deletions

View File

@ -84,6 +84,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
/// @brief cancel the connection, unusable afterwards
virtual void cancel() = 0;
/// @brief endpoint we are connected to
std::string endpoint() const;
protected:
@ -117,7 +118,10 @@ class ConnectionBuilder {
/// @brief takes url in the form (http|vst)[s]://(ip|hostname):port
/// also supports the syntax "http+tcp://", "http+unix://" etc
ConnectionBuilder& endpoint(std::string const&);
ConnectionBuilder& endpoint(std::string const& spec);
/// @brief get the normalized endpoint
std::string normalizedEndpoint() const;
// Create an connection and start opening it.
std::shared_ptr<Connection> connect(EventLoopService& eventLoopService);
@ -163,6 +167,12 @@ class ConnectionBuilder {
return *this;
}*/
/// @brief tcp, ssl or unix
inline SocketType socketType() const { return _conf._socketType; }
/// @brief protocol typr
inline ProtocolType protocolType() const { return _conf._protocolType; }
void protocolType(ProtocolType pt) { _conf._protocolType = pt; }
// Set the VST version to use (VST only)
inline vst::VSTVersion vstVersion() const { return _conf._vstVersion; }
ConnectionBuilder& vstVersion(vst::VSTVersion c) {

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2018-2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
@ -30,7 +30,6 @@
#include <fuerte/asio_ns.h>
#include <fuerte/types.h>
#include <boost/optional.hpp>
#include <velocypack/Buffer.h>
#include <velocypack/Builder.h>
@ -56,6 +55,8 @@ public:
// Header metadata helpers
void addMeta(std::string const& key, std::string const& value);
void addMeta(StringMap const&);
// Get value for header metadata key, returns empty string if not found.
std::string const& metaByKey(std::string const& key) const;
@ -117,19 +118,6 @@ private:
MessageType _responseType = MessageType::Response;
};
/*
struct AuthHeader : public MessageHeader {
/// Authentication: encryption field
AuthenticationType authType = AuthenticationType::None;
/// Authentication: username
std::string user;
/// Authentication: password
std::string password;
/// Authentication: JWT token
std::string token;
};*/
// Message is base class for message being send to (Request) or
// from (Response) a server.
class Message {
@ -145,7 +133,7 @@ class Message {
///////////////////////////////////////////////
// get payload
///////////////////////////////////////////////
virtual std::vector<velocypack::Slice> const& slices() = 0;
virtual std::vector<velocypack::Slice> slices() const = 0;
virtual asio_ns::const_buffer payload() const = 0;
virtual size_t payloadSize() const = 0;
std::string payloadAsString() const {
@ -154,6 +142,15 @@ class Message {
asio_ns::buffer_size(p));
}
/// get the content as a slice
velocypack::Slice slice() {
auto slices = this->slices();
if (!slices.empty()) {
return slices[0];
}
return velocypack::Slice::noneSlice();
}
// content-type header accessors
std::string contentTypeString() const;
ContentType contentType() const;
@ -166,20 +163,12 @@ class Request final : public Message {
Request(RequestHeader&& messageHeader = RequestHeader())
: header(std::move(messageHeader)),
_sealed(false),
_modified(true),
_isVpack(boost::none),
_builder(nullptr),
_payloadLength(0),
_isVPack(false),
_timeout(defaultTimeout) {}
Request(RequestHeader const& messageHeader)
: header(messageHeader),
_sealed(false),
_modified(true),
_isVpack(boost::none),
_builder(nullptr),
_payloadLength(0),
_isVPack(false),
_timeout(defaultTimeout) {}
/// @brief request header
@ -203,7 +192,6 @@ class Request final : public Message {
void addVPack(velocypack::Buffer<uint8_t> const& buffer);
void addVPack(velocypack::Buffer<uint8_t>&& buffer);
void addBinary(uint8_t const* data, std::size_t length);
void addBinarySingle(velocypack::Buffer<uint8_t>&& buffer);
///////////////////////////////////////////////
// get payload
@ -211,7 +199,7 @@ class Request final : public Message {
/// @brief get velocypack slices contained in request
/// only valid iff the data was added via addVPack
std::vector<velocypack::Slice> const& slices() override;
std::vector<velocypack::Slice> slices() const override;
asio_ns::const_buffer payload() const override;
size_t payloadSize() const override;
@ -222,14 +210,7 @@ class Request final : public Message {
private:
velocypack::Buffer<uint8_t> _payload;
bool _sealed;
bool _modified;
::boost::optional<bool> _isVpack;
/// used to by addVPack to build a requst buffer
std::shared_ptr<velocypack::Builder> _builder;
std::vector<velocypack::Slice> _slices;
std::size_t _payloadLength; // because VPackBuffer has quirks we need
// to track the Length manually
bool _isVPack;
std::chrono::milliseconds _timeout;
};
@ -248,7 +229,7 @@ class Response final : public Message {
// get / check status
///////////////////////////////////////////////
// statusCode returns the (HTTP) status code for the request (400==OK).
// statusCode returns the (HTTP) status code for the request (200==OK).
StatusCode statusCode() { return header.responseCode; }
// checkStatus returns true if the statusCode equals one of the given valid
// code, false otherwise.
@ -276,16 +257,17 @@ class Response final : public Message {
bool isContentTypeHtml() const;
bool isContentTypeText() const;
/// @brief validates and returns VPack response. Only valid for velocypack
std::vector<velocypack::Slice> const& slices() override;
std::vector<velocypack::Slice> slices() const override;
asio_ns::const_buffer payload() const override;
size_t payloadSize() const override;
std::shared_ptr<velocypack::Buffer<uint8_t>> copyPayload() const;
void setPayload(velocypack::Buffer<uint8_t>&& buffer, size_t payloadOffset);
/// @brief move in the payload
void setPayload(velocypack::Buffer<uint8_t> buffer, size_t payloadOffset);
private:
velocypack::Buffer<uint8_t> _payload;
size_t _payloadOffset;
std::vector<velocypack::Slice> _slices;
};
}}} // namespace arangodb::fuerte::v1
#endif

View File

@ -52,7 +52,7 @@ std::unique_ptr<Request> createRequest(RestVerb const& verb,
// For User
std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
StringMap const& parameter,
velocypack::Buffer<uint8_t>&& payload);
velocypack::Buffer<uint8_t> payload);
std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
StringMap const& parameter,

View File

@ -43,12 +43,17 @@ StatusCode constexpr StatusUndefined = 0;
StatusCode constexpr StatusOK = 200;
StatusCode constexpr StatusCreated = 201;
StatusCode constexpr StatusAccepted = 202;
StatusCode constexpr StatusPartial = 203;
StatusCode constexpr StatusNoContent = 204;
StatusCode constexpr StatusBadRequest = 400;
StatusCode constexpr StatusUnauthorized = 401;
StatusCode constexpr StatusForbidden = 403;
StatusCode constexpr StatusNotFound = 404;
StatusCode constexpr StatusMethodNotAllowed = 405;
StatusCode constexpr StatusConflict = 409;
StatusCode constexpr StatusPreconditionFailed = 412;
StatusCode constexpr StatusInternalError = 500;
StatusCode constexpr StatusUnavailable = 505;
// RequestCallback is called for finished connection requests.
// If the given Error is zero, the request succeeded, otherwise an error
@ -84,7 +89,6 @@ enum class ErrorCondition : Error {
WriteError = 1103,
Canceled = 1104,
MalformedURL = 1105,
ProtocolError = 3000,
};

View File

@ -89,7 +89,7 @@ void parseSchema( std::string const& schema,
conf._socketType = SocketType::Ssl;
} else if (socket == "unix") {
conf._socketType = SocketType::Unix;
} else {
} else if (conf._socketType == SocketType::Undefined) {
throw std::runtime_error(std::string("invalid socket type: ") + proto);
}
@ -97,7 +97,7 @@ void parseSchema( std::string const& schema,
conf._protocolType = ProtocolType::Vst;
} else if (proto == "http") {
conf._protocolType = ProtocolType::Http;
} else {
} else if (conf._protocolType == ProtocolType::Undefined) {
throw std::runtime_error(std::string("invalid protocol: ") + proto);
}
@ -117,49 +117,75 @@ void parseSchema( std::string const& schema,
} else if (proto == "unix") {
conf._socketType = SocketType::Unix;
conf._protocolType = ProtocolType::Http;
} else {
throw std::runtime_error(std::string("invalid protocol: ") + proto);
} else if (conf._socketType == SocketType::Undefined ||
conf._protocolType == ProtocolType::Undefined) {
throw std::runtime_error(std::string("invalid schema: ") + proto);
}
}
}
ConnectionBuilder& ConnectionBuilder::endpoint(std::string const& host) {
ConnectionBuilder& ConnectionBuilder::endpoint(std::string const& spec) {
// we need to handle unix:// urls seperately
size_t pos = host.find("://");
size_t pos = spec.find("://");
if (pos == std::string::npos) {
throw std::runtime_error(std::string("invalid endpoint spec: ") + host);
throw std::runtime_error(std::string("invalid endpoint spec: ") + spec);
}
std::string schema = host.substr(0, pos);
std::string schema = spec.substr(0, pos);
boost::algorithm::to_lower(schema); // in-place
parseSchema(schema, _conf);
if (_conf._socketType == SocketType::Unix) {
// unix:///a/b/c does not contain a port
_conf._host = host.substr(pos + 3);
_conf._host = spec.substr(pos + 3);
return *this;
}
// now lets perform proper URL parsing
struct http_parser_url parsed;
http_parser_url_init(&parsed);
int error = http_parser_parse_url(host.c_str(), host.length(), 0, &parsed);
int error = http_parser_parse_url(spec.c_str(), spec.length(), 0, &parsed);
if (error != 0) {
throw std::runtime_error(std::string("invalid endpoint spec: ") + host);
throw std::runtime_error(std::string("invalid endpoint spec: ") + spec);
}
// put hostname, port and path in seperate strings
if (!(parsed.field_set & (1 << UF_HOST))) {
throw std::runtime_error(std::string("invalid host: ") + host);
throw std::runtime_error(std::string("invalid host: ") + spec);
}
_conf._host = host.substr(parsed.field_data[UF_HOST].off,
_conf._host = spec.substr(parsed.field_data[UF_HOST].off,
parsed.field_data[UF_HOST].len);
if (!(parsed.field_set & (1 << UF_PORT))) {
throw std::runtime_error(std::string("invalid port: ") + host);
throw std::runtime_error(std::string("invalid port: ") + spec);
}
_conf._port = host.substr(parsed.field_data[UF_PORT].off,
_conf._port = spec.substr(parsed.field_data[UF_PORT].off,
parsed.field_data[UF_PORT].len);
return *this;
}
/// @brief get the normalized endpoint
std::string ConnectionBuilder::normalizedEndpoint() const {
std::string endpoint;
if (ProtocolType::Http == _conf._protocolType) {
endpoint.append("http+");
} else if (ProtocolType::Vst == _conf._protocolType) {
endpoint.append("vst+");
}
if (SocketType::Tcp == _conf._socketType) {
endpoint.append("tcp://");
} else if (SocketType::Ssl == _conf._socketType) {
endpoint.append("ssl://");
} else if (SocketType::Unix == _conf._socketType) {
endpoint.append("unix://");
}
endpoint.append(_conf._host);
endpoint.push_back(':');
endpoint.append(_conf._port);
return endpoint;
}
}}} // namespace arangodb::fuerte::v1

View File

@ -37,6 +37,12 @@ void MessageHeader::addMeta(std::string const& key, std::string const& value) {
meta.emplace(key, value);
}
void MessageHeader::addMeta(StringMap const& map) {
for(auto& pair : map) {
meta.insert(pair);
}
}
// Get value for header metadata key, returns empty string if not found.
std::string const& MessageHeader::metaByKey(std::string const& key) const {
static std::string emptyString("");
@ -157,28 +163,17 @@ std::string Request::acceptTypeString() const {
ContentType Request::acceptType() const { return header.acceptType(); }
//// add payload
// add VelocyPackData
//// add payload add VelocyPackData
void Request::addVPack(VPackSlice const& slice) {
#ifdef FUERTE_CHECKED_MODE
// FUERTE_LOG_ERROR << "Checking data that is added to the message: " <<
// std::endl;
vst::parser::validateAndCount(slice.start(), slice.byteSize());
#endif
if (_sealed || (_isVpack && !_isVpack.get())) {
throw std::logic_error("Message is sealed or of wrong type (vst/binary)");
};
if (!_builder) {
_builder = std::make_shared<VPackBuilder>(_payload);
}
header.contentType(ContentType::VPack);
_isVpack = true;
_modified = true;
_builder->add(slice);
_payloadLength += slice.byteSize();
_payload.resetTo(_payloadLength);
_isVPack = _isVPack || _payload.empty();
_payload.append(slice.start(), slice.byteSize());
}
void Request::addVPack(VPackBuffer<uint8_t> const& buffer) {
@ -187,32 +182,9 @@ void Request::addVPack(VPackBuffer<uint8_t> const& buffer) {
// std::endl;
vst::parser::validateAndCount(buffer.data(), buffer.byteSize());
#endif
if (_sealed || (_isVpack && !_isVpack.get())) {
throw std::logic_error("Message is sealed or of wrong type (vst/binary)");
};
_isVpack = true;
header.contentType(ContentType::VPack);
_modified = true;
_modified = true;
auto length = buffer.byteSize();
auto cursor = buffer.data();
if (!_builder) {
_builder = std::make_shared<VPackBuilder>(_payload);
}
while (length) {
VPackSlice slice(cursor);
_builder->add(slice);
auto sliceSize = _slices.back().byteSize();
if (length < sliceSize) {
throw std::logic_error("invalid buffer");
}
cursor += sliceSize;
length -= sliceSize;
_payloadLength += sliceSize;
_payload.resetTo(_payloadLength);
}
_isVPack = _isVPack || _payload.empty();
_payload.append(buffer);
}
void Request::addVPack(VPackBuffer<uint8_t>&& buffer) {
@ -221,68 +193,42 @@ void Request::addVPack(VPackBuffer<uint8_t>&& buffer) {
// std::endl;
vst::parser::validateAndCount(buffer.data(), buffer.byteSize());
#endif
if (_sealed || (_isVpack && !_isVpack.get())) {
throw std::logic_error("Message is sealed or of wrong type (vst/binary)");
};
header.contentType(ContentType::VPack);
_isVpack = true;
_sealed = true;
_modified = true;
_payloadLength += buffer.byteSize();
_isVPack = _isVPack || _payload.empty();
_payload = std::move(buffer);
_payload.resetTo(_payloadLength);
}
// add binary data
void Request::addBinary(uint8_t const* data, std::size_t length) {
if (_sealed || (_isVpack && _isVpack.get())) {
return;
};
_isVpack = false;
_modified = true;
_payloadLength += length;
_payload.append(data, length); // TODO reset to!!! FIXME
_payload.resetTo(_payloadLength);
}
void Request::addBinarySingle(VPackBuffer<uint8_t>&& buffer) {
if (_sealed || (_isVpack && _isVpack.get())) {
return;
};
_isVpack = false;
_sealed = true;
_modified = true;
_payloadLength += buffer.byteSize();
_payload = std::move(buffer);
_payload.resetTo(_payloadLength);
_isVPack = false; // should cause slices() to not return garbage
_payload.append(data, length);
}
// get payload as slices
std::vector<VPackSlice> const& Request::slices() {
if (_isVpack && _modified) {
_slices.clear();
std::vector<VPackSlice> Request::slices() const {
std::vector<VPackSlice> slices;
if (_isVPack) {
auto length = _payload.byteSize();
auto cursor = _payload.data();
while (length) {
_slices.emplace_back(cursor);
auto sliceSize = _slices.back().byteSize();
slices.emplace_back(cursor);
auto sliceSize = slices.back().byteSize();
if (length < sliceSize) {
throw std::logic_error("invalid buffer");
}
cursor += sliceSize;
length -= sliceSize;
}
_modified = false;
}
return _slices;
return slices;
}
// get payload as binary
asio_ns::const_buffer Request::payload() const {
return asio_ns::const_buffer(_payload.data(), _payloadLength);
return asio_ns::const_buffer(_payload.data(), _payload.byteSize());
}
size_t Request::payloadSize() const { return _payloadLength; }
size_t Request::payloadSize() const { return _payload.byteSize(); }
///////////////////////////////////////////////
// class Response
@ -304,9 +250,9 @@ bool Response::isContentTypeText() const {
return (header.contentType() == ContentType::Text);
}
std::vector<VPackSlice> const& Response::slices() {
if (_slices.empty()) {
assert(isContentTypeVPack());
std::vector<VPackSlice> Response::slices() const {
std::vector<VPackSlice> slices;
if (isContentTypeVPack()) {
VPackValidator validator;
auto length = _payload.byteSize() - _payloadOffset;
@ -315,8 +261,8 @@ std::vector<VPackSlice> const& Response::slices() {
// will throw on an error
validator.validate(cursor, length, true);
_slices.emplace_back(cursor);
auto sliceSize = _slices.back().byteSize();
slices.emplace_back(cursor);
auto sliceSize = slices.back().byteSize();
if (length < sliceSize) {
throw std::logic_error("invalid buffer");
}
@ -324,7 +270,7 @@ std::vector<VPackSlice> const& Response::slices() {
length -= sliceSize;
}
}
return _slices;
return slices;
}
asio_ns::const_buffer Response::payload() const {
@ -336,8 +282,14 @@ size_t Response::payloadSize() const {
return _payload.byteSize() - _payloadOffset;
}
void Response::setPayload(VPackBuffer<uint8_t>&& buffer, size_t payloadOffset) {
_slices.clear();
std::shared_ptr<velocypack::Buffer<uint8_t>> Response::copyPayload() const {
auto buffer = std::make_shared<velocypack::Buffer<uint8_t>>();
buffer->append(_payload.data() + _payloadOffset,
_payload.byteSize() - _payloadOffset);
return buffer;
}
void Response::setPayload(VPackBuffer<uint8_t> buffer, size_t payloadOffset) {
_payloadOffset = payloadOffset;
_payload = std::move(buffer);
}

View File

@ -38,12 +38,10 @@ std::unique_ptr<Request> createRequest(RestVerb const& verb,
// For User
std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
StringMap const& parameters,
VPackBuffer<uint8_t>&& payload) {
VPackBuffer<uint8_t> payload) {
auto request = createRequest(verb, ContentType::VPack);
request->header.path = path;
request->header.parameters = parameters;
request->header.contentType(ContentType::VPack);
request->header.acceptType(ContentType::VPack);
request->addVPack(std::move(payload));
return request;
}
@ -54,8 +52,6 @@ std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
auto request = createRequest(verb, ContentType::VPack);
request->header.path = path;
request->header.parameters = parameters;
request->header.contentType(ContentType::VPack);
request->header.acceptType(ContentType::VPack);
request->addVPack(payload);
return request;
}
@ -65,7 +61,6 @@ std::unique_ptr<Request> createRequest(RestVerb verb, std::string const& path,
auto request = createRequest(verb, ContentType::VPack);
request->header.path = path;
request->header.parameters = parameters;
request->header.acceptType(ContentType::VPack);
return request;
}
}}} // namespace arangodb::fuerte::v1

View File

@ -205,7 +205,6 @@ ErrorCondition intToError(Error integral) {
1102, // VstReadError
1103, // VstWriteError
1104, // CancelledDuringReset
1105, // MalformedURL
3000, // CurlError
};
auto pos = std::find(valid.begin(), valid.end(), integral);
@ -242,8 +241,6 @@ std::string to_string(ErrorCondition error) {
return "Error while writing ";
case ErrorCondition::Canceled:
return "Connection was locally canceled";
case ErrorCondition::MalformedURL:
return "Error malformed URL";
case ErrorCondition::ProtocolError:
return "Error: invalid server response";

View File

@ -171,7 +171,7 @@ void SchedulerFeature::start() {
}
TRI_ASSERT(2 <= _nrMinimalThreads);
TRI_ASSERT(_nrMinimalThreads < _nrMaximalThreads);
TRI_ASSERT(_nrMinimalThreads <= _nrMaximalThreads);
signalStuffInit();

View File

@ -50,7 +50,6 @@
#include "Basics/Utf8Helper.h"
#include "Basics/conversions.h"
#include "Basics/tri-strings.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "GeneralServer/AuthenticationFeature.h"

View File

@ -886,7 +886,7 @@ Result LogicalCollection::truncate(transaction::Methods& trx, OperationOptions&
Result LogicalCollection::insert(transaction::Methods* trx, VPackSlice const slice,
ManagedDocumentResult& result, OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_tick_t& revisionId, KeyLockInfo* keyLockInfo,
TRI_voc_rid_t& revisionId, KeyLockInfo* keyLockInfo,
std::function<Result(void)> callbackDuringLock) {
TRI_IF_FAILURE("LogicalCollection::insert") {
return Result(TRI_ERROR_DEBUG);

View File

@ -151,7 +151,10 @@ class LogicalCollection : public LogicalDataSource {
TRI_voc_rid_t revision(transaction::Methods*) const;
bool waitForSync() const;
bool isSmart() const;
/// @brief is this a cluster-wide Plan (ClusterInfo) collection
bool isAStub() const { return _isAStub; }
/// @brief is this a cluster-wide Plan (ClusterInfo) collection
bool isClusterGlobal() const { return _isAStub; }
void waitForSync(bool value) { _waitForSync = value; }
@ -274,7 +277,7 @@ class LogicalCollection : public LogicalDataSource {
Result insert(transaction::Methods* trx, velocypack::Slice const slice,
ManagedDocumentResult& result, OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock) {
TRI_voc_tick_t unused;
TRI_voc_rid_t unused;
return insert(trx, slice, result, options, resultMarkerTick, lock, unused,
nullptr, nullptr);
}
@ -287,7 +290,7 @@ class LogicalCollection : public LogicalDataSource {
Result insert(transaction::Methods* trx, velocypack::Slice slice,
ManagedDocumentResult& result, OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_tick_t& revisionId, KeyLockInfo* keyLockInfo,
TRI_voc_rid_t& revisionId, KeyLockInfo* keyLockInfo,
std::function<Result(void)> callbackDuringLock);
Result update(transaction::Methods*, velocypack::Slice,

View File

@ -25,7 +25,6 @@
#include "Agency/AgencyComm.h"
#include "Basics/StringUtils.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "Rest/Version.h"

View File

@ -28,7 +28,6 @@
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/files.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"

View File

@ -102,13 +102,17 @@ void V8ClientConnection::createConnection() {
std::shared_ptr<VPackBuilder> parsedBody;
VPackSlice body;
if (res->contentType() == fuerte::ContentType::VPack) {
body = res->slices()[0];
body = res->slice();
} else {
parsedBody =
VPackParser::fromJson(reinterpret_cast<char const*>(res->payload().data()),
res->payload().size());
body = parsedBody->slice();
}
if (!body.isObject()) {
_lastErrorMessage = "invalid response";
_lastHttpReturnCode = 503;
}
std::string const server =
VelocyPackHelper::getStringValue(body, "server", "");

View File

@ -41,5 +41,10 @@ template class Future<double>;
// arangodb types
template class Future<arangodb::Result>;
template class Future<arangodb::OperationResult>;
/// Make a complete void future
Future<Unit> makeFuture() {
return Future<Unit>(unit);
}
} // namespace futures
} // namespace arangodb

View File

@ -24,7 +24,6 @@
#define ARANGOD_FUTURES_FUTURE_H 1
#include <chrono>
#include <future>
#include <thread>
#include "Futures/Exceptions.h"
@ -45,7 +44,6 @@ class Promise;
template <typename T>
struct isFuture {
static constexpr bool value = false;
// typedef T inner;
typedef typename lift_unit<T>::type inner;
};
@ -129,6 +127,13 @@ using decay_t = typename decay<T>::type;
struct EmptyConstructor {};
} // namespace detail
/// @brief Specifies state of a future as returned by wait_for and wait_until
enum class FutureStatus : uint8_t {
Ready,
Timeout,
Deferred
};
/// Simple Future library based on Facebooks Folly
template <typename T>
class Future {
@ -141,6 +146,9 @@ class Future {
friend Future<Unit> makeFuture();
public:
/// @brief value type of the future
typedef T value_type;
/// @brief Constructs a Future with no shared state.
static Future<T> makeEmpty() { return Future<T>(detail::EmptyConstructor{}); }
@ -254,25 +262,25 @@ class Future {
/// waits for the result, returns if it is not available
/// for the specified timeout duration. Future must be valid
template <class Rep, class Period>
std::future_status wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const {
FutureStatus wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) const {
return wait_until(std::chrono::steady_clock::now() + timeout_duration);
}
/// waits for the result, returns if it is not available until
/// specified time point. Future must be valid
template <class Clock, class Duration>
std::future_status wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) const {
FutureStatus wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) const {
if (isReady()) {
return std::future_status::ready;
return FutureStatus::Ready;
}
std::this_thread::yield();
while (!isReady()) {
if (Clock::now() > timeout_time) {
return std::future_status::timeout;
return FutureStatus::Timeout;
}
std::this_thread::yield();
}
return std::future_status::ready;
return FutureStatus::Ready;
}
/// When this Future has completed, execute func which is a function that
@ -286,8 +294,7 @@ class Future {
///
/// Preconditions:
///
/// - `valid() == true` (else throws
/// std::future_error(std::future_errc::no_state))
/// - `valid() == true` (else throws FutureException(ErrorCode::NoState)))
///
/// Postconditions:
///

View File

@ -34,8 +34,10 @@ Future<T> makeFuture(Try<T>&& t) {
return Future<T>(detail::SharedState<T>::make(std::move(t)));
}
Future<Unit> makeFuture() { return Future<Unit>(unit); }
/// Make a complete void future
Future<Unit> makeFuture();
/// Make a completed Future by moving in a value. e.g.
template <class T>
Future<typename std::decay<T>::type> makeFuture(T&& t) {
return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
@ -73,6 +75,87 @@ typename std::enable_if<!isFuture<R>::value, Future<R>>::type makeFutureWith(F&&
makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
}
namespace detail {
template <typename F>
void _foreach(F&&, size_t) {}
template <typename F, typename Arg, typename... Args>
void _foreach(F&& f, size_t i, Arg&& arg, Args&&... args) {
f(i, std::forward<Arg>(arg));
_foreach(i + 1, std::forward<F>(f), std::forward<Args>(args)...);
}
template <typename F, typename... Args>
void foreach (F&& f, Args && ... args) {
_foreach(std::forward<F>(f), 0, args...);
}
}; // namespace detail
/// @brief When all the input Futures complete, the returned Future will complete.
/// Errors do not cause early termination; this Future will always succeed
/// after all its Futures have finished (whether successfully or with an
/// error).
/// The Futures are moved in, so your copies are invalid. If you need to
/// chain further from these Futures, use the variant with an output iterator.
/// This function is thread-safe for Futures running on different threads.
/// It will complete in whichever thread the last Future completes in.
/// @return for (Future<T1>, Future<T2>, ...) input is Future<std::tuple<Try<T1>, Try<T2>, ...>>.
//template <typename... Fs>
//Future<std::tuple<Try<typename isFuture<Fs>::inner>...>> collectAll(Fs&&... fs) {
// using Result = std::tuple<Try<typename isFuture<Fs>::inner>...>;
// struct Context {
// ~Context() { p.setValue(std::move(results)); }
// Promise<Result> p;
// Result results;
// };
// auto ctx = std::make_shared<Context>();
//
// detail::foreach (
// [&](auto i, auto&& f) {
// f.then([i, ctx](auto&& t) { std::get<i>(ctx->results) = std::move(t); });
// },
// std::move(fs)...);
// return ctx->p.getFuture();
//}
/// @brief When all the input Futures complete, the returned Future will
/// complete. Errors do not cause early termination; this Future will always
/// succeed after all its Futures have finished (whether successfully or with an
/// error).
/// The Futures are moved in, so your copies are invalid. If you need to
/// chain further from these Futures, use the variant with an output iterator.
/// This function is thread-safe for Futures running on different threads. But
/// if you are doing anything non-trivial after, you will probably want to
/// follow with `via(executor)` because it will complete in whichever thread the
/// last Future completes in.
/// The return type for Future<T> input is a Future<std::vector<Try<T>>>
template <class InputIterator>
Future<std::vector<Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>> collectAll(
InputIterator first, InputIterator last) {
using FT = typename std::iterator_traits<InputIterator>::value_type;
using T = typename FT::value_type;
struct Context {
explicit Context(size_t n) : results(n) {}
~Context() { p.setValue(std::move(results)); }
Promise<std::vector<Try<T>>> p;
std::vector<Try<T>> results;
};
auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
for (size_t i = 0; first != last; ++first, ++i) {
first->thenFinal([i, ctx](auto&& t) { ctx->results[i] = std::move(t); });
}
return ctx->p.getFuture();
}
template <class Collection>
auto collectAll(Collection&& c) -> decltype(collectAll(c.begin(), c.end())) {
return collectAll(c.begin(), c.end());
}
} // namespace futures
} // namespace arangodb
#endif // ARANGOD_FUTURES_UTILITIES_H

View File

@ -1,5 +1,5 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for ClusterComm
/// @brief test suite for Supervision
///
/// @file
///

View File

@ -28,12 +28,12 @@
#include "ApplicationFeatures/GreetingsPhase.h"
#include "Futures/Future.h"
#include "Futures/Utilities.h"
#include "RestServer/FileDescriptorsFeature.h"
#include "Scheduler/SchedulerFeature.h"
#include "Scheduler/Scheduler.h"
#include "catch.hpp"
#include <condition_variable>
#include <mutex>
using namespace arangodb::futures;
namespace {
@ -74,56 +74,6 @@ namespace {
Future<int> onErrorHelperGeneric(const std::exception&) {
return makeFuture(20);
}
/*struct SchedulerTestSetup {
arangodb::application_features::ApplicationServer server;
SchedulerTestSetup() : server(nullptr, nullptr) {
using namespace arangodb::application_features;
std::vector<ApplicationFeature*> features;
features.emplace_back(new GreetingsFeaturePhase(server, false));
features.emplace_back(new arangodb::FileDescriptorsFeature(server));
features.emplace_back(new arangodb::SchedulerFeature(server));
for (auto& f : features) {
ApplicationServer::server->addFeature(f);
}
ApplicationServer::server->setupDependencies(false);
ApplicationServer::setStateUnsafe(ServerState::IN_WAIT);
auto orderedFeatures = server.getOrderedFeatures();
for (auto& f : orderedFeatures) {
f->prepare();
}
for (auto& f : orderedFeatures) {
f->start();
}
}
~SchedulerTestSetup() {
using namespace arangodb::application_features;
ApplicationServer::setStateUnsafe(ServerState::IN_STOP);
auto orderedFeatures = server.getOrderedFeatures();
for (auto& f : orderedFeatures) {
f->beginShutdown();
}
for (auto& f : orderedFeatures) {
f->stop();
}
for (auto& f : orderedFeatures) {
f->unprepare();
}
arangodb::application_features::ApplicationServer::server = nullptr;
}
std::vector<std::unique_ptr<arangodb::application_features::ApplicationFeature*>> features;
};*/
} // namespace

View File

@ -1,5 +1,5 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for ClusterComm
/// @brief test suite for MaintenanceFeature
///
/// @file
///

View File

@ -1,5 +1,5 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite for ClusterComm
/// @brief test suite for MaintenanceRestHandler
///
/// @file
///