1
0
Fork 0

address review concerns (#8544)

This commit is contained in:
Jan 2019-03-23 22:10:19 +01:00 committed by GitHub
parent 329acea21c
commit 0a1ab07c64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 76 additions and 24 deletions

View File

@ -146,7 +146,7 @@ ExecutionEngineResult EngineInfoContainerCoordinator::buildEngines(
// destroy all query snippets in case of error
auto guard = scopeGuard([&dbname, &registry, &coordinatorQueryIds]() {
for (auto const& it : coordinatorQueryIds) {
registry->destroy(dbname, it, TRI_ERROR_INTERNAL);
registry->destroy(dbname, it, TRI_ERROR_INTERNAL, false);
}
});

View File

@ -22,6 +22,7 @@
////////////////////////////////////////////////////////////////////////////////
#include "QueryRegistry.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/Query.h"
@ -60,7 +61,7 @@ QueryRegistry::~QueryRegistry() {
// holding the lock
for (auto& p : toDelete) {
try { // just in case
destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED);
destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED, false);
} catch (...) {
}
}
@ -82,6 +83,9 @@ void QueryRegistry::insert(QueryId id, Query* query, double ttl,
// now insert into table of running queries
{
WRITE_LOCKER(writeLocker, _lock);
if (_disallowInserts) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
auto result = _queries[vocbase.name()].emplace(id, std::move(p));
if (!result.second) {
@ -119,7 +123,13 @@ Query* QueryRegistry::open(TRI_vocbase_t* vocbase, QueryId id) {
qi->_isOpen = true;
if (!qi->_isPrepared) {
qi->_query->prepare(this);
try {
qi->_query->prepare(this);
} catch (...) {
qi->_isOpen = false;
qi->_expires = 0.0;
throw;
}
qi->_isPrepared = true;
}
@ -152,7 +162,13 @@ void QueryRegistry::close(TRI_vocbase_t* vocbase, QueryId id, double ttl) {
if (!qi->_isPrepared) {
qi->_isPrepared = true;
qi->_query->prepare(this);
try {
qi->_query->prepare(this);
} catch (...) {
qi->_isOpen = false;
qi->_expires = 0.0;
throw;
}
}
qi->_isOpen = false;
@ -161,7 +177,7 @@ void QueryRegistry::close(TRI_vocbase_t* vocbase, QueryId id, double ttl) {
}
/// @brief destroy
void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCode) {
void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCode, bool ignoreOpened) {
std::unique_ptr<QueryInfo> queryInfo;
{
@ -181,9 +197,10 @@ void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCod
TRI_ERROR_BAD_PARAMETER, "query with given vocbase and id not found");
}
if (q->second->_isOpen) {
if (q->second->_isOpen && !ignoreOpened) {
// query in use by another thread/request
q->second->_query->kill();
q->second->_expires = 0.0;
return;
}
@ -213,11 +230,6 @@ void QueryRegistry::destroy(std::string const& vocbase, QueryId id, int errorCod
LOG_TOPIC(DEBUG, arangodb::Logger::AQL) << "query with id " << id << " is now destroyed";
}
/// @brief destroy
void QueryRegistry::destroy(TRI_vocbase_t* vocbase, QueryId id, int errorCode) {
destroy(vocbase->name(), id, errorCode);
}
ResultT<bool> QueryRegistry::isQueryInUse(TRI_vocbase_t* vocbase, QueryId id) {
LOG_TOPIC(DEBUG, arangodb::Logger::AQL) << "Test if query with id " << id << "is in use.";
@ -268,7 +280,7 @@ void QueryRegistry::expireQueries() {
for (auto& p : toDelete) {
try { // just in case
LOG_TOPIC(DEBUG, arangodb::Logger::AQL) << "timeout for query with id " << p.second;
destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED);
destroy(p.first, p.second, TRI_ERROR_TRANSACTION_ABORTED, false);
} catch (...) {
}
}
@ -299,13 +311,32 @@ void QueryRegistry::destroyAll() {
try {
LOG_TOPIC(DEBUG, arangodb::Logger::AQL)
<< "Timeout for query with id " << p.second << " due to shutdown";
destroy(p.first, p.second, TRI_ERROR_SHUTTING_DOWN);
destroy(p.first, p.second, TRI_ERROR_SHUTTING_DOWN, false);
} catch (...) {
// ignore any errors here
}
}
size_t count = 0;
{
READ_LOCKER(readlock, _lock);
for (auto& p : _queries) {
count += p.second.size();
}
}
if (count > 0) {
LOG_TOPIC(INFO, arangodb::Logger::AQL)
<< "number of remaining queries in query registry at shutdown: " << count;
}
}
void QueryRegistry::disallowInserts() {
WRITE_LOCKER(writelock, _lock);
_disallowInserts = true;
// from here on, there shouldn't be any more inserts into the registry
}
QueryRegistry::QueryInfo::QueryInfo(QueryId id, Query* query, double ttl, bool isPrepared)
: _vocbase(&(query->vocbase())),
_id(id),

View File

@ -38,7 +38,7 @@ class Query;
class QueryRegistry {
public:
explicit QueryRegistry(double defTTL) : _defaultTTL(defTTL) {}
explicit QueryRegistry(double defTTL) : _defaultTTL(defTTL), _disallowInserts(false) {}
TEST_VIRTUAL ~QueryRegistry();
@ -75,9 +75,10 @@ class QueryRegistry {
/// from the same thread that has opened it! Note that if the query is
/// "open", then this will set the "killed" flag in the query and do not
/// more.
TEST_VIRTUAL void destroy(std::string const& vocbase, QueryId id, int errorCode);
void destroy(TRI_vocbase_t* vocbase, QueryId id, int errorCode);
/// if the ignoreOpened flag is set, it means the query will be shut down
/// and removed regardless if it is in use by anything else. this is only
/// safe to call if the current thread is currently using the query itself
TEST_VIRTUAL void destroy(std::string const& vocbase, QueryId id, int errorCode, bool ignoreOpened);
ResultT<bool> isQueryInUse(TRI_vocbase_t* vocbase, QueryId id);
@ -90,6 +91,9 @@ class QueryRegistry {
/// @brief for shutdown, we need to shut down all queries:
void destroyAll();
/// @brief from here on, disallow entering new queries into the registry
void disallowInserts();
/// @brief return the default TTL value
TEST_VIRTUAL double defaultTTL() const { return _defaultTTL; }
@ -130,6 +134,8 @@ class QueryRegistry {
/// @brief the default TTL value
double const _defaultTTL;
bool _disallowInserts;
};
} // namespace aql

View File

@ -830,7 +830,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, Query* q
closeGuard.cancel();
// delete the query from the registry
_queryRegistry->destroy(&_vocbase, _qId, errorCode);
_queryRegistry->destroy(_vocbase.name(), _qId, errorCode, false);
_qId = 0;
answerBuilder.add(StaticStrings::Error, VPackValue(res.fail()));
answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber()));

View File

@ -2733,7 +2733,9 @@ Result RestReplicationHandler::createBlockingTransaction(aql::QueryId id,
if (isTombstoned(id)) {
try {
// Code does not matter, read only access, so we can roll back.
queryRegistry->destroy(&_vocbase, id, TRI_ERROR_QUERY_KILLED);
// we can ignore the openness here, as it was our thread that had
// inserted the query just a couple of instructions before
queryRegistry->destroy(_vocbase.name(), id, TRI_ERROR_QUERY_KILLED, true /*ignoreOpened*/);
} catch (...) {
// Maybe thrown in shutdown.
}
@ -2780,7 +2782,7 @@ ResultT<bool> RestReplicationHandler::cancelBlockingTransaction(aql::QueryId id)
}
try {
// Code does not matter, read only access, so we can roll back.
queryRegistry->destroy(&_vocbase, id, TRI_ERROR_QUERY_KILLED);
queryRegistry->destroy(_vocbase.name(), id, TRI_ERROR_QUERY_KILLED, false);
} catch (...) {
// All errors that show up here can only be
// triggered if the query is destroyed in between.

View File

@ -177,6 +177,17 @@ void QueryRegistryFeature::prepare() {
void QueryRegistryFeature::start() {}
void QueryRegistryFeature::beginShutdown() {
TRI_ASSERT(_queryRegistry != nullptr);
_queryRegistry->disallowInserts();
}
void QueryRegistryFeature::stop() {
TRI_ASSERT(_queryRegistry != nullptr);
_queryRegistry->disallowInserts();
_queryRegistry->destroyAll();
}
void QueryRegistryFeature::unprepare() {
// clear the query registery
QUERY_REGISTRY.store(nullptr, std::memory_order_release);

View File

@ -44,6 +44,8 @@ class QueryRegistryFeature final : public application_features::ApplicationFeatu
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void beginShutdown() override final;
void stop() override final;
void unprepare() override final;
bool trackSlowQueries() const { return _trackSlowQueries; }

View File

@ -647,8 +647,8 @@ TEST_CASE("EngineInfoContainerCoordinator", "[aql][cluster][coordinator]") {
.AlwaysReturn(&block);
fakeit::When(OverloadedMethod(mockRegistry, destroy,
void(std::string const&, QueryId, int)))
.Do([&](std::string const& vocbase, QueryId id, int errorCode) {
void(std::string const&, QueryId, int, bool)))
.Do([&](std::string const& vocbase, QueryId id, int errorCode, bool ignoreOpened) {
REQUIRE(vocbase == dbname);
REQUIRE(id == secondId);
REQUIRE(errorCode == TRI_ERROR_INTERNAL);
@ -742,7 +742,7 @@ TEST_CASE("EngineInfoContainerCoordinator", "[aql][cluster][coordinator]") {
// Assert unregister of second engine.
fakeit::Verify(OverloadedMethod(mockRegistry, destroy,
void(std::string const&, QueryId, int)))
void(std::string const&, QueryId, int, bool)))
.Exactly(1);
}
@ -788,7 +788,7 @@ TEST_CASE("EngineInfoContainerCoordinator", "[aql][cluster][coordinator]") {
fakeit::Verify(Method(mockRegistry, insert)).Exactly(1);
// Assert unregister of second engine.
fakeit::Verify(OverloadedMethod(mockRegistry, destroy, void(std::string const&, QueryId, int))).Exactly(0);
fakeit::Verify(OverloadedMethod(mockRegistry, destroy, void(std::string const&, QueryId, int, bool))).Exactly(0);
}
*/