1
0
Fork 0

Merge remote-tracking branch 'origin/devel' into documentation/fix-docublock-export-api

This commit is contained in:
Simran Brucherseifer 2019-11-28 15:11:30 +01:00
commit bdb579db7d
58 changed files with 922 additions and 544 deletions

View File

@ -53,7 +53,8 @@ StatusCode constexpr StatusNotAcceptable = 406;
StatusCode constexpr StatusConflict = 409; StatusCode constexpr StatusConflict = 409;
StatusCode constexpr StatusPreconditionFailed = 412; StatusCode constexpr StatusPreconditionFailed = 412;
StatusCode constexpr StatusInternalError = 500; StatusCode constexpr StatusInternalError = 500;
StatusCode constexpr StatusUnavailable = 505; StatusCode constexpr StatusUnavailable = 503;
StatusCode constexpr StatusVersionNotSupported = 505;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- enum class ErrorCondition // --SECTION-- enum class ErrorCondition

View File

@ -34,21 +34,18 @@ void resolveConnect(detail::ConnectionConfiguration const& config,
asio_ns::ip::tcp::resolver& resolver, asio_ns::ip::tcp::resolver& resolver,
SocketT& socket, SocketT& socket,
F&& done) { F&& done) {
auto cb = [&socket, done = std::forward<F>(done)] auto cb = [&socket, done(std::forward<F>(done))](auto ec, auto it) mutable {
(asio_ns::error_code const& ec, if (ec) { // error in address resolver
asio_ns::ip::tcp::resolver::iterator it) {
if (ec) { // error
done(ec); done(ec);
return; return;
} }
// A successful resolve operation is guaranteed to pass a // A successful resolve operation is guaranteed to pass a
// non-empty range to the handler. // non-empty range to the handler.
auto cb = [done](asio_ns::error_code const& ec, asio_ns::async_connect(socket, it,
asio_ns::ip::tcp::resolver::iterator const&) { [done(std::move(done))](auto ec, auto it) mutable {
done(ec); std::forward<F>(done)(ec);
}; });
asio_ns::async_connect(socket, it, std::move(cb));
}; };
// windows does not like async_resolve // windows does not like async_resolve
@ -114,7 +111,9 @@ struct Socket<fuerte::SocketType::Ssl> {
template<typename F> template<typename F>
void connect(detail::ConnectionConfiguration const& config, F&& done) { void connect(detail::ConnectionConfiguration const& config, F&& done) {
auto cb = [this, &config, done = std::forward<F>(done)](asio_ns::error_code const& ec) { bool verify = config._verifyHost;
resolveConnect(config, resolver, socket.next_layer(),
[=, done(std::forward<F>(done))](auto const& ec) mutable {
if (ec) { if (ec) {
done(ec); done(ec);
return; return;
@ -122,7 +121,7 @@ struct Socket<fuerte::SocketType::Ssl> {
// Perform SSL handshake and verify the remote host's certificate. // Perform SSL handshake and verify the remote host's certificate.
socket.next_layer().set_option(asio_ns::ip::tcp::no_delay(true)); socket.next_layer().set_option(asio_ns::ip::tcp::no_delay(true));
if (config._verifyHost) { if (verify) {
socket.set_verify_mode(asio_ns::ssl::verify_peer); socket.set_verify_mode(asio_ns::ssl::verify_peer);
socket.set_verify_callback(asio_ns::ssl::rfc2818_verification(config._host)); socket.set_verify_callback(asio_ns::ssl::rfc2818_verification(config._host));
} else { } else {
@ -130,9 +129,7 @@ struct Socket<fuerte::SocketType::Ssl> {
} }
socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done)); socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done));
}; });
resolveConnect(config, resolver, socket.next_layer(), std::move(cb));
} }
void shutdown() { void shutdown() {

View File

@ -1,6 +1,24 @@
devel devel
----- -----
* Enable the `parallelize-gather` AQL optimizer rule for certain write queries.
The optimization is turned on by default and can be disabled by setting the
startup option `--query.parallelize-gather-writes` to `false`.
* Shard synchronisation readlock aware of rebootId.
* Bugfix: In an AQL cluster query, when gathering unsorted data in combination
with a LIMIT with non-zero offset, if this offset exactly matches the number
of documents in the first shards consumed, the rest of the documents was not
returned.
* Fixed GET _api/gharial to also include the name property in every returned graph.
This is a consistency fix within the API as all other APIs include the name.
As a work around the returned _key can be used, which is identical to the name.
* Updated arangosync to 0.7.0.
* REMOTE and GATHER no longer make subqueries unsuitable for the * REMOTE and GATHER no longer make subqueries unsuitable for the
`splice-subqueries` optimization. `splice-subqueries` optimization.
@ -86,8 +104,8 @@ devel
engine, but comes with a big performance penalty as all collections will be locked engine, but comes with a big performance penalty as all collections will be locked
exclusively for writes. exclusively for writes.
* Added new timeout option for AQL queries. If a query does not finish execution * Added new maxRuntime option for queries. If a query does not finish execution within
within the given time in seconds it will be killed. the given time (in seconds) it will be killed.
* Fixed undefined behaviour with creation of ArangoSearch links with custom * Fixed undefined behaviour with creation of ArangoSearch links with custom
analyzers in cluster environment. analyzers in cluster environment.

View File

@ -129,6 +129,20 @@ to all "follower" replicas, before the write operation is reported successful.
If a server fails, this is detected automatically and one of the servers holding If a server fails, this is detected automatically and one of the servers holding
copies take over, usually without an error being reported. copies take over, usually without an error being reported.
@RESTBODYPARAM{minReplicationFactor,integer,optional,int64}
(optional, default is 1): in a cluster, this attribute determines how many
desired copies of each shard are kept on different DBServers. The value 1 means
that only one copy (no synchronous replication) is kept. A value of k means
that desired k-1 replicas are kept. If in a failover scenario a shard of a
collection has less than minReplicationFactor many insync followers it will go
into "read-only" mode and will reject writes until enough followers are insync
again.
**In more detail**: Having `minReplicationFactor == 1` means as soon as a
"master-copy" is available of the data writes are allowed. Having
`minReplicationFactor > 1` requires additional insync copies on follower
servers to allow writes.
@RESTBODYPARAM{distributeShardsLike,string,optional,string} @RESTBODYPARAM{distributeShardsLike,string,optional,string}
(The default is *""*): in an Enterprise Edition cluster, this attribute binds (The default is *""*): in an Enterprise Edition cluster, this attribute binds
the specifics of sharding for the newly created collection to follow that of a the specifics of sharding for the newly created collection to follow that of a

View File

@ -107,8 +107,8 @@ to bring the satellite collections involved in the query into sync.
The default value is *60.0* (seconds). When the max time has been reached the query The default value is *60.0* (seconds). When the max time has been reached the query
will be stopped. will be stopped.
@RESTSTRUCT{timeout,post_api_cursor_opts,number,optional,double} @RESTSTRUCT{maxRuntime,post_api_cursor_opts,number,optional,double}
The query has to be executed within the given timeout or it will be killed. The query has to be executed within the given runtime or it will be killed.
The value is specified in seconds. The default value is *0.0* (no timeout). The value is specified in seconds. The default value is *0.0* (no timeout).
@RESTSTRUCT{maxTransactionSize,post_api_cursor_opts,integer,optional,int64} @RESTSTRUCT{maxTransactionSize,post_api_cursor_opts,integer,optional,int64}

View File

@ -21,6 +21,8 @@ The response is a JSON object with the following attributes:
- *replicationFactor*: the default replication factor for collections in this database - *replicationFactor*: the default replication factor for collections in this database
- *minReplicationFactor*: the default minimum replication factor for collections in this database
@RESTRETURNCODES @RESTRETURNCODES
@RESTRETURNCODE{200} @RESTRETURNCODE{200}

View File

@ -14,11 +14,16 @@ Optional object which can contain the following attributes:
The sharding method to use for new collections in this database. Valid values The sharding method to use for new collections in this database. Valid values
are: "", "flexible", or "single". The first two are equivalent. are: "", "flexible", or "single". The first two are equivalent.
@RESTSTRUCT{replicationFactor,get_api_database_new_USERS,string,optional,number} @RESTSTRUCT{replicationFactor,get_api_database_new_USERS,string,optional,}
Default replication factor for new collections created in this database. Default replication factor for new collections created in this database.
Special values include "satellite", which will replicate the collection to Special values include "satellite", which will replicate the collection to
every DB-server, and 1, which disables replication. every DB-server, and 1, which disables replication.
@RESTSTRUCT{minReplicationFactor,get_api_database_new_USERS,number,optional,}
Default minimum replication factor for new collections created in this database.
If there are less than minReplicationFactor replicas available the collection
will become read-only.
@RESTBODYPARAM{users,array,optional,get_api_database_new_USERS} @RESTBODYPARAM{users,array,optional,get_api_database_new_USERS}
Has to be an array of user objects to initially create for the new database. Has to be an array of user objects to initially create for the new database.
User information will not be changed for users that already exist. User information will not be changed for users that already exist.

View File

@ -1,6 +1,6 @@
CXX_STANDARD "17" CXX_STANDARD "17"
STARTER_REV "0.14.12" STARTER_REV "0.14.12"
SYNCER_REV "0.6.5" SYNCER_REV "0.7.0"
GCC_LINUX "9.2.0" GCC_LINUX "9.2.0"
MSVC_WINDOWS "2017" MSVC_WINDOWS "2017"
MACOS_MIN "10.14" MACOS_MIN "10.14"

View File

@ -43,6 +43,7 @@
#include "Aql/ModificationNodes.h" #include "Aql/ModificationNodes.h"
#include "Aql/MultiDependencySingleRowFetcher.h" #include "Aql/MultiDependencySingleRowFetcher.h"
#include "Aql/ParallelUnsortedGatherExecutor.h" #include "Aql/ParallelUnsortedGatherExecutor.h"
#include "Aql/OptimizerRulesFeature.h"
#include "Aql/Query.h" #include "Aql/Query.h"
#include "Aql/RemoteExecutor.h" #include "Aql/RemoteExecutor.h"
#include "Aql/ScatterExecutor.h" #include "Aql/ScatterExecutor.h"
@ -421,6 +422,7 @@ CostEstimate DistributeNode::estimateCost() const {
GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& base, GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& base,
SortElementVector const& elements) SortElementVector const& elements)
: ExecutionNode(plan, base), : ExecutionNode(plan, base),
_vocbase(&(plan->getAst()->query()->vocbase())),
_elements(elements), _elements(elements),
_sortmode(SortMode::MinElement), _sortmode(SortMode::MinElement),
_parallelism(Parallelism::Undefined), _parallelism(Parallelism::Undefined),
@ -444,6 +446,7 @@ GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& b
GatherNode::GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode, Parallelism parallelism) noexcept GatherNode::GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode, Parallelism parallelism) noexcept
: ExecutionNode(plan, id), : ExecutionNode(plan, id),
_vocbase(&(plan->getAst()->query()->vocbase())),
_sortmode(sortMode), _sortmode(sortMode),
_parallelism(parallelism), _parallelism(parallelism),
_limit(0) {} _limit(0) {}
@ -545,9 +548,13 @@ bool GatherNode::isSortingGather() const noexcept {
/// @brief is the node parallelizable? /// @brief is the node parallelizable?
struct ParallelizableFinder final : public WalkerWorker<ExecutionNode> { struct ParallelizableFinder final : public WalkerWorker<ExecutionNode> {
bool _isParallelizable = true; bool const _parallelizeWrites;
bool _isParallelizable;
explicit ParallelizableFinder(TRI_vocbase_t const& _vocbase)
: _parallelizeWrites(_vocbase.server().getFeature<OptimizerRulesFeature>().parallelizeGatherWrites()),
_isParallelizable(true) {}
ParallelizableFinder() : _isParallelizable(true) {}
~ParallelizableFinder() = default; ~ParallelizableFinder() = default;
bool enterSubquery(ExecutionNode*, ExecutionNode*) override final { bool enterSubquery(ExecutionNode*, ExecutionNode*) override final {
@ -561,19 +568,14 @@ struct ParallelizableFinder final : public WalkerWorker<ExecutionNode> {
_isParallelizable = false; _isParallelizable = false;
return true; // true to abort the whole walking process return true; // true to abort the whole walking process
} }
if (node->isModificationNode()) { // write operations of type REMOVE, REPLACE and UPDATE
/* // can be parallelized, provided the rest of the plan
* TODO: enable parallelization for REMOVE, REPLACE, UPDATE // does not prohibit this
* as well. This seems safe as long as there is no DistributeNode if (node->isModificationNode() &&
* and there is no further communication using Scatter/Gather. _parallelizeWrites &&
* But this needs more testing first
&&
(node->getType() != ExecutionNode::REMOVE && (node->getType() != ExecutionNode::REMOVE &&
node->getType() != ExecutionNode::REPLACE && node->getType() != ExecutionNode::REPLACE &&
node->getType() != ExecutionNode::UPDATE)) { node->getType() != ExecutionNode::UPDATE)) {
*/
// REMOVEs and REPLACEs are actually parallelizable, as they are completely independent
// from each other on different shards
_isParallelizable = false; _isParallelizable = false;
return true; // true to abort the whole walking process return true; // true to abort the whole walking process
} }
@ -590,7 +592,7 @@ bool GatherNode::isParallelizable() const {
return false; return false;
} }
ParallelizableFinder finder; ParallelizableFinder finder(*_vocbase);
for (ExecutionNode* e : _dependencies) { for (ExecutionNode* e : _dependencies) {
e->walk(finder); e->walk(finder);
if (!finder._isParallelizable) { if (!finder._isParallelizable) {

View File

@ -360,6 +360,9 @@ class GatherNode final : public ExecutionNode {
bool isParallelizable() const; bool isParallelizable() const;
private: private:
/// @brief the underlying database
TRI_vocbase_t* _vocbase;
/// @brief sort elements, variable, ascending flags and possible attribute /// @brief sort elements, variable, ascending flags and possible attribute
/// paths. /// paths.
SortElementVector _elements; SortElementVector _elements;

View File

@ -50,7 +50,8 @@ std::vector<OptimizerRule> OptimizerRulesFeature::_rules;
std::unordered_map<velocypack::StringRef, int> OptimizerRulesFeature::_ruleLookup; std::unordered_map<velocypack::StringRef, int> OptimizerRulesFeature::_ruleLookup;
OptimizerRulesFeature::OptimizerRulesFeature(arangodb::application_features::ApplicationServer& server) OptimizerRulesFeature::OptimizerRulesFeature(arangodb::application_features::ApplicationServer& server)
: application_features::ApplicationFeature(server, "OptimizerRules") { : application_features::ApplicationFeature(server, "OptimizerRules"),
_parallelizeGatherWrites(true) {
setOptional(false); setOptional(false);
startsAfter<V8FeaturePhase>(); startsAfter<V8FeaturePhase>();
@ -63,6 +64,11 @@ void OptimizerRulesFeature::collectOptions(std::shared_ptr<arangodb::options::Pr
new arangodb::options::VectorParameter<arangodb::options::StringParameter>(&_optimizerRules), new arangodb::options::VectorParameter<arangodb::options::StringParameter>(&_optimizerRules),
arangodb::options::makeFlags(arangodb::options::Flags::Hidden)) arangodb::options::makeFlags(arangodb::options::Flags::Hidden))
.setIntroducedIn(30600); .setIntroducedIn(30600);
options->addOption("--query.parallelize-gather-writes",
"enable write parallelization for gather nodes",
new arangodb::options::BooleanParameter(&_parallelizeGatherWrites))
.setIntroducedIn(30600);
} }
void OptimizerRulesFeature::prepare() { void OptimizerRulesFeature::prepare() {

View File

@ -44,6 +44,9 @@ class OptimizerRulesFeature final : public application_features::ApplicationFeat
std::vector<std::string> const& optimizerRules() const { return _optimizerRules; } std::vector<std::string> const& optimizerRules() const { return _optimizerRules; }
/// @brief whether or not certain write operations can be parallelized
bool parallelizeGatherWrites() const { return _parallelizeGatherWrites; }
/// @brief translate a list of rule ids into rule name /// @brief translate a list of rule ids into rule name
static std::vector<velocypack::StringRef> translateRules(std::vector<int> const&); static std::vector<velocypack::StringRef> translateRules(std::vector<int> const&);
@ -77,15 +80,20 @@ class OptimizerRulesFeature final : public application_features::ApplicationFeat
std::vector<std::string> _optimizerRules; std::vector<std::string> _optimizerRules;
/// @brief if set to true, a gather node will be parallelized even for
/// certain write operations. this is false by default, enabling it is
/// experimental
bool _parallelizeGatherWrites;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
bool _fixed = false;
#endif
/// @brief the rules database /// @brief the rules database
static std::vector<OptimizerRule> _rules; static std::vector<OptimizerRule> _rules;
/// @brief map to look up rule id by name /// @brief map to look up rule id by name
static std::unordered_map<velocypack::StringRef, int> _ruleLookup; static std::unordered_map<velocypack::StringRef, int> _ruleLookup;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
bool _fixed = false;
#endif
}; };
} // namespace aql } // namespace aql

View File

@ -276,8 +276,8 @@ Query* Query::clone(QueryPart part, bool withPlan) {
} }
bool Query::killed() const { bool Query::killed() const {
if (_queryOptions.timeout > std::numeric_limits<double>::epsilon()) { if(_queryOptions.maxRuntime > std::numeric_limits<double>::epsilon()) {
if (TRI_microtime() > (_startTime + _queryOptions.timeout)) { if(TRI_microtime() > (_startTime + _queryOptions.maxRuntime)) {
return true; return true;
} }
} }

View File

@ -38,7 +38,7 @@ QueryOptions::QueryOptions()
: memoryLimit(0), : memoryLimit(0),
maxNumberOfPlans(0), maxNumberOfPlans(0),
maxWarningCount(10), maxWarningCount(10),
timeout(0), maxRuntime(0),
satelliteSyncWait(60.0), satelliteSyncWait(60.0),
ttl(0), ttl(0),
profile(PROFILE_LEVEL_NONE), profile(PROFILE_LEVEL_NONE),
@ -104,9 +104,9 @@ void QueryOptions::fromVelocyPack(VPackSlice const& slice) {
maxWarningCount = value.getNumber<size_t>(); maxWarningCount = value.getNumber<size_t>();
} }
value = slice.get("timeout"); value = slice.get("maxRuntime");
if (value.isNumber()) { if (value.isNumber()) {
timeout = value.getNumber<double>(); maxRuntime = value.getNumber<double>();
} }
@ -221,7 +221,7 @@ void QueryOptions::toVelocyPack(VPackBuilder& builder, bool disableOptimizerRule
builder.add("memoryLimit", VPackValue(memoryLimit)); builder.add("memoryLimit", VPackValue(memoryLimit));
builder.add("maxNumberOfPlans", VPackValue(maxNumberOfPlans)); builder.add("maxNumberOfPlans", VPackValue(maxNumberOfPlans));
builder.add("maxWarningCount", VPackValue(maxWarningCount)); builder.add("maxWarningCount", VPackValue(maxWarningCount));
builder.add("timeout", VPackValue(timeout)); builder.add("maxRuntime", VPackValue(maxRuntime));
builder.add("satelliteSyncWait", VPackValue(satelliteSyncWait)); builder.add("satelliteSyncWait", VPackValue(satelliteSyncWait));
builder.add("ttl", VPackValue(ttl)); builder.add("ttl", VPackValue(ttl));
builder.add("profile", VPackValue(static_cast<uint32_t>(profile))); builder.add("profile", VPackValue(static_cast<uint32_t>(profile)));

View File

@ -61,7 +61,7 @@ struct QueryOptions {
size_t memoryLimit; size_t memoryLimit;
size_t maxNumberOfPlans; size_t maxNumberOfPlans;
size_t maxWarningCount; size_t maxWarningCount;
double timeout; // query has to execute within the given time or will be killed double maxRuntime; // query has to execute within the given time or will be killed
double satelliteSyncWait; double satelliteSyncWait;
double ttl; // time until query cursor expires - avoids coursors to double ttl; // time until query cursor expires - avoids coursors to
// stick around for ever if client does not collect the data // stick around for ever if client does not collect the data

View File

@ -35,6 +35,7 @@
using namespace arangodb; using namespace arangodb;
using namespace arangodb::aql; using namespace arangodb::aql;
using namespace arangodb::cluster;
QueryRegistry::~QueryRegistry() { QueryRegistry::~QueryRegistry() {
std::vector<std::pair<std::string, QueryId>> toDelete; std::vector<std::pair<std::string, QueryId>> toDelete;
@ -70,7 +71,8 @@ QueryRegistry::~QueryRegistry() {
/// @brief insert /// @brief insert
void QueryRegistry::insert(QueryId id, Query* query, double ttl, void QueryRegistry::insert(QueryId id, Query* query, double ttl,
bool isPrepared, bool keepLease) { bool isPrepared, bool keepLease,
std::unique_ptr<CallbackGuard>&& rGuard) {
TRI_ASSERT(query != nullptr); TRI_ASSERT(query != nullptr);
TRI_ASSERT(query->trx() != nullptr); TRI_ASSERT(query->trx() != nullptr);
LOG_TOPIC("77778", DEBUG, arangodb::Logger::AQL) LOG_TOPIC("77778", DEBUG, arangodb::Logger::AQL)
@ -83,7 +85,7 @@ void QueryRegistry::insert(QueryId id, Query* query, double ttl,
} }
// create the query info object outside of the lock // create the query info object outside of the lock
auto p = std::make_unique<QueryInfo>(id, query, ttl, isPrepared); auto p = std::make_unique<QueryInfo>(id, query, ttl, isPrepared, std::move(rGuard));
p->_isOpen = keepLease; p->_isOpen = keepLease;
// now insert into table of running queries // now insert into table of running queries
@ -226,6 +228,10 @@ void QueryRegistry::destroy(std::string const& vocbase, QueryId id,
TRI_ERROR_BAD_PARAMETER, "query with given vocbase and id not found"); TRI_ERROR_BAD_PARAMETER, "query with given vocbase and id not found");
} }
if (q->second->_rebootGuard != nullptr) {
q->second->_rebootGuard->callAndClear();
}
if (q->second->_isOpen && !ignoreOpened) { if (q->second->_isOpen && !ignoreOpened) {
// query in use by another thread/request // query in use by another thread/request
q->second->_query->kill(); q->second->_query->kill();
@ -408,13 +414,15 @@ void QueryRegistry::disallowInserts() {
// from here on, there shouldn't be any more inserts into the registry // from here on, there shouldn't be any more inserts into the registry
} }
QueryRegistry::QueryInfo::QueryInfo(QueryId id, Query* query, double ttl, bool isPrepared) QueryRegistry::QueryInfo::QueryInfo(QueryId id, Query* query, double ttl, bool isPrepared,
std::unique_ptr<arangodb::cluster::CallbackGuard>&& rebootGuard)
: _vocbase(&(query->vocbase())), : _vocbase(&(query->vocbase())),
_id(id), _id(id),
_query(query), _query(query),
_isOpen(false), _isOpen(false),
_isPrepared(isPrepared), _isPrepared(isPrepared),
_timeToLive(ttl), _timeToLive(ttl),
_expires(TRI_microtime() + ttl) {} _expires(TRI_microtime() + ttl),
_rebootGuard(std::move(rebootGuard)) {}
QueryRegistry::QueryInfo::~QueryInfo() { delete _query; } QueryRegistry::QueryInfo::~QueryInfo() { delete _query; }

View File

@ -27,11 +27,13 @@
#include "Aql/types.h" #include "Aql/types.h"
#include "Basics/Common.h" #include "Basics/Common.h"
#include "Basics/ReadWriteLock.h" #include "Basics/ReadWriteLock.h"
#include "Cluster/CallbackGuard.h"
#include "Cluster/ResultT.h" #include "Cluster/ResultT.h"
struct TRI_vocbase_t; struct TRI_vocbase_t;
namespace arangodb { namespace arangodb {
namespace aql { namespace aql {
class ExecutionEngine; class ExecutionEngine;
class Query; class Query;
@ -55,7 +57,9 @@ class QueryRegistry {
/// With keepLease == true the query will be kept open and it is guaranteed /// With keepLease == true the query will be kept open and it is guaranteed
/// that the caller can continue to use it exclusively. /// that the caller can continue to use it exclusively.
/// This is identical to an atomic sequence of insert();open(); /// This is identical to an atomic sequence of insert();open();
TEST_VIRTUAL void insert(QueryId id, Query* query, double ttl, bool isPrepare, bool keepLease); TEST_VIRTUAL void insert(
QueryId id, Query* query, double ttl, bool isPrepare, bool keepLease,
std::unique_ptr<arangodb::cluster::CallbackGuard>&& = nullptr);
/// @brief open, find a query in the registry, if none is found, a nullptr /// @brief open, find a query in the registry, if none is found, a nullptr
/// is returned, otherwise, ownership of the query is transferred to the /// is returned, otherwise, ownership of the query is transferred to the
@ -120,7 +124,8 @@ class QueryRegistry {
QueryInfo(QueryInfo const&) = delete; QueryInfo(QueryInfo const&) = delete;
QueryInfo& operator=(QueryInfo const&) = delete; QueryInfo& operator=(QueryInfo const&) = delete;
QueryInfo(QueryId id, Query* query, double ttl, bool isPrepared); QueryInfo(QueryId id, Query* query, double ttl, bool isPrepared,
std::unique_ptr<arangodb::cluster::CallbackGuard>&& rebootGuard = nullptr);
~QueryInfo(); ~QueryInfo();
TRI_vocbase_t* _vocbase; // the vocbase TRI_vocbase_t* _vocbase; // the vocbase
@ -131,6 +136,8 @@ class QueryRegistry {
bool _isPrepared; bool _isPrepared;
double _timeToLive; // in seconds double _timeToLive; // in seconds
double _expires; // UNIX UTC timestamp of expiration double _expires; // UNIX UTC timestamp of expiration
std::unique_ptr<arangodb::cluster::CallbackGuard> _rebootGuard;
// Callback to remove query, when rebootId changes
}; };
/// @brief _queries, the actual map of maps for the registry /// @brief _queries, the actual map of maps for the registry

View File

@ -148,6 +148,7 @@ endif ()
set(LIB_ARANGO_GRAPH_SOURCES set(LIB_ARANGO_GRAPH_SOURCES
Aql/PruneExpressionEvaluator.cpp Aql/PruneExpressionEvaluator.cpp
Cluster/CallbackGuard.cpp
Cluster/ClusterEdgeCursor.cpp Cluster/ClusterEdgeCursor.cpp
Cluster/ClusterTraverser.cpp Cluster/ClusterTraverser.cpp
Cluster/TraverserEngine.cpp Cluster/TraverserEngine.cpp

View File

@ -0,0 +1,58 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#include "CallbackGuard.h"
using namespace arangodb;
using namespace arangodb::cluster;
CallbackGuard::CallbackGuard() : _callback(nullptr) {}
CallbackGuard::CallbackGuard(std::function<void(void)> callback)
: _callback(std::move(callback)) {}
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard::CallbackGuard(CallbackGuard&& other)
: _callback(std::move(other._callback)) {
other._callback = nullptr;
}
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard& CallbackGuard::operator=(CallbackGuard&& other) {
call();
_callback = std::move(other._callback);
other._callback = nullptr;
return *this;
}
CallbackGuard::~CallbackGuard() { call(); }
void CallbackGuard::callAndClear() {
call();
_callback = nullptr;
}
void CallbackGuard::call() {
if (_callback) {
_callback();
}
}

View File

@ -0,0 +1,71 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CLUSTER_CALLBACKGUARD_H
#define ARANGOD_CLUSTER_CALLBACKGUARD_H
#include <functional>
#include "Cluster/ClusterTypes.h"
namespace arangodb {
namespace cluster {
/// @brief If constructed with a callback, the given callback will be called
/// exactly once: Either during destruction, or when the object is overwritten
/// (via operator=()), or when it's explicitly cleared. It's not copyable,
/// but movable.
class CallbackGuard {
public:
// Calls the callback given callback upon destruction.
// Allows only move semantics and no copy semantics.
CallbackGuard();
// IMPORTANT NOTE:
// The passed callback should not throw exceptions, they will not be caught
// here, but thrown by the destructor!
explicit CallbackGuard(std::function<void(void)> callback);
~CallbackGuard();
// Note that the move constructor of std::function is not noexcept until
// C++20. Thus we cannot mark the constructors here noexcept.
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard(CallbackGuard&& other);
// operator= additionally calls the _callback, and this can also throw.
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard& operator=(CallbackGuard&&);
CallbackGuard(CallbackGuard const&) = delete;
CallbackGuard& operator=(CallbackGuard const&) = delete;
/// @brief Call the contained callback, then delete it.
void callAndClear();
private:
void call();
std::function<void(void)> _callback;
};
}}
#endif

View File

@ -23,7 +23,7 @@
#ifndef ARANGOD_CLUSTER_REBOOTTRACKER_H #ifndef ARANGOD_CLUSTER_REBOOTTRACKER_H
#define ARANGOD_CLUSTER_REBOOTTRACKER_H #define ARANGOD_CLUSTER_REBOOTTRACKER_H
#include "Cluster/ClusterTypes.h" #include "Cluster/CallbackGuard.h"
#include "Basics/Mutex.h" #include "Basics/Mutex.h"
#include "Scheduler/Scheduler.h" #include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h" #include "Scheduler/SchedulerFeature.h"
@ -36,42 +36,6 @@
namespace arangodb { namespace arangodb {
namespace cluster { namespace cluster {
/// @brief If constructed with a callback, the given callback will be called
/// exactly once: Either during destruction, or when the object is overwritten
/// (via operator=()), or when it's explicitly cleared. It's not copyable,
/// but movable.
class CallbackGuard {
public:
// Calls the callback given callback upon destruction.
// Allows only move semantics and no copy semantics.
CallbackGuard();
// IMPORTANT NOTE:
// The passed callback should not throw exceptions, they will not be caught
// here, but thrown by the destructor!
explicit CallbackGuard(std::function<void(void)> callback);
~CallbackGuard();
// Note that the move constructor of std::function is not noexcept until
// C++20. Thus we cannot mark the constructors here noexcept.
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard(CallbackGuard&& other);
// operator= additionally calls the _callback, and this can also throw.
// NOLINTNEXTLINE(hicpp-noexcept-move,performance-noexcept-move-constructor)
CallbackGuard& operator=(CallbackGuard&&);
CallbackGuard(CallbackGuard const&) = delete;
CallbackGuard& operator=(CallbackGuard const&) = delete;
/// @brief Call the contained callback, then delete it.
void callAndClear();
private:
void call();
std::function<void(void)> _callback;
};
// Note: // Note:
// Instances of this class must be destructed during shutdown before the // Instances of this class must be destructed during shutdown before the
// scheduler is destroyed. // scheduler is destroyed.

View File

@ -40,7 +40,10 @@ using namespace arangodb::rest;
RestClusterHandler::RestClusterHandler(application_features::ApplicationServer& server, RestClusterHandler::RestClusterHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response) GeneralRequest* request, GeneralResponse* response)
: RestBaseHandler(server, request, response) {} : RestBaseHandler(server, request, response) {
std::vector<std::string> const& suffixes = _request->suffixes();
_allowDirectExecution = !suffixes.empty() && suffixes[0] == "endpoints";
}
RestStatus RestClusterHandler::execute() { RestStatus RestClusterHandler::execute() {
if (_request->requestType() != RequestType::GET) { if (_request->requestType() != RequestType::GET) {

View File

@ -40,9 +40,6 @@ class RestClusterHandler : public arangodb::RestBaseHandler {
/// _api/cluster/endpoints /// _api/cluster/endpoints
void handleCommandEndpoints(); void handleCommandEndpoints();
/// _api/cluster/serverInfo
void handleCommandServerInfo();
/// _api/cluster/agency-dump /// _api/cluster/agency-dump
void handleAgencyDump(); void handleAgencyDump();
}; };

View File

@ -414,92 +414,74 @@ static inline bool isStopping() {
return server.isStopping(); return server.isStopping();
} }
arangodb::Result SynchronizeShard::getReadLock(network::ConnectionPool* pool, arangodb::Result SynchronizeShard::getReadLock(
std::string const& endpoint, network::ConnectionPool* pool,
std::string const& database, std::string const& endpoint, std::string const& database,
std::string const& collection, std::string const& collection, std::string const& clientId,
std::string const& clientId, uint64_t rlid, uint64_t rlid, bool soft, double timeout) {
bool soft, double timeout) {
if (pool == nullptr) { // nullptr only happens during controlled shutdown // This function can be implemented in a more robust manner for server
// versions > 3.4. Starting with 3.4 the POST requests to the read lock API
// terminates the server side thread as soon as the lock request comes in.
// The POST request thus is answered immediately back to the caller.
// The servers (<=3.3) with lower versions hold the POST request for as long
// as the corresponding DELETE_REQ has not been successfully submitted.
using namespace std::chrono;
auto const start = steady_clock::now();
// nullptr only happens during controlled shutdown
if (pool == nullptr) {
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, return arangodb::Result(TRI_ERROR_SHUTTING_DOWN,
"startReadLockOnLeader: Shutting down"); "cancelReadLockOnLeader: Shutting down");
} }
VPackBuilder bodyBuilder; VPackBuilder body;
{ { VPackObjectBuilder o(&body);
VPackObjectBuilder o(&bodyBuilder); body.add(ID, VPackValue(std::to_string(rlid)));
bodyBuilder.add(ID, VPackValue(std::to_string(rlid))); body.add(COLLECTION, VPackValue(collection));
bodyBuilder.add(COLLECTION, VPackValue(collection)); body.add(TTL, VPackValue(timeout));
bodyBuilder.add(TTL, VPackValue(timeout)); body.add("serverId", VPackValue(arangodb::ServerState::instance()->getId()));
bodyBuilder.add(StaticStrings::ReplicationSoftLockOnly, VPackValue(soft)); body.add(StaticStrings::RebootId, VPackValue(ServerState::instance()->getRebootId()));
} body.add(StaticStrings::ReplicationSoftLockOnly, VPackValue(soft)); }
auto body = bodyBuilder.steal(); auto buf = body.steal();
auto const url = DB + database + REPL_HOLD_READ_LOCK;
// Try to POST the lock body. If POST fails, we should just exit and retry
// SynchroShard anew.
network::RequestOptions options; network::RequestOptions options;
options.database = database;
options.timeout = network::Timeout(timeout); options.timeout = network::Timeout(timeout);
options.skipScheduler = true; // hack to speed up future.get() auto res = network::sendRequest(
pool, endpoint, fuerte::RestVerb::Post,
url, *buf, options).get();
auto dummy = network::sendRequest(pool, endpoint, fuerte::RestVerb::Post, REPL_HOLD_READ_LOCK, if (!res.fail() && res.response->statusCode() == fuerte::StatusOK) {
*body, options); // Habemus clausum, we have a lock
// Intentionally do not look at the outcome, even in case of an error
// we must make sure that the read lock on the leader is not active!
// This is done automatically below.
size_t const maxTries = 9; // 511s max
double sleepTime = 1.0;
size_t count = 0;
while (++count < maxTries) { // wait for some time until read lock established:
if (isStopping()) {
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN);
}
// Now check that we hold the read lock:
auto res = network::sendRequest(pool, endpoint, fuerte::RestVerb::Put, REPL_HOLD_READ_LOCK,
*body, options)
.get();
if (res.ok() && res.response->statusCode() == fuerte::StatusOK) {
auto const slice = res.response->slice();
TRI_ASSERT(slice.isObject());
VPackSlice lockHeld = slice.get("lockHeld");
if (lockHeld.isBoolean() && lockHeld.getBool()) {
return arangodb::Result(); return arangodb::Result();
} }
LOG_TOPIC("b681f", DEBUG, Logger::MAINTENANCE)
<< "startReadLockOnLeader: Lock not yet acquired..."; LOG_TOPIC("cba32", DEBUG, Logger::MAINTENANCE)
} else { << "startReadLockOnLeader: couldn't POST lock body, "
if (res.ok() && res.response->statusCode() == fuerte::StatusNotFound) { << network::fuerteToArangoErrorMessage(res) << ", giving up.";
auto const slice = res.response->slice();
if (slice.isObject()) { // We MUSTN'T exit without trying to clean up a lock that was maybe acquired
VPackSlice s = slice.get(StaticStrings::ErrorNum); if (res.error == fuerte::Error::CouldNotConnect) {
if (s.isNumber()) { return arangodb::Result(
int errorNum = s.getNumber<int>(); TRI_ERROR_INTERNAL,
if (errorNum == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND) { "startReadLockOnLeader: couldn't POST lock body, giving up.");
// database is gone. we can now give up
break;
}
}
}
// fall-through to other cases intentional here
} }
std::string message = network::fuerteToArangoErrorMessage(res); double timeLeft =
LOG_TOPIC("a82bc", DEBUG, Logger::MAINTENANCE) double(timeout) - duration<double>(steady_clock::now()-start).count() ;
<< "startReadLockOnLeader: Do not see read lock yet:" << message; if (timeLeft < 60.0) {
timeLeft = 60.0;
} }
std::this_thread::sleep_for(duration<double>(sleepTime)); // Ambiguous POST, we'll try to DELETE a potentially acquired lock
sleepTime *= 2.0;
}
LOG_TOPIC("75e2b", ERR, Logger::MAINTENANCE) << "startReadLockOnLeader: giving up";
try { try {
auto r = network::sendRequest(pool, endpoint, fuerte::RestVerb::Delete, REPL_HOLD_READ_LOCK, auto r = network::sendRequest(pool, endpoint, fuerte::RestVerb::Delete, url,
*body, options) *buf, options)
.get(); .get();
if (r.fail() || r.response->statusCode() != fuerte::StatusOK) { if (r.fail() || r.response->statusCode() != fuerte::StatusOK) {
std::string addendum = network::fuerteToArangoErrorMessage(r); std::string addendum = network::fuerteToArangoErrorMessage(r);
@ -511,9 +493,8 @@ arangodb::Result SynchronizeShard::getReadLock(network::ConnectionPool* pool,
LOG_TOPIC("7fcc9", ERR, Logger::MAINTENANCE) LOG_TOPIC("7fcc9", ERR, Logger::MAINTENANCE)
<< "startReadLockOnLeader: exception in cancel: " << e.what(); << "startReadLockOnLeader: exception in cancel: " << e.what();
} }
return arangodb::Result(
return arangodb::Result(TRI_ERROR_CLUSTER_TIMEOUT, TRI_ERROR_CLUSTER_TIMEOUT, "startReadLockOnLeader: giving up");
"startReadLockOnLeader: giving up");
} }
arangodb::Result SynchronizeShard::startReadLockOnLeader( arangodb::Result SynchronizeShard::startReadLockOnLeader(
@ -542,6 +523,7 @@ static arangodb::ResultT<SyncerId> replicationSynchronize(
std::shared_ptr<arangodb::LogicalCollection> const& col, std::shared_ptr<arangodb::LogicalCollection> const& col,
VPackSlice const& config, std::string const& clientInfoString, VPackSlice const& config, std::string const& clientInfoString,
ApplierType applierType, std::shared_ptr<VPackBuilder> sy) { ApplierType applierType, std::shared_ptr<VPackBuilder> sy) {
auto& vocbase = col->vocbase(); auto& vocbase = col->vocbase();
auto database = vocbase.name(); auto database = vocbase.name();
@ -833,7 +815,7 @@ bool SynchronizeShard::first() {
if (asResult.ok()) { if (asResult.ok()) {
if (Logger::isEnabled(LogLevel::DEBUG, Logger::MAINTENANCE)) { if (Logger::isEnabled(LogLevel::DEBUG, Logger::MAINTENANCE)) {
std::stringstream msg; std::stringstream msg;
msg << "synchronizeOneShard: shortcut worked, done, "; msg << "SynchronizeShard: shortcut worked, done, ";
AppendShardInformationToMessage(database, shard, planId, startTime, msg); AppendShardInformationToMessage(database, shard, planId, startTime, msg);
LOG_TOPIC("f4a5b", DEBUG, Logger::MAINTENANCE) << msg.str(); LOG_TOPIC("f4a5b", DEBUG, Logger::MAINTENANCE) << msg.str();
} }
@ -861,7 +843,7 @@ bool SynchronizeShard::first() {
if (isStopping()) { if (isStopping()) {
std::string errorMessage( std::string errorMessage(
"synchronizeOneShard: synchronization failed for shard "); "SynchronizeShard: synchronization failed for shard ");
errorMessage += shard + ": shutdown in progress, giving up"; errorMessage += shard + ": shutdown in progress, giving up";
LOG_TOPIC("a0f9a", INFO, Logger::MAINTENANCE) << errorMessage; LOG_TOPIC("a0f9a", INFO, Logger::MAINTENANCE) << errorMessage;
_result.reset(TRI_ERROR_SHUTTING_DOWN, errorMessage); _result.reset(TRI_ERROR_SHUTTING_DOWN, errorMessage);
@ -933,9 +915,7 @@ bool SynchronizeShard::first() {
if (collections.length() == 0 || collections[0].get("name").copyString() != shard) { if (collections.length() == 0 || collections[0].get("name").copyString() != shard) {
std::stringstream error; std::stringstream error;
error error << "shard " << shard << " seems to be gone from leader, this "
<< "shard " << shard
<< " seems to be gone from leader, this "
"can happen if a collection was dropped during synchronization!"; "can happen if a collection was dropped during synchronization!";
LOG_TOPIC("664ae", WARN, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str(); LOG_TOPIC("664ae", WARN, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str();
_result.reset(TRI_ERROR_INTERNAL, error.str()); _result.reset(TRI_ERROR_INTERNAL, error.str());
@ -943,8 +923,8 @@ bool SynchronizeShard::first() {
} }
auto lastTick = auto lastTick =
arangodb::basics::VelocyPackHelper::getNumericValue<TRI_voc_tick_t>(sy, LAST_LOG_TICK, arangodb::basics::VelocyPackHelper::getNumericValue<TRI_voc_tick_t>(
0); sy, LAST_LOG_TICK, 0);
VPackBuilder builder; VPackBuilder builder;
ResultT<TRI_voc_tick_t> tickResult = ResultT<TRI_voc_tick_t> tickResult =
@ -1008,7 +988,7 @@ ResultT<TRI_voc_tick_t> SynchronizeShard::catchupWithReadLock(
if (isStopping()) { if (isStopping()) {
std::string errorMessage = std::string errorMessage =
"synchronizeOneShard: startReadLockOnLeader (soft): shutting down"; "SynchronizeShard: startReadLockOnLeader (soft): shutting down";
return ResultT<TRI_voc_tick_t>::error(TRI_ERROR_SHUTTING_DOWN, errorMessage); return ResultT<TRI_voc_tick_t>::error(TRI_ERROR_SHUTTING_DOWN, errorMessage);
} }
@ -1023,7 +1003,7 @@ ResultT<TRI_voc_tick_t> SynchronizeShard::catchupWithReadLock(
clientId, lockJobId, true, timeout); clientId, lockJobId, true, timeout);
if (!res.ok()) { if (!res.ok()) {
std::string errorMessage = std::string errorMessage =
"synchronizeOneShard: error in startReadLockOnLeader (soft):" + res.errorMessage(); "SynchronizeShard: error in startReadLockOnLeader (soft):" + res.errorMessage();
return ResultT<TRI_voc_tick_t>::error(TRI_ERROR_INTERNAL, errorMessage); return ResultT<TRI_voc_tick_t>::error(TRI_ERROR_INTERNAL, errorMessage);
} }
@ -1110,7 +1090,7 @@ Result SynchronizeShard::catchupWithExclusiveLock(
lockJobId, false); lockJobId, false);
if (!res.ok()) { if (!res.ok()) {
std::string errorMessage = std::string errorMessage =
"synchronizeOneShard: error in startReadLockOnLeader (hard):" + res.errorMessage(); "SynchronizeShard: error in startReadLockOnLeader (hard):" + res.errorMessage();
return {TRI_ERROR_INTERNAL, errorMessage}; return {TRI_ERROR_INTERNAL, errorMessage};
} }
auto readLockGuard = arangodb::scopeGuard([&, this]() { auto readLockGuard = arangodb::scopeGuard([&, this]() {
@ -1169,9 +1149,12 @@ Result SynchronizeShard::catchupWithExclusiveLock(
void SynchronizeShard::setState(ActionState state) { void SynchronizeShard::setState(ActionState state) {
if ((COMPLETE == state || FAILED == state) && _state != state) { if ((COMPLETE == state || FAILED == state) && _state != state) {
TRI_ASSERT(_description.has("shard")); auto const& shard = _description.get("shard");
_feature.incShardVersion(_description.get("shard")); if (COMPLETE == state) {
LOG_TOPIC("50827", INFO, Logger::MAINTENANCE)
<< "SynchronizeShard: synchronization completed for shard " << shard;
}
_feature.incShardVersion(shard);
} }
ActionBase::setState(state); ActionBase::setState(state);
} }

View File

@ -478,18 +478,19 @@ bool CommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) {
RequestLane lane = handler->getRequestLane(); RequestLane lane = handler->getRequestLane();
ContentType respType = handler->request()->contentTypeResponse(); ContentType respType = handler->request()->contentTypeResponse();
uint64_t mid = handler->messageId(); uint64_t mid = handler->messageId();
bool handlerAllowDirectExecution = handler->allowDirectExecution();
// queue the operation in the scheduler, and make it eligible for direct execution // queue the operation in the scheduler, and make it eligible for direct execution
// only if the current CommTask type allows it (HttpCommTask: yes, CommTask: no) // only if the current CommTask type allows it (HttpCommTask: yes, CommTask: no)
// and there is currently only a single client handled by the IoContext // and there is currently only a single client handled by the IoContext
auto cb = [self = shared_from_this(), handler = std::move(handler)]() { auto cb = [self = shared_from_this(), handler = std::move(handler)]() mutable {
RequestStatistics::SET_QUEUE_END(handler->statistics()); RequestStatistics::SET_QUEUE_END(handler->statistics());
handler->runHandler([self = std::move(self)](rest::RestHandler* handler) { handler->runHandler([self = std::move(self)](rest::RestHandler* handler) {
// Pass the response the io context // Pass the response the io context
self->sendResponse(handler->stealResponse(), handler->stealStatistics()); self->sendResponse(handler->stealResponse(), handler->stealStatistics());
}); });
}; };
bool ok = SchedulerFeature::SCHEDULER->queue(lane, std::move(cb), allowDirectHandling()); bool ok = SchedulerFeature::SCHEDULER->queue(lane, std::move(cb), allowDirectHandling() && handlerAllowDirectExecution);
if (!ok) { if (!ok) {
addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE, addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE,

View File

@ -108,6 +108,9 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
// what lane to use for this request // what lane to use for this request
virtual RequestLane lane() const = 0; virtual RequestLane lane() const = 0;
// return true if direct handler execution is allowed
bool allowDirectExecution() const { return _allowDirectExecution; }
RequestLane getRequestLane() { RequestLane getRequestLane() {
bool found; bool found;
_request->header(StaticStrings::XArangoFrontend, found); _request->header(StaticStrings::XArangoFrontend, found);
@ -218,6 +221,8 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
protected: protected:
std::atomic<bool> _canceled; std::atomic<bool> _canceled;
bool _allowDirectExecution = false;
}; };
} // namespace rest } // namespace rest

View File

@ -102,7 +102,8 @@ OperationResult GraphManager::createCollection(std::string const& name, TRI_col_
helper.openObject(); helper.openObject();
if (ServerState::instance()->isCoordinator()) { if (ServerState::instance()->isCoordinator()) {
Result res = ShardingInfo::validateShardsAndReplicationFactor(options, vocbase.server()); Result res =
ShardingInfo::validateShardsAndReplicationFactor(options, vocbase.server());
if (res.fail()) { if (res.fail()) {
return OperationResult(res); return OperationResult(res);
} }
@ -111,12 +112,14 @@ OperationResult GraphManager::createCollection(std::string const& name, TRI_col_
vocbase.server().getFeature<ClusterFeature>().forceOneShard() || vocbase.server().getFeature<ClusterFeature>().forceOneShard() ||
(vocbase.sharding() == "single" && (vocbase.sharding() == "single" &&
options.get(StaticStrings::DistributeShardsLike).isNone() && options.get(StaticStrings::DistributeShardsLike).isNone() &&
arangodb::basics::VelocyPackHelper::getNumericValue<uint64_t>(options, StaticStrings::NumberOfShards, 0) <= 1); arangodb::basics::VelocyPackHelper::getNumericValue<uint64_t>(options, StaticStrings::NumberOfShards,
0) <= 1);
if (forceOneShard) { if (forceOneShard) {
// force a single shard with shards distributed like "_graph" // force a single shard with shards distributed like "_graph"
helper.add(StaticStrings::NumberOfShards, VPackValue(1)); helper.add(StaticStrings::NumberOfShards, VPackValue(1));
helper.add(StaticStrings::DistributeShardsLike, VPackValue(vocbase.shardingPrototypeName())); helper.add(StaticStrings::DistributeShardsLike,
VPackValue(vocbase.shardingPrototypeName()));
} }
} }
@ -604,7 +607,8 @@ Result GraphManager::ensureCollections(Graph const* graph, bool waitForSync) con
OperationResult GraphManager::readGraphs(velocypack::Builder& builder, OperationResult GraphManager::readGraphs(velocypack::Builder& builder,
aql::QueryPart const queryPart) const { aql::QueryPart const queryPart) const {
std::string const queryStr{"FOR g IN _graphs RETURN g"}; std::string const queryStr{
"FOR g IN _graphs RETURN MERGE(g, {name: g._key})"};
return readGraphByQuery(builder, queryPart, queryStr); return readGraphByQuery(builder, queryPart, queryStr);
} }
@ -997,7 +1001,9 @@ ResultT<std::unique_ptr<Graph>> GraphManager::buildGraphFromInput(std::string co
TRI_ASSERT(input.isObject()); TRI_ASSERT(input.isObject());
if (ServerState::instance()->isCoordinator()) { if (ServerState::instance()->isCoordinator()) {
// validate numberOfShards and replicationFactor // validate numberOfShards and replicationFactor
Result res = ShardingInfo::validateShardsAndReplicationFactor(input.get("options"), _vocbase.server()); Result res =
ShardingInfo::validateShardsAndReplicationFactor(input.get("options"),
_vocbase.server());
if (res.fail()) { if (res.fail()) {
return res; return res;
} }

View File

@ -825,8 +825,7 @@ bool Index::canUseConditionPart(arangodb::aql::AstNode const* access,
other->type == arangodb::aql::NODE_TYPE_ATTRIBUTE_ACCESS)) { other->type == arangodb::aql::NODE_TYPE_ATTRIBUTE_ACCESS)) {
// value IN a.b OR value IN a.b[*] // value IN a.b OR value IN a.b[*]
arangodb::aql::Ast::getReferencedVariables(access, variables); arangodb::aql::Ast::getReferencedVariables(access, variables);
if (other->type == arangodb::aql::NODE_TYPE_ATTRIBUTE_ACCESS && if (variables.find(reference) != variables.end()) {
variables.find(reference) != variables.end()) {
variables.clear(); variables.clear();
arangodb::aql::Ast::getReferencedVariables(other, variables); arangodb::aql::Ast::getReferencedVariables(other, variables);
} }

View File

@ -151,25 +151,18 @@ void ConnectionPool::pruneConnections() {
} }
/// @brief cancel connections to this endpoint /// @brief cancel connections to this endpoint
void ConnectionPool::cancelConnections(std::string const& endpoint) { size_t ConnectionPool::cancelConnections(std::string const& endpoint) {
fuerte::ConnectionBuilder builder;
builder.endpoint(endpoint);
builder.protocolType(_config.protocol); // always overwrite protocol
std::string normalized = builder.normalizedEndpoint();
WRITE_LOCKER(guard, _lock); WRITE_LOCKER(guard, _lock);
auto const& it = _connections.find(normalized); auto const& it = _connections.find(endpoint);
if (it != _connections.end()) { if (it != _connections.end()) {
// { size_t n = it->second->list.size();
// ConnectionList& list = *(it->second); for (auto& c : it->second->list) {
// std::lock_guard<std::mutex> guard(list.mutex); c.fuerte->cancel();
// for (auto& c : list.connections) {
// c->shutdown();
// }
// }
_connections.erase(it);
} }
_connections.erase(it);
return n;
}
return 0;
} }
/// @brief return the number of open connections /// @brief return the number of open connections
@ -231,6 +224,7 @@ ConnectionPtr ConnectionPool::selectConnection(std::string const& endpoint,
fuerte::ConnectionBuilder builder; fuerte::ConnectionBuilder builder;
builder.endpoint(endpoint); // picks the socket type builder.endpoint(endpoint); // picks the socket type
builder.verifyHost(_config.verifyHosts);
builder.protocolType(_config.protocol); // always overwrite protocol builder.protocolType(_config.protocol); // always overwrite protocol
TRI_ASSERT(builder.socketType() != SocketType::Undefined); TRI_ASSERT(builder.socketType() != SocketType::Undefined);

View File

@ -89,7 +89,7 @@ class ConnectionPool final {
void pruneConnections(); void pruneConnections();
/// @brief cancel connections to this endpoint /// @brief cancel connections to this endpoint
void cancelConnections(std::string const& endpoint); size_t cancelConnections(std::string const& endpoint);
/// @brief return the number of open connections /// @brief return the number of open connections
size_t numOpenConnections() const; size_t numOpenConnections() const;

View File

@ -107,6 +107,10 @@ FutureRes sendRequest(ConnectionPool* pool, DestinationId dest, RestVerb type,
return futures::makeFuture(Response{std::move(dest), Error::Canceled, nullptr}); return futures::makeFuture(Response{std::move(dest), Error::Canceled, nullptr});
} }
LOG_TOPIC("2713a", DEBUG, Logger::COMMUNICATION)
<< "request to '" << dest
<< "' '" << fuerte::to_string(type) << " " << path << "'";
arangodb::network::EndpointSpec spec; arangodb::network::EndpointSpec spec;
int res = resolveDestination(*pool->config().clusterInfo, dest, spec); int res = resolveDestination(*pool->config().clusterInfo, dest, spec);
if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?! if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?!
@ -236,20 +240,7 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
switch (err) { switch (err) {
case fuerte::Error::NoError: { case fuerte::Error::NoError: {
TRI_ASSERT(res); TRI_ASSERT(res);
if (res->statusCode() == fuerte::StatusOK || res->statusCode() == fuerte::StatusCreated || if (checkResponse(err, req, res)) {
res->statusCode() == fuerte::StatusAccepted ||
res->statusCode() == fuerte::StatusNoContent) {
callResponse(Error::NoError, std::move(res));
break;
} else if (res->statusCode() == fuerte::StatusNotFound && _options.retryNotFound &&
TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND ==
network::errorCodeFromBody(res->slice())) {
LOG_TOPIC("5a8e9", DEBUG, Logger::COMMUNICATION)
<< "retrying request";
} else { // a "proper error" which has to be returned to the client
LOG_TOPIC("5a8d9", DEBUG, Logger::COMMUNICATION)
<< "canceling request";
callResponse(err, std::move(res));
break; break;
} }
[[fallthrough]]; [[fallthrough]];
@ -257,7 +248,8 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
case fuerte::Error::CouldNotConnect: case fuerte::Error::CouldNotConnect:
case fuerte::Error::ConnectionClosed: case fuerte::Error::ConnectionClosed:
case fuerte::Error::Timeout: { case fuerte::Error::Timeout:
case fuerte::Error::Canceled: {
// Note that this case includes the refusal of a leader to accept // Note that this case includes the refusal of a leader to accept
// the operation, in which case we have to flush ClusterInfo: // the operation, in which case we have to flush ClusterInfo:
@ -283,8 +275,40 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
} }
} }
bool checkResponse(fuerte::Error err,
std::unique_ptr<fuerte::Request>& req,
std::unique_ptr<fuerte::Response>& res) {
switch (res->statusCode()) {
case fuerte::StatusOK:
case fuerte::StatusCreated:
case fuerte::StatusAccepted:
case fuerte::StatusNoContent:
callResponse(Error::NoError, std::move(res));
return true; // done
case fuerte::StatusUnavailable:
return false; // goto retry
case fuerte::StatusNotFound:
if (_options.retryNotFound &&
TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND == network::errorCodeFromBody(res->slice())) {
return false; // goto retry
}
[[fallthrough]];
default: // a "proper error" which has to be returned to the client
callResponse(err, std::move(res));
return true; // done
}
}
/// @broef schedule calling the response promise /// @broef schedule calling the response promise
void callResponse(Error err, std::unique_ptr<fuerte::Response> res) { void callResponse(Error err, std::unique_ptr<fuerte::Response> res) {
LOG_TOPIC_IF("2713d", DEBUG, Logger::COMMUNICATION, err != fuerte::Error::NoError)
<< "error on request to '" << _destination
<< "' '" << fuerte::to_string(_type) << " " << _path
<< "' '" << fuerte::to_string(err) << "'";
Scheduler* sch = SchedulerFeature::SCHEDULER; Scheduler* sch = SchedulerFeature::SCHEDULER;
if (_options.skipScheduler || sch == nullptr) { if (_options.skipScheduler || sch == nullptr) {
_promise.setValue(Response{std::move(_destination), err, std::move(res)}); _promise.setValue(Response{std::move(_destination), err, std::move(res)});
@ -303,6 +327,11 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
} }
void retryLater(std::chrono::steady_clock::duration tryAgainAfter) { void retryLater(std::chrono::steady_clock::duration tryAgainAfter) {
LOG_TOPIC("2713e", DEBUG, Logger::COMMUNICATION)
<< "retry request to '" << _destination
<< "' '" << fuerte::to_string(_type) << " " << _path << "'";
auto* sch = SchedulerFeature::SCHEDULER; auto* sch = SchedulerFeature::SCHEDULER;
if (ADB_UNLIKELY(sch == nullptr)) { if (ADB_UNLIKELY(sch == nullptr)) {
_promise.setValue(Response{std::move(_destination), fuerte::Error::Canceled, nullptr}); _promise.setValue(Response{std::move(_destination), fuerte::Error::Canceled, nullptr});
@ -338,6 +367,10 @@ FutureRes sendRequestRetry(ConnectionPool* pool, DestinationId destination,
return futures::makeFuture(Response{destination, Error::Canceled, nullptr}); return futures::makeFuture(Response{destination, Error::Canceled, nullptr});
} }
LOG_TOPIC("2713b", DEBUG, Logger::COMMUNICATION)
<< "request to '" << destination
<< "' '" << fuerte::to_string(type) << " " << path << "'";
// auto req = prepareRequest(type, path, std::move(payload), timeout, headers); // auto req = prepareRequest(type, path, std::move(payload), timeout, headers);
auto rs = std::make_shared<RequestsState>(pool, std::move(destination), auto rs = std::make_shared<RequestsState>(pool, std::move(destination),
type, std::move(path), type, std::move(path),

View File

@ -97,27 +97,6 @@ void NetworkFeature::collectOptions(std::shared_ptr<options::ProgramOptions> opt
options->addOption("--network.verify-hosts", "verify hosts when using TLS", options->addOption("--network.verify-hosts", "verify hosts when using TLS",
new BooleanParameter(&_verifyHosts)) new BooleanParameter(&_verifyHosts))
.setIntroducedIn(30600); .setIntroducedIn(30600);
_gcfunc = [this](bool canceled) {
if (canceled) {
return;
}
_pool->pruneConnections();
if (server().hasFeature<ClusterFeature>()) {
auto& ci = server().getFeature<ClusterFeature>().clusterInfo();
auto failed = ci.getFailedServers();
for (ServerID const& f : failed) {
_pool->cancelConnections(f);
}
}
if (!server().isStopping() && !canceled) {
std::chrono::seconds off(12);
::queueGarbageCollection(_workItemMutex, _workItem, _gcfunc, off);
}
};
} }
void NetworkFeature::validateOptions(std::shared_ptr<options::ProgramOptions>) { void NetworkFeature::validateOptions(std::shared_ptr<options::ProgramOptions>) {
@ -131,17 +110,45 @@ void NetworkFeature::validateOptions(std::shared_ptr<options::ProgramOptions>) {
} }
void NetworkFeature::prepare() { void NetworkFeature::prepare() {
ClusterInfo* ci = nullptr;
if (server().hasFeature<ClusterFeature>() && server().isEnabled<ClusterFeature>()) {
ci = &server().getFeature<ClusterFeature>().clusterInfo();
}
network::ConnectionPool::Config config; network::ConnectionPool::Config config;
config.numIOThreads = static_cast<unsigned>(_numIOThreads); config.numIOThreads = static_cast<unsigned>(_numIOThreads);
config.maxOpenConnections = _maxOpenConnections; config.maxOpenConnections = _maxOpenConnections;
config.idleConnectionMilli = _idleTtlMilli; config.idleConnectionMilli = _idleTtlMilli;
config.verifyHosts = _verifyHosts; config.verifyHosts = _verifyHosts;
if (server().hasFeature<ClusterFeature>() && server().isEnabled<ClusterFeature>()) { config.clusterInfo = ci;
config.clusterInfo = &server().getFeature<ClusterFeature>().clusterInfo();
}
_pool = std::make_unique<network::ConnectionPool>(config); _pool = std::make_unique<network::ConnectionPool>(config);
_poolPtr.store(_pool.get(), std::memory_order_release); _poolPtr.store(_pool.get(), std::memory_order_release);
_gcfunc = [this, ci](bool canceled) {
if (canceled) {
return;
}
_pool->pruneConnections();
if (ci != nullptr) {
auto failed = ci->getFailedServers();
for (ServerID const& srvId : failed) {
std::string endpoint = ci->getServerEndpoint(srvId);
size_t n = _pool->cancelConnections(endpoint);
LOG_TOPIC_IF("15d94", INFO, Logger::COMMUNICATION, n > 0)
<< "canceling " << n << " connections to failed server '"
<< srvId << "' on endpoint '" << endpoint << "'";
}
}
if (!server().isStopping() && !canceled) {
std::chrono::seconds off(12);
::queueGarbageCollection(_workItemMutex, _workItem, _gcfunc, off);
}
};
} }
void NetworkFeature::start() { void NetworkFeature::start() {

View File

@ -38,7 +38,9 @@ using namespace arangodb::rest;
RestAdminLogHandler::RestAdminLogHandler(application_features::ApplicationServer& server, RestAdminLogHandler::RestAdminLogHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response) GeneralRequest* request, GeneralResponse* response)
: RestBaseHandler(server, request, response) {} : RestBaseHandler(server, request, response) {
_allowDirectExecution = true;
}
RestStatus RestAdminLogHandler::execute() { RestStatus RestAdminLogHandler::execute() {
auto& server = application_features::ApplicationServer::server(); auto& server = application_features::ApplicationServer::server();

View File

@ -39,7 +39,9 @@ using namespace arangodb::rest;
RestAdminServerHandler::RestAdminServerHandler(application_features::ApplicationServer& server, RestAdminServerHandler::RestAdminServerHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralRequest* request,
GeneralResponse* response) GeneralResponse* response)
: RestBaseHandler(server, request, response) {} : RestBaseHandler(server, request, response) {
_allowDirectExecution = true;
}
RestStatus RestAdminServerHandler::execute() { RestStatus RestAdminServerHandler::execute() {
std::vector<std::string> const& suffixes = _request->suffixes(); std::vector<std::string> const& suffixes = _request->suffixes();

View File

@ -45,7 +45,12 @@ using namespace arangodb::rest;
RestDocumentHandler::RestDocumentHandler(application_features::ApplicationServer& server, RestDocumentHandler::RestDocumentHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response) GeneralRequest* request, GeneralResponse* response)
: RestVocbaseBaseHandler(server, request, response) {} : RestVocbaseBaseHandler(server, request, response) {
if(request->requestType() == rest::RequestType::POST
&& request->contentLength() <= 1024) {
_allowDirectExecution = true;
}
}
RestDocumentHandler::~RestDocumentHandler() = default; RestDocumentHandler::~RestDocumentHandler() = default;

View File

@ -35,7 +35,9 @@ using namespace arangodb::rest;
RestEngineHandler::RestEngineHandler(application_features::ApplicationServer& server, RestEngineHandler::RestEngineHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response) GeneralRequest* request, GeneralResponse* response)
: RestBaseHandler(server, request, response) {} : RestBaseHandler(server, request, response) {
_allowDirectExecution = true;
}
RestStatus RestEngineHandler::execute() { RestStatus RestEngineHandler::execute() {
// extract the sub-request type // extract the sub-request type

View File

@ -43,6 +43,7 @@ RestJobHandler::RestJobHandler(application_features::ApplicationServer& server,
AsyncJobManager* jobManager) AsyncJobManager* jobManager)
: RestBaseHandler(server, request, response), _jobManager(jobManager) { : RestBaseHandler(server, request, response), _jobManager(jobManager) {
TRI_ASSERT(jobManager != nullptr); TRI_ASSERT(jobManager != nullptr);
_allowDirectExecution = true;
} }
RestStatus RestJobHandler::execute() { RestStatus RestJobHandler::execute() {

View File

@ -31,7 +31,9 @@ using velocypack::StringRef;
RestPleaseUpgradeHandler::RestPleaseUpgradeHandler(application_features::ApplicationServer& server, RestPleaseUpgradeHandler::RestPleaseUpgradeHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralRequest* request,
GeneralResponse* response) GeneralResponse* response)
: RestHandler(server, request, response) {} : RestHandler(server, request, response) {
_allowDirectExecution = true;
}
RestStatus RestPleaseUpgradeHandler::execute() { RestStatus RestPleaseUpgradeHandler::execute() {
resetResponse(rest::ResponseCode::OK); resetResponse(rest::ResponseCode::OK);

View File

@ -36,6 +36,7 @@
#include "Cluster/ClusterHelpers.h" #include "Cluster/ClusterHelpers.h"
#include "Cluster/ClusterMethods.h" #include "Cluster/ClusterMethods.h"
#include "Cluster/FollowerInfo.h" #include "Cluster/FollowerInfo.h"
#include "Cluster/RebootTracker.h"
#include "Cluster/ResignShardLeadership.h" #include "Cluster/ResignShardLeadership.h"
#include "GeneralServer/AuthenticationFeature.h" #include "GeneralServer/AuthenticationFeature.h"
#include "Indexes/Index.h" #include "Indexes/Index.h"
@ -73,6 +74,7 @@
using namespace arangodb; using namespace arangodb;
using namespace arangodb::basics; using namespace arangodb::basics;
using namespace arangodb::rest; using namespace arangodb::rest;
using namespace arangodb::cluster;
namespace { namespace {
std::string const dataString("data"); std::string const dataString("data");
@ -2529,6 +2531,9 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
return; return;
} }
RebootId rebootId(0);
std::string serverId;
if (!body.isObject()) { if (!body.isObject()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"body needs to be an object with attributes 'collection', " "body needs to be an object with attributes 'collection', "
@ -2536,9 +2541,26 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
return; return;
} }
VPackSlice const collection = body.get("collection"); VPackSlice collection = body.get("collection");
VPackSlice const ttlSlice = body.get("ttl"); VPackSlice ttlSlice = body.get("ttl");
VPackSlice const idSlice = body.get("id"); VPackSlice idSlice = body.get("id");
if (body.hasKey(StaticStrings::RebootId)) {
if (body.get(StaticStrings::RebootId).isInteger()) {
if (body.hasKey("serverId") && body.get("serverId").isString()) {
rebootId = RebootId(body.get(StaticStrings::RebootId).getNumber<uint64_t>());
serverId = body.get("serverId").copyString();
} else {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"'rebootId' must be accompanied by string attribute 'serverId'");
return;
}
} else {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"'rebootId' must be an integer attribute");
return;
}
}
if (!collection.isString() || !ttlSlice.isNumber() || !idSlice.isString()) { if (!collection.isString() || !ttlSlice.isNumber() || !idSlice.isString()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
@ -2584,7 +2606,7 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
LOG_TOPIC("4fac2", DEBUG, Logger::REPLICATION) LOG_TOPIC("4fac2", DEBUG, Logger::REPLICATION)
<< "Attempt to create a Lock: " << id << " for shard: " << _vocbase.name() << "Attempt to create a Lock: " << id << " for shard: " << _vocbase.name()
<< "/" << col->name() << " of type: " << (doSoftLock ? "soft" : "hard"); << "/" << col->name() << " of type: " << (doSoftLock ? "soft" : "hard");
Result res = createBlockingTransaction(id, *col, ttl, lockType); Result res = createBlockingTransaction(id, *col, ttl, lockType, rebootId, serverId);
if (!res.ok()) { if (!res.ok()) {
generateError(res); generateError(res);
return; return;
@ -2918,7 +2940,9 @@ ReplicationApplier* RestReplicationHandler::getApplier(bool& global) {
Result RestReplicationHandler::createBlockingTransaction(aql::QueryId id, Result RestReplicationHandler::createBlockingTransaction(aql::QueryId id,
LogicalCollection& col, double ttl, LogicalCollection& col, double ttl,
AccessMode::Type access) const { AccessMode::Type access,
RebootId const& rebootId,
std::string const& serverId) {
// This is a constant JSON structure for Queries. // This is a constant JSON structure for Queries.
// we actually do not need a plan, as we only want the query registry to have // we actually do not need a plan, as we only want the query registry to have
// a hold of our transaction // a hold of our transaction
@ -2940,6 +2964,7 @@ Result RestReplicationHandler::createBlockingTransaction(aql::QueryId id,
{ {
auto ctx = transaction::StandaloneContext::Create(_vocbase); auto ctx = transaction::StandaloneContext::Create(_vocbase);
auto trx = std::make_shared<SingleCollectionTransaction>(ctx, col, access); auto trx = std::make_shared<SingleCollectionTransaction>(ctx, col, access);
query->setTransactionContext(ctx); query->setTransactionContext(ctx);
// Inject will take over responsiblilty of transaction, even on error case. // Inject will take over responsiblilty of transaction, even on error case.
query->injectTransaction(std::move(trx)); query->injectTransaction(std::move(trx));
@ -2950,8 +2975,29 @@ Result RestReplicationHandler::createBlockingTransaction(aql::QueryId id,
TRI_ASSERT(isLockHeld(id).is(TRI_ERROR_HTTP_NOT_FOUND)); TRI_ASSERT(isLockHeld(id).is(TRI_ERROR_HTTP_NOT_FOUND));
ClusterInfo& ci = server().getFeature<ClusterFeature>().clusterInfo();
std::string vn = _vocbase.name();
try { try {
queryRegistry->insert(id, query.get(), ttl, true, true); std::function<void(void)> f =
[=]() {
try {
// Code does not matter, read only access, so we can roll back.
QueryRegistryFeature::registry()->destroy(vn, id, TRI_ERROR_QUERY_KILLED, false);
} catch (...) {
// All errors that show up here can only be
// triggered if the query is destroyed in between.
}
};
std::string comment = std::string("SynchronizeShard from ") + serverId +
" for " + col.name() + " access mode " + AccessMode::typeString(access);
auto rGuard = std::make_unique<CallbackGuard>(
ci.rebootTracker().callMeOnChange(
RebootTracker::PeerState(serverId, rebootId), f, comment));
queryRegistry->insert(id, query.get(), ttl, true, true, std::move(rGuard));
} catch (...) { } catch (...) {
// For compatibility we only return this error // For compatibility we only return this error
return {TRI_ERROR_TRANSACTION_INTERNAL, "cannot begin read transaction"}; return {TRI_ERROR_TRANSACTION_INTERNAL, "cannot begin read transaction"};

View File

@ -29,6 +29,7 @@
#include "Basics/Common.h" #include "Basics/Common.h"
#include "Basics/Result.h" #include "Basics/Result.h"
#include "Cluster/ResultT.h" #include "Cluster/ResultT.h"
#include "Cluster/ClusterTypes.h"
#include "Replication/Syncer.h" #include "Replication/Syncer.h"
#include "Replication/common-defines.h" #include "Replication/common-defines.h"
#include "RestHandler/RestVocbaseBaseHandler.h" #include "RestHandler/RestVocbaseBaseHandler.h"
@ -476,7 +477,9 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
/// the given time to live. /// the given time to live.
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
Result createBlockingTransaction(aql::QueryId id, LogicalCollection& col, Result createBlockingTransaction(aql::QueryId id, LogicalCollection& col,
double ttl, AccessMode::Type access) const; double ttl, AccessMode::Type access,
RebootId const& rebootId,
std::string const& serverId);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief Test if we already have the read-lock /// @brief Test if we already have the read-lock

View File

@ -39,7 +39,9 @@ using namespace arangodb::rest;
RestShutdownHandler::RestShutdownHandler(application_features::ApplicationServer& server, RestShutdownHandler::RestShutdownHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response) GeneralRequest* request, GeneralResponse* response)
: RestBaseHandler(server, request, response) {} : RestBaseHandler(server, request, response) {
_allowDirectExecution = true;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock JSF_get_api_initiate /// @brief was docuBlock JSF_get_api_initiate

View File

@ -49,7 +49,9 @@ using namespace arangodb::rest;
RestStatusHandler::RestStatusHandler(application_features::ApplicationServer& server, RestStatusHandler::RestStatusHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response) GeneralRequest* request, GeneralResponse* response)
: RestBaseHandler(server, request, response) {} : RestBaseHandler(server, request, response) {
_allowDirectExecution = true;
}
RestStatus RestStatusHandler::execute() { RestStatus RestStatusHandler::execute() {
auto& server = application_features::ApplicationServer::server(); auto& server = application_features::ApplicationServer::server();
@ -77,6 +79,7 @@ RestStatus RestStatusHandler::execute() {
auto& serverFeature = server.getFeature<ServerFeature>(); auto& serverFeature = server.getFeature<ServerFeature>();
result.add("mode", VPackValue(serverFeature.operationModeString())); // to be deprecated - 3.3 compat result.add("mode", VPackValue(serverFeature.operationModeString())); // to be deprecated - 3.3 compat
result.add("operationMode", VPackValue(serverFeature.operationModeString())); result.add("operationMode", VPackValue(serverFeature.operationModeString()));
result.add("foxxApi", VPackValue(!security.isFoxxApiDisabled()));
std::string host = ServerState::instance()->getHost(); std::string host = ServerState::instance()->getHost();

View File

@ -41,7 +41,9 @@ using namespace arangodb::rest;
RestVersionHandler::RestVersionHandler(application_features::ApplicationServer& server, RestVersionHandler::RestVersionHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response) GeneralRequest* request, GeneralResponse* response)
: RestBaseHandler(server, request, response) {} : RestBaseHandler(server, request, response) {
_allowDirectExecution = true;
}
RestStatus RestVersionHandler::execute() { RestStatus RestVersionHandler::execute() {
VPackBuilder result; VPackBuilder result;

View File

@ -96,7 +96,7 @@ void ServerFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addSection("vst", "Configure the VelocyStream protocol"); options->addSection("vst", "Configure the VelocyStream protocol");
options->addObsoleteOption("--vst.maxsize", "maximal size (in bytes) " options->addObsoleteOption("--vst.maxsize", "maximal size (in bytes) "
"for a VelocyPack chunk", false); "for a VelocyPack chunk", true);
#if _WIN32 #if _WIN32
options->addOption("--console.code-page", options->addOption("--console.code-page",

View File

@ -126,6 +126,7 @@ void StatisticsWorker::collectGarbage() {
// but only one task at a time. this should spread the load more evenly // but only one task at a time. this should spread the load more evenly
auto time = TRI_microtime(); auto time = TRI_microtime();
try {
if (_gcTask == GC_STATS) { if (_gcTask == GC_STATS) {
collectGarbage(statisticsCollection, time - 3600.0); // 1 hour collectGarbage(statisticsCollection, time - 3600.0); // 1 hour
_gcTask = GC_STATS_RAW; _gcTask = GC_STATS_RAW;
@ -136,6 +137,13 @@ void StatisticsWorker::collectGarbage() {
collectGarbage(statistics15Collection, time - 30.0 * 86400.0); // 30 days collectGarbage(statistics15Collection, time - 30.0 * 86400.0); // 30 days
_gcTask = GC_STATS; _gcTask = GC_STATS;
} }
} catch (basics::Exception const& ex) {
if (ex.code() != TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
// if the underlying collection does not exist, it does not matter
// that the garbage collection query failed
throw;
}
}
} }
void StatisticsWorker::collectGarbage(std::string const& name, double start) const { void StatisticsWorker::collectGarbage(std::string const& name, double start) const {
@ -1304,6 +1312,7 @@ void StatisticsWorker::run() {
uint64_t seconds = 0; uint64_t seconds = 0;
while (!isStopping() && StatisticsFeature::enabled()) { while (!isStopping() && StatisticsFeature::enabled()) {
seconds++;
try { try {
if (seconds % STATISTICS_INTERVAL == 0) { if (seconds % STATISTICS_INTERVAL == 0) {
// new stats are produced every 10 seconds // new stats are produced every 10 seconds
@ -1326,8 +1335,6 @@ void StatisticsWorker::run() {
<< "caught unknown exception in StatisticsWorker"; << "caught unknown exception in StatisticsWorker";
} }
seconds++;
CONDITION_LOCKER(guard, _cv); CONDITION_LOCKER(guard, _cv);
guard.wait(1000 * 1000); guard.wait(1000 * 1000);
} }

View File

@ -816,13 +816,32 @@ arangodb::Result restoreView(arangodb::httpclient::SimpleHttpClient& httpClient,
arangodb::Result triggerFoxxHeal(arangodb::httpclient::SimpleHttpClient& httpClient) { arangodb::Result triggerFoxxHeal(arangodb::httpclient::SimpleHttpClient& httpClient) {
using arangodb::Logger; using arangodb::Logger;
using arangodb::httpclient::SimpleHttpResult; using arangodb::httpclient::SimpleHttpResult;
const std::string FoxxHealUrl = "/_api/foxx/_local/heal";
std::string body = ""; std::string body = "";
// check if the foxx api is available.
const std::string statusUrl = "/_admin/status";
std::unique_ptr<SimpleHttpResult> response( std::unique_ptr<SimpleHttpResult> response(
httpClient.request(arangodb::rest::RequestType::POST, FoxxHealUrl, httpClient.request(arangodb::rest::RequestType::POST, statusUrl,
body.c_str(), body.length())); body.c_str(), body.length()));
auto res = ::checkHttpResponse(httpClient, response, "check status", body);
if (res.ok() && response) {
try {
if(!response->getBodyVelocyPack()->slice().get("foxxApi").getBool()) {
LOG_TOPIC("9e9b9", INFO, Logger::RESTORE)
<< "skipping foxx self-healing because Foxx API is disabled";
return { };
}
} catch (...) {
//API Not available because of older version or whatever
}
}
const std::string FoxxHealUrl = "/_api/foxx/_local/heal";
response.reset(
httpClient.request(arangodb::rest::RequestType::POST, FoxxHealUrl,
body.c_str(), body.length())
);
return ::checkHttpResponse(httpClient, response, "trigger self heal", body); return ::checkHttpResponse(httpClient, response, "trigger self heal", body);
} }

View File

@ -37,6 +37,7 @@
#include "Aql/ExecutionNode.h" #include "Aql/ExecutionNode.h"
#include "Aql/Query.h" #include "Aql/Query.h"
#include "Aql/QueryRegistry.h" #include "Aql/QueryRegistry.h"
#include "Cluster/RebootTracker.h"
#include "Transaction/Methods.h" #include "Transaction/Methods.h"
using namespace arangodb; using namespace arangodb;
@ -273,7 +274,8 @@ TEST(EngineInfoContainerTest,
// Mock the Registry // Mock the Registry
fakeit::When(Method(mockRegistry, insert)) fakeit::When(Method(mockRegistry, insert))
.Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease) { .Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease,
std::unique_ptr<arangodb::cluster::CallbackGuard>&) {
ASSERT_NE(id, 0); ASSERT_NE(id, 0);
ASSERT_NE(query, nullptr); ASSERT_NE(query, nullptr);
ASSERT_TRUE(isPrepared); ASSERT_TRUE(isPrepared);
@ -497,7 +499,8 @@ TEST(EngineInfoContainerTest, snippets_are_a_stack_insert_node_always_into_top_s
// handled first. With same fakeit magic we could make this ordering // handled first. With same fakeit magic we could make this ordering
// independent which is is fine as well for the production code. // independent which is is fine as well for the production code.
fakeit::When(Method(mockRegistry, insert)) fakeit::When(Method(mockRegistry, insert))
.Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease) { .Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease,
std::unique_ptr<arangodb::cluster::CallbackGuard>&) {
ASSERT_NE(id, 0); ASSERT_NE(id, 0);
ASSERT_NE(query, nullptr); ASSERT_NE(query, nullptr);
ASSERT_TRUE(isPrepared); ASSERT_TRUE(isPrepared);
@ -506,7 +509,8 @@ TEST(EngineInfoContainerTest, snippets_are_a_stack_insert_node_always_into_top_s
ASSERT_EQ(query, &queryClone); ASSERT_EQ(query, &queryClone);
secondId = id; secondId = id;
}) })
.Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease) { .Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease,
std::unique_ptr<arangodb::cluster::CallbackGuard>&) {
ASSERT_NE(id, 0); ASSERT_NE(id, 0);
ASSERT_NE(query, nullptr); ASSERT_NE(query, nullptr);
ASSERT_EQ(timeout, 600.0); ASSERT_EQ(timeout, 600.0);
@ -683,7 +687,8 @@ TEST(EngineInfoContainerTest, error_cases_cloning_of_a_query_fails_throws_an_err
// Mock the Registry // Mock the Registry
fakeit::When(Method(mockRegistry, insert)) fakeit::When(Method(mockRegistry, insert))
.Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease) { .Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease,
std::unique_ptr<arangodb::cluster::CallbackGuard>&) {
ASSERT_NE(id, 0); ASSERT_NE(id, 0);
ASSERT_NE(query, nullptr); ASSERT_NE(query, nullptr);
ASSERT_EQ(timeout, 600.0); ASSERT_EQ(timeout, 600.0);
@ -847,7 +852,8 @@ TEST(EngineInfoContainerTest, error_cases_cloning_of_a_query_fails_returns_a_nul
// Mock the Registry // Mock the Registry
fakeit::When(Method(mockRegistry, insert)) fakeit::When(Method(mockRegistry, insert))
.Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease) { .Do([&](QueryId id, Query* query, double timeout, bool isPrepared, bool keepLease,
std::unique_ptr<arangodb::cluster::CallbackGuard>&) {
ASSERT_NE(id, 0); ASSERT_NE(id, 0);
ASSERT_NE(query, nullptr); ASSERT_NE(query, nullptr);
ASSERT_EQ(timeout, 600.0); ASSERT_EQ(timeout, 600.0);

View File

@ -26,6 +26,7 @@
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
const chai = require('chai'); const chai = require('chai');
const Joi = require('joi');
const expect = chai.expect; const expect = chai.expect;
chai.Assertion.addProperty('does', function () { chai.Assertion.addProperty('does', function () {
return this; return this;
@ -87,6 +88,28 @@ describe('_api/gharial', () => {
} }
}; };
const validateGraphFormat = (graph) => {
const edgeDefinition = Joi.object({
collection: Joi.string().required(),
from: Joi.array().items(Joi.string()).required(),
to: Joi.array().items(Joi.string()).required()
});
const schema = Joi.object({
"_key": Joi.string().required(),
"_rev": Joi.string().required(),
"_id": Joi.string().required(),
name: Joi.string().required(),
numberOfShards: Joi.number().integer().min(1).required(),
replicationFactor: Joi.number().integer().min(1).required(),
minReplicationFactor: Joi.number().integer().min(1).required(),
isSmart: Joi.boolean().required(),
orphanCollections: Joi.array().items(Joi.string()).required(),
edgeDefinitions: Joi.array().items(edgeDefinition).required()
});
const res = schema.validate(graph);
expect(res.error).to.be.null;
};
beforeEach(cleanup); beforeEach(cleanup);
afterEach(cleanup); afterEach(cleanup);
@ -106,7 +129,10 @@ describe('_api/gharial', () => {
expect(db._collection(eColName)).to.be.null; expect(db._collection(eColName)).to.be.null;
expect(db._collection(vColName)).to.be.null; expect(db._collection(vColName)).to.be.null;
let req = arango.POST(url, graphDef); let req = arango.POST(url, graphDef);
expect(req).to.have.keys("error", "code", "graph");
expect(req.code).to.equal(202); expect(req.code).to.equal(202);
expect(req.error).to.be.false;
validateGraphFormat(req.graph);
// This is all async give it some time // This is all async give it some time
do { do {
@ -116,6 +142,11 @@ describe('_api/gharial', () => {
expect(db._collection(eColName)).to.not.be.null; expect(db._collection(eColName)).to.not.be.null;
expect(db._collection(vColName)).to.not.be.null; expect(db._collection(vColName)).to.not.be.null;
expect(req).to.have.keys("error", "code", "graph");
expect(req.code).to.equal(200);
expect(req.error).to.be.false;
validateGraphFormat(req.graph);
}); });
it('should create a graph with orphans', () => { it('should create a graph with orphans', () => {
@ -138,7 +169,10 @@ describe('_api/gharial', () => {
expect(db._collection(oColName)).to.be.null; expect(db._collection(oColName)).to.be.null;
expect(db._collection(oColName2)).to.be.null; expect(db._collection(oColName2)).to.be.null;
let req = arango.POST(url, graphDef); let req = arango.POST(url, graphDef);
expect(req).to.have.keys("error", "code", "graph");
expect(req.code).to.equal(202); expect(req.code).to.equal(202);
expect(req.error).to.be.false;
validateGraphFormat(req.graph);
// This is all async give it some time // This is all async give it some time
do { do {
@ -150,6 +184,72 @@ describe('_api/gharial', () => {
expect(db._collection(vColName)).to.not.be.null; expect(db._collection(vColName)).to.not.be.null;
expect(db._collection(oColName)).to.not.be.null; expect(db._collection(oColName)).to.not.be.null;
expect(db._collection(oColName2)).to.not.be.null; expect(db._collection(oColName2)).to.not.be.null;
expect(req).to.have.keys("error", "code", "graph");
expect(req.code).to.equal(200);
expect(req.error).to.be.false;
validateGraphFormat(req.graph);
});
});
describe('graph modification test suite', function () {
const vertexUrl = `${url}/${graphName}/vertex`;
const edgeUrl = `${url}/${graphName}/edge`;
beforeEach(() => {
const graphDef = {
"name": graphName,
"edgeDefinitions": [{
"collection": eColName,
"from": [vColName],
"to": [vColName]
}
],
"isSmart": false
};
expect(db._collection(eColName)).to.be.null;
expect(db._collection(vColName)).to.be.null;
let req = arango.POST(url, graphDef);
// Just make sure the graph exists
do {
wait(0.1);
req = arango.GET(url + "/" + graphName);
} while (req.code !== 200);
});
it('should list all graphs in correct format', () => {
const res = arango.GET(url);
expect(res).to.have.keys("error", "code", "graphs");
expect(res.code).to.equal(200);
expect(res.error).to.be.false;
res.graphs.map(validateGraphFormat);
});
it('should be able to add an orphan', () => {
const res = arango.POST(vertexUrl, {collection: oColName});
expect(res).to.have.keys("error", "code", "graph");
expect(res.code).to.equal(202);
expect(res.error).to.be.false;
validateGraphFormat(res.graph);
expect(db._collection(oColName)).to.not.be.null;
});
it('should be able to modify edge definition', () => {
const res = arango.PUT(`${edgeUrl}/${eColName}`, {
"collection": eColName,
"from": [vColName, oColName],
"to": [vColName]
});
expect(res).to.have.keys("error", "code", "graph");
expect(res.code).to.equal(202);
expect(res.error).to.be.false;
validateGraphFormat(res.graph);
expect(db._collection(oColName)).to.not.be.null;
}); });
}); });

View File

@ -100,6 +100,7 @@ function testSuite() {
assertTrue(result.hasOwnProperty("serverInfo")); assertTrue(result.hasOwnProperty("serverInfo"));
assertTrue(result.hasOwnProperty("server")); assertTrue(result.hasOwnProperty("server"));
assertTrue(result.hasOwnProperty("pid")); assertTrue(result.hasOwnProperty("pid"));
assertTrue(result.hasOwnProperty("foxxApi"));
}, },
testCanAccessAdminStatusRo : function() { testCanAccessAdminStatusRo : function() {
@ -111,6 +112,7 @@ function testSuite() {
assertFalse(result.hasOwnProperty("serverInfo")); assertFalse(result.hasOwnProperty("serverInfo"));
assertFalse(result.hasOwnProperty("server")); assertFalse(result.hasOwnProperty("server"));
assertFalse(result.hasOwnProperty("pid")); assertFalse(result.hasOwnProperty("pid"));
assertFalse(result.hasOwnProperty("foxxApi"));
}, },
testCanAccessAdminLogRw : function() { testCanAccessAdminLogRw : function() {

View File

@ -30,7 +30,7 @@ function sendQuery (count, async) {
} }
} }
if (async === true) { if (async === true) {
internal.wait(1); internal.wait(1, false);
} }
} }
@ -104,7 +104,27 @@ describe('AQL query analyzer', function () {
if (isServer && internal.debugCanUseFailAt()) { if (isServer && internal.debugCanUseFailAt()) {
internal.debugClearFailAt(); internal.debugClearFailAt();
} }
// kill all async tasks that will execute the query that we
// are looking for
tasks.get().forEach(function(task) {
if (task.command.match(/SLEEP\(@value\)/)) {
try {
tasks.unregister(task.id);
} catch (err) {
// not an error if this fails, as the task may have completed
// between `tasks.get()` and `tasks.unregister()`
}
}
});
// wait a bit for tasks to be finished
internal.wait(0.2, false);
// now kill all queries we are looking for that may still be
// executed
while (true) {
const list = testee.current().filter(filterQueries); const list = testee.current().filter(filterQueries);
if (list.length === 0) {
break;
}
for (let item of list) { for (let item of list) {
try { try {
testee.kill(item.id); testee.kill(item.id);
@ -112,6 +132,8 @@ describe('AQL query analyzer', function () {
// noop // noop
} }
} }
internal.wait(0.1, false);
}
}); });
if (isServer && internal.debugCanUseFailAt()) { if (isServer && internal.debugCanUseFailAt()) {
@ -182,7 +204,7 @@ describe('AQL query analyzer', function () {
if (testee.current().filter(filterQueries).length === 0) { if (testee.current().filter(filterQueries).length === 0) {
break; break;
} }
internal.wait(1); internal.wait(1, false);
} }
expect(testee.current().filter(filterQueries).length).to.equal(0); expect(testee.current().filter(filterQueries).length).to.equal(0);

View File

@ -454,12 +454,12 @@ function TaskSuite () {
assertEqual(1, t.length); assertEqual(1, t.length);
var tries = 0; var tries = 0;
while (tries++ < 10) { while (tries++ < 15) {
if (db[cn].count() === 1) { if (db[cn].count() === 1) {
return; // alright return; // alright
} }
internal.wait(1); internal.wait(2);
} }
fail(); fail();
@ -490,12 +490,12 @@ function TaskSuite () {
assertEqual("_system", task.database); assertEqual("_system", task.database);
var tries = 0; var tries = 0;
while (tries++ < 10) { while (tries++ < 15) {
if (db[cn].count() === 1) { if (db[cn].count() === 1) {
return; // alright return; // alright
} }
internal.wait(1); internal.wait(2);
} }
fail(); fail();
@ -525,13 +525,15 @@ function TaskSuite () {
assertEqual(5, task.offset); assertEqual(5, task.offset);
assertEqual("_system", task.database); assertEqual("_system", task.database);
internal.wait(5);
var tries = 0; var tries = 0;
while (tries++ < 20) { while (tries++ < 15) {
if (db[cn].count() === 1) { if (db[cn].count() === 1) {
return; // alright return; // alright
} }
internal.wait(1); internal.wait(2);
} }
// task hasn't been executed // task hasn't been executed
@ -553,13 +555,13 @@ function TaskSuite () {
var task = tasks.register({ var task = tasks.register({
name: "UnitTests1", name: "UnitTests1",
command: command, command: command,
offset: 10, offset: 15,
params: 23 params: 23
}); });
assertEqual("UnitTests1", task.name); assertEqual("UnitTests1", task.name);
assertEqual("timed", task.type); assertEqual("timed", task.type);
assertEqual(10, task.offset); assertEqual(15, task.offset);
assertEqual("_system", task.database); assertEqual("_system", task.database);
tasks.unregister(task); tasks.unregister(task);
@ -599,13 +601,13 @@ function TaskSuite () {
assertEqual("_system", task.database); assertEqual("_system", task.database);
var tries = 0; var tries = 0;
while (tries++ < 20) { while (tries++ < 15) {
if (db[cn].count() > 0) { if (db[cn].count() > 0) {
assertTrue(db[cn].byExample({ value: 17 }).toArray().length > 0); assertTrue(db[cn].byExample({ value: 17 }).toArray().length > 0);
return; // alright return; // alright
} }
internal.wait(1); internal.wait(2);
} }
fail(); fail();
@ -639,13 +641,13 @@ function TaskSuite () {
assertEqual("_system", task.database); assertEqual("_system", task.database);
var tries = 0; var tries = 0;
while (tries++ < 20) { while (tries++ < 15) {
if (db[cn].count() > 0) { if (db[cn].count() > 0) {
assertTrue(db[cn].byExample({ value: 42 }).toArray().length > 0); assertTrue(db[cn].byExample({ value: 42 }).toArray().length > 0);
return; // alright return; // alright
} }
internal.wait(1); internal.wait(2);
} }
fail(); fail();

View File

@ -55,7 +55,7 @@ function optimizerIndexesTestSuite () {
c = db._create("UnitTestsCollection"); c = db._create("UnitTestsCollection");
let docs = []; let docs = [];
for (var i = 0; i < 2000; ++i) { for (let i = 0; i < 2000; ++i) {
docs.push({ _key: "test" + i, value: i }); docs.push({ _key: "test" + i, value: i });
} }
c.insert(docs); c.insert(docs);
@ -80,6 +80,40 @@ function optimizerIndexesTestSuite () {
} }
}, },
testIndexUsedForExpansion1 : function () {
let query = "LET test = NOOPT([{ value: 1 }, { value : 2 }]) FOR doc IN " + c.name() + " FILTER doc.value IN test[*].value SORT doc.value RETURN doc.value";
let plan = AQL_EXPLAIN(query).plan;
let nodeTypes = plan.nodes.map(function(node) {
return node.type;
});
assertEqual("SingletonNode", nodeTypes[0], query);
assertNotEqual(-1, nodeTypes.indexOf("IndexNode"), query);
let results = AQL_EXECUTE(query);
assertEqual([ 1, 2 ], results.json, query);
assertEqual(0, results.stats.scannedFull);
assertTrue(results.stats.scannedIndex > 0);
},
testIndexUsedForExpansion2 : function () {
let query = "LET test = NOOPT([1, 2]) FOR doc IN " + c.name() + " FILTER doc.value IN test[*] SORT doc.value RETURN doc.value";
let plan = AQL_EXPLAIN(query).plan;
let nodeTypes = plan.nodes.map(function(node) {
return node.type;
});
assertEqual("SingletonNode", nodeTypes[0], query);
assertNotEqual(-1, nodeTypes.indexOf("IndexNode"), query);
let results = AQL_EXECUTE(query);
assertEqual([ 1, 2 ], results.json, query);
assertEqual(0, results.stats.scannedFull);
assertTrue(results.stats.scannedIndex > 0);
},
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief test same results for const access queries /// @brief test same results for const access queries
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -31,10 +31,6 @@
let db = require("@arangodb").db; let db = require("@arangodb").db;
let jsunity = require("jsunity"); let jsunity = require("jsunity");
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
function optimizerRuleTestSuite () { function optimizerRuleTestSuite () {
const ruleName = "parallelize-gather"; const ruleName = "parallelize-gather";
const cn = "UnitTestsAqlOptimizerRule"; const cn = "UnitTestsAqlOptimizerRule";
@ -94,7 +90,10 @@ function optimizerRuleTestSuite () {
"FOR doc IN " + cn + " SORT doc.value1 RETURN doc", "FOR doc IN " + cn + " SORT doc.value1 RETURN doc",
"FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000 RETURN doc", "FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000 RETURN doc",
"FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000, 1000 RETURN doc", "FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000, 1000 RETURN doc",
/* TODO ];
if (require("internal").options()["query.parallelize-gather-writes"]) {
queries.concat([
"FOR doc IN " + cn + " REMOVE doc IN " + cn, "FOR doc IN " + cn + " REMOVE doc IN " + cn,
"FOR doc IN " + cn + " REMOVE doc._key IN " + cn, "FOR doc IN " + cn + " REMOVE doc._key IN " + cn,
"FOR doc IN " + cn + " REPLACE doc WITH {} IN " + cn, "FOR doc IN " + cn + " REPLACE doc WITH {} IN " + cn,
@ -105,9 +104,40 @@ function optimizerRuleTestSuite () {
"FOR doc IN " + cn + " UPDATE doc WITH {a: 1} IN " + cn, "FOR doc IN " + cn + " UPDATE doc WITH {a: 1} IN " + cn,
"FOR doc IN " + cn + " UPDATE doc._key WITH {} IN " + cn, "FOR doc IN " + cn + " UPDATE doc._key WITH {} IN " + cn,
"FOR doc IN " + cn + " UPDATE doc._key WITH {a:1} IN " + cn, "FOR doc IN " + cn + " UPDATE doc._key WITH {a:1} IN " + cn,
*/ ]);
}
queries.forEach(function(query) {
let result = AQL_EXPLAIN(query,);
assertNotEqual(-1, result.plan.rules.indexOf(ruleName), query);
});
},
testRuleHasEffectWrites : function () {
let queries = [
"FOR doc IN " + cn + " RETURN doc",
"FOR doc IN " + cn + " LIMIT 1000 RETURN doc",
"FOR doc IN " + cn + " LIMIT 1000, 1000 RETURN doc",
"FOR doc IN " + cn + " SORT doc.value1 RETURN doc",
"FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000 RETURN doc",
"FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000, 1000 RETURN doc",
]; ];
if (require("internal").options()["query.parallelize-gather-writes"]) {
queries.concat([
"FOR doc IN " + cn + " REMOVE doc IN " + cn,
"FOR doc IN " + cn + " REMOVE doc._key IN " + cn,
"FOR doc IN " + cn + " REPLACE doc WITH {} IN " + cn,
"FOR doc IN " + cn + " REPLACE doc WITH {a: 1} IN " + cn,
"FOR doc IN " + cn + " REPLACE doc._key WITH {} IN " + cn,
"FOR doc IN " + cn + " REPLACE doc._key WITH {a:1} IN " + cn,
"FOR doc IN " + cn + " UPDATE doc WITH {} IN " + cn,
"FOR doc IN " + cn + " UPDATE doc WITH {a: 1} IN " + cn,
"FOR doc IN " + cn + " UPDATE doc._key WITH {} IN " + cn,
"FOR doc IN " + cn + " UPDATE doc._key WITH {a:1} IN " + cn,
]);
}
queries.forEach(function(query) { queries.forEach(function(query) {
let result = AQL_EXPLAIN(query,); let result = AQL_EXPLAIN(query,);
assertNotEqual(-1, result.plan.rules.indexOf(ruleName), query); assertNotEqual(-1, result.plan.rules.indexOf(ruleName), query);

View File

@ -41,12 +41,12 @@ function aqlOptionsTestSuite () {
return { return {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief test timeout option /// @brief test maxRuntime option
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
testTimeOut : function () { testMaxRuntime : function () {
try { try {
internal.db._query("LET x = SLEEP(10) RETURN 1", {} /*bind*/, { timeout : 1} /*options*/); internal.db._query("LET x = SLEEP(10) RETURN 1", {} /*bind*/, { maxRuntime : 1} /*options*/);
fail(); fail();
} catch (e) { } catch (e) {
assertEqual(e.errorNum, errors.ERROR_QUERY_KILLED.code); assertEqual(e.errorNum, errors.ERROR_QUERY_KILLED.code);

View File

@ -1,5 +1,5 @@
/*jshint globalstrict:false, strict:false, maxlen: 500 */ /*jshint globalstrict:false, strict:false, maxlen: 500 */
/*global assertEqual, assertFalse, assertTrue, assertNotEqual, AQL_EXPLAIN, AQL_EXECUTE */ /*global assertEqual, assertFalse, assertTrue, assertNotEqual, AQL_EXPLAIN, AQL_EXECUTE, print */
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief tests that tests if collecting results from multiple /// @brief tests that tests if collecting results from multiple
@ -124,7 +124,6 @@ function gatherBlocksTestSuite() {
{ test : "q3s3if", query: query3, bindvars: bindvars3, collection: colName1, gathernodes: 1, sortmode: 'minelement' , count: 4000 , sorted: true }, { test : "q3s3if", query: query3, bindvars: bindvars3, collection: colName1, gathernodes: 1, sortmode: 'minelement' , count: 4000 , sorted: true },
{ test : "q3s9if", query: query3, bindvars: bindvars3, collection: colName2, gathernodes: 1, sortmode: 'heap' , count: documents , sorted: true }, { test : "q3s9if", query: query3, bindvars: bindvars3, collection: colName2, gathernodes: 1, sortmode: 'heap' , count: documents , sorted: true },
]; ];
let loop = 10; let loop = 10;
tests.forEach(t => { tests.forEach(t => {
var assertMessage; var assertMessage;
@ -156,10 +155,19 @@ function gatherBlocksTestSuite() {
var time = 0; var time = 0;
for(var i=0; i < loop; i++){ for(var i=0; i < loop; i++){
let start = Date.now(); let start = Date.now();
try {
rv = db._query(t.query , bindvars); rv = db._query(t.query , bindvars);
rv = rv.toArray().map(doc => { return doc.value; } ); rv = rv.toArray().map(doc => { return doc.value; } );
time += (Date.now() - start); time += (Date.now() - start);
}
catch (ex) {
print("Failed in " + t.query);
print(bindvars);
db._explain(t.query , bindvars);
print(ex);
throw ex;
}
// check number of returned documents // check number of returned documents
assertEqual(rv.length, t.count); assertEqual(rv.length, t.count);

View File

@ -1,130 +0,0 @@
/*jshint globalstrict:false, strict:false */
/*global assertFalse */
////////////////////////////////////////////////////////////////////////////////
/// @brief test the performance of removal with a skip-list index
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2013 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
/// @author Copyright 2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
var jsunity = require("jsunity");
var internal = require("internal");
var db = require("internal").db;
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite: Creation
////////////////////////////////////////////////////////////////////////////////
function SkipListPerfSuite() {
'use strict';
var cn = "UnitTestsCollectionSkiplistPerf";
var collection = null;
return {
////////////////////////////////////////////////////////////////////////////////
/// @brief set up
////////////////////////////////////////////////////////////////////////////////
setUp : function () {
internal.db._drop(cn);
collection = internal.db._create(cn, { waitForSync : false });
},
////////////////////////////////////////////////////////////////////////////////
/// @brief tear down
////////////////////////////////////////////////////////////////////////////////
tearDown : function () {
// try...catch is necessary as some tests delete the collection itself!
try {
collection.unload();
collection.drop();
}
catch (err) {
}
collection = null;
internal.wait(0.0);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test: performance of deletion with skip-list index
////////////////////////////////////////////////////////////////////////////////
testDeletionPerformance : function () {
var time = require("internal").time;
collection.ensureSkiplist("value");
var N=100000;
var p=14777; // must be coprime to N
if (db._engine().name === "rocksdb") {
N = 1000;
p = 333;
}
for (i = 0;i < N;i++) {
collection.save({value:i});
}
var l = collection.toArray();
var t = time();
var j = 0;
var x;
for (var i = 0;i < l.length;i++) {
x = l[j];
j = (j+p) % l.length;
collection.remove(x._key);
}
var t1 = time()-t;
internal.db._drop(cn);
collection = internal.db._create(cn);
collection.ensureSkiplist("value");
for (i = 0;i < N;i++) {
collection.save({value: i % 10});
}
l = collection.toArray();
t = time();
j = 0;
for (i = 0;i < l.length;i++) {
x = l[j];
j = (j+p) % l.length;
collection.remove(x._key);
}
var t2 = time()-t;
assertFalse(t2 > 5*t1,"Removal with skip-list index is slow");
}
};
}
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the test suites
////////////////////////////////////////////////////////////////////////////////
jsunity.run(SkipListPerfSuite);
return jsunity.done();

View File

@ -52,13 +52,33 @@ describe ArangoDB do
return false return false
end end
def wait_for_query (query, type, maxWait)
if type == "slow"
doc = ArangoDB.log_get("#{@prefix}-slow", @slow)
elsif type == "current"
doc = ArangoDB.log_get("#{@prefix}-current", @current)
end
doc.code.should eq(200)
while true
found = contains_query doc.body, query
if found
return found
end
maxWait -= 1
if maxWait == 0
return false
end
sleep 1
end
end
it "should activate tracking" do it "should activate tracking" do
doc = ArangoDB.log_put("#{@prefix}-properties", @properties, :body => JSON.dump({enable: true})) doc = ArangoDB.log_put("#{@prefix}-properties", @properties, :body => JSON.dump({enable: true}))
doc.code.should eq(200) doc.code.should eq(200)
end end
describe "tracking" do describe "tracking" do
before do before do
ArangoDB.log_put("#{@prefix}-properties", @properties, :body => JSON.dump({enable: true, slowQueryThreshold: 20, trackSlowQueries: true})) ArangoDB.log_put("#{@prefix}-properties", @properties, :body => JSON.dump({enable: true, slowQueryThreshold: 20, trackSlowQueries: true}))
end end
@ -69,22 +89,29 @@ describe ArangoDB do
after(:each) do after(:each) do
# Let the queries finish # Let the queries finish
doc = ArangoDB.log_get("#{@prefix}-current", @current)
res = JSON.parse doc.body
count = 0 count = 0
while res.length > 0 && count < 10 do while true
sleep 1
doc = ArangoDB.log_get("#{@prefix}-current", @current) doc = ArangoDB.log_get("#{@prefix}-current", @current)
res = JSON.parse doc.body res = JSON.parse doc.body
if res.length == 0
break
end
res.each do |q|
if q["query"].match(/SLEEP/)
doc = ArangoDB.log_delete(@prefix, "#{@api}/" + q["id"])
end
end
count += 1 count += 1
if count == 10
break
end
sleep 1
end end
end end
it "should track running queries" do it "should track running queries" do
send_queries send_queries
doc = ArangoDB.log_get("#{@prefix}-current", @current) found = wait_for_query @query, "current", 10
doc.code.should eq(200)
found = contains_query doc.body, @query
found.should_not eq(false) found.should_not eq(false)
found.should have_key("id") found.should have_key("id")
found["id"].should match(/^\d+$/) found["id"].should match(/^\d+$/)
@ -101,9 +128,7 @@ describe ArangoDB do
it "should track running queries, with bind parameters" do it "should track running queries, with bind parameters" do
send_queries_with_bind send_queries_with_bind
doc = ArangoDB.log_get("#{@prefix}-current-bind", @current) found = wait_for_query @queryWithBind, "current", 10
doc.code.should eq(200)
found = contains_query doc.body, @queryWithBind
found.should_not eq(false) found.should_not eq(false)
found.should have_key("id") found.should have_key("id")
found["id"].should match(/^\d+$/) found["id"].should match(/^\d+$/)
@ -120,21 +145,15 @@ describe ArangoDB do
it "should track slow queries by threshold" do it "should track slow queries by threshold" do
send_fast_queries 1, "false" send_fast_queries 1, "false"
doc = ArangoDB.log_get("#{@prefix}-slow", @slow) found = wait_for_query @fastQuery, "slow", 1
doc.code.should eq(200)
found = contains_query doc.body, @fastQuery
found.should eq(false) found.should eq(false)
ArangoDB.log_put("#{@prefix}-properties", @properties, :body => JSON.dump({slowQueryThreshold: 0.1})) ArangoDB.log_put("#{@prefix}-properties", @properties, :body => JSON.dump({slowQueryThreshold: 0.1}))
send_fast_queries 1, "false" send_fast_queries 1, "false"
doc = ArangoDB.log_get("#{@prefix}-current", @current) found = wait_for_query @fastQuery, "current", 1
doc.code.should eq(200)
found = contains_query doc.body, @fastQuery
found.should eq(false) found.should eq(false)
doc = ArangoDB.log_get("#{@prefix}-slow", @slow) found = wait_for_query @fastQuery, "slow", 1
doc.code.should eq(200)
found = contains_query doc.body, @fastQuery
found.should_not eq(false) found.should_not eq(false)
found.should have_key("query") found.should have_key("query")
found["query"].should eq(@fastQuery) found["query"].should eq(@fastQuery)
@ -157,17 +176,13 @@ describe ArangoDB do
end end
it "should not track slow queries if turned off" do it "should not track slow queries if turned off" do
doc = ArangoDB.log_get("#{@prefix}-slow", @slow) found = wait_for_query @fastQuery, "slow", 1
doc.code.should eq(200)
found = contains_query doc.body, @fastQuery
found.should eq(false) found.should eq(false)
ArangoDB.log_put("#{@prefix}-properties", @properties, :body => JSON.dump({slowQueryThreshold: 0.1, trackSlowQueries: false})) ArangoDB.log_put("#{@prefix}-properties", @properties, :body => JSON.dump({slowQueryThreshold: 0.1, trackSlowQueries: false}))
send_fast_queries 1, "false" send_fast_queries 1, "false"
doc = ArangoDB.log_get("#{@prefix}-slow", @slow) found = wait_for_query @fastQuery, "slow", 1
doc.code.should eq(200)
found = contains_query doc.body, @fastQuery
found.should eq(false) found.should eq(false)
end end