1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into engine-api

This commit is contained in:
jsteemann 2017-02-09 17:53:34 +01:00
commit 05804be1a4
17 changed files with 298 additions and 29 deletions

View File

@ -59,6 +59,7 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server)
startsAfter("MMFilesWalRecovery");
startsAfter("Scheduler");
startsAfter("Server");
startsAfter("Aql");
}
AgencyFeature::~AgencyFeature() {}

View File

@ -92,6 +92,7 @@ AqlFunctionFeature::AqlFunctionFeature(
"RELATIONAL_ARRAY_NOT_IN"}} {
setOptional(false);
startsAfter("EngineSelector");
startsAfter("Aql");
}
// This feature does not have any options

View File

@ -41,6 +41,7 @@
#include "Basics/fasthash.h"
#include "Cluster/ServerState.h"
#include "Logger/Logger.h"
#include "RestServer/AqlFeature.h"
#include "Utils/Transaction.h"
#include "Utils/AqlTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
@ -166,6 +167,12 @@ Query::Query(bool contextOwnedByExterior, TRI_vocbase_t* vocbase,
_contextOwnedByExterior(contextOwnedByExterior),
_killed(false),
_isModificationQuery(false) {
AqlFeature* aql = AqlFeature::lease();
if (aql == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
// std::cout << TRI_CurrentThreadId() << ", QUERY " << this << " CTOR: " <<
// queryString << "\n";
double tracing = getNumericOption("tracing", 0);
@ -234,6 +241,11 @@ Query::Query(bool contextOwnedByExterior, TRI_vocbase_t* vocbase,
_killed(false),
_isModificationQuery(false) {
AqlFeature* aql = AqlFeature::lease();
if (aql == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
LOG_TOPIC(DEBUG, Logger::QUERIES)
<< TRI_microtime() - _startTime << " "
<< "Query::Query queryStruct: " << queryStruct->slice().toJson()
@ -295,6 +307,7 @@ Query::~Query() {
LOG_TOPIC(DEBUG, Logger::QUERIES)
<< TRI_microtime() - _startTime << " "
<< "Query::~Query this: " << (uintptr_t) this;
AqlFeature::unlease();
}
/// @brief clone a query

View File

@ -264,3 +264,29 @@ void QueryRegistry::expireQueries() {
}
}
}
/// @brief return number of registered queries
size_t QueryRegistry::numberRegisteredQueries() {
READ_LOCKER(readLocker, _lock);
size_t sum = 0;
for (auto const&m : _queries) {
sum += m.second.size();
}
return sum;
}
/// @brief for shutdown, we need to shut down all queries:
void QueryRegistry::destroyAll() {
std::vector<std::pair<std::string, QueryId>> allQueries;
{
READ_LOCKER(readlock, _lock);
for (auto& p : _queries) {
for (auto& q : p.second) {
allQueries.emplace_back(p.first, q.first);
}
}
}
for (auto& p : allQueries) {
destroy(p.first, p.second, TRI_ERROR_SHUTTING_DOWN);
}
}

View File

@ -74,6 +74,12 @@ class QueryRegistry {
/// @brief expireQueries, this deletes all expired queries from the registry
void expireQueries();
/// @brief return number of registered queries
size_t numberRegisteredQueries();
/// @brief for shutdown, we need to shut down all queries:
void destroyAll();
private:
/// @brief a struct for all information regarding one query in the registry
struct QueryInfo {

View File

@ -267,6 +267,7 @@ SET(ARANGOD_SOURCES
RestHandler/RestVocbaseBaseHandler.cpp
RestHandler/RestWalHandler.cpp
RestHandler/WorkMonitorHandler.cpp
RestServer/AqlFeature.cpp
RestServer/BootstrapFeature.cpp
RestServer/CheckVersionFeature.cpp
RestServer/ConsoleFeature.cpp

View File

@ -282,7 +282,7 @@ std::shared_ptr<ClusterComm> ClusterComm::instance() {
// an assertion despite the fact that we have checks for nullptr in
// all places that call this method. Assertions have no effect in released
// code at the customer's site.
// TRI_ASSERT(_theInstance != nullptr); //temporarily disabled until AQLFeature is done
TRI_ASSERT(_theInstance != nullptr);
return _theInstance;
}
@ -801,14 +801,15 @@ ClusterCommThread::~ClusterCommThread() { shutdown(); }
////////////////////////////////////////////////////////////////////////////////
void ClusterCommThread::beginShutdown() {
// Note that this is called from the destructor of the ClusterComm singleton
// object. This means that our pointer _cc is still valid and the condition
// variable in it is still OK. However, this method is called from a
// different thread than the ClusterCommThread. Therefore we can still
// use the condition variable to wake up the ClusterCommThread.
Thread::beginShutdown();
auto cc = ClusterComm::instance();
if (cc != nullptr) {
CONDITION_LOCKER(guard, cc->somethingToSend);
guard.signal();
}
CONDITION_LOCKER(guard, _cc->somethingToSend);
guard.signal();
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -53,10 +53,13 @@ TraverserEngineRegistry::~TraverserEngineRegistry() {
/// @brief Create a new Engine and return it's id
TraverserEngineID TraverserEngineRegistry::createNew(TRI_vocbase_t* vocbase,
VPackSlice engineInfo) {
VPackSlice engineInfo,
double ttl) {
TraverserEngineID id = TRI_NewTickServer();
TRI_ASSERT(id != 0);
auto info = std::make_unique<EngineInfo>(vocbase, engineInfo);
info->_timeToLive = ttl;
info->_expires = TRI_microtime() + ttl;
WRITE_LOCKER(writeLocker, _lock);
TRI_ASSERT(_engines.find(id) == _engines.end());
@ -89,7 +92,7 @@ BaseTraverserEngine* TraverserEngineRegistry::get(TraverserEngineID id) {
}
/// @brief Returns the engine to the registry. Someone else can now use it.
void TraverserEngineRegistry::returnEngine(TraverserEngineID id) {
void TraverserEngineRegistry::returnEngine(TraverserEngineID id, double ttl) {
WRITE_LOCKER(writeLocker, _lock);
auto e = _engines.find(id);
if (e == _engines.end()) {
@ -102,6 +105,11 @@ void TraverserEngineRegistry::returnEngine(TraverserEngineID id) {
auto engine = e->second;
_engines.erase(e);
delete engine;
} else {
if (ttl >= 0.0) {
e->second->_timeToLive = ttl;
}
e->second->_expires = TRI_microtime() + e->second->_timeToLive;
}
}
}
@ -131,3 +139,35 @@ void TraverserEngineRegistry::destroy(TraverserEngineID id, bool doLock) {
delete engine;
}
/// @brief expireEngines
void TraverserEngineRegistry::expireEngines() {
double now = TRI_microtime();
std::vector<TraverserEngineID> toDelete;
{
WRITE_LOCKER(writeLocker, _lock);
for (auto& y : _engines) {
// y.first is an TraverserEngineID and
// y.second is an EngineInfo*
EngineInfo*& ei = y.second;
if (!ei->_isInUse && now > ei->_expires) {
toDelete.emplace_back(y.first);
}
}
}
for (auto& p : toDelete) {
try { // just in case
destroy(p, true);
} catch (...) {
}
}
}
/// @brief return number of registered engines
size_t TraverserEngineRegistry::numberRegisteredEngines() {
READ_LOCKER(readLocker, _lock);
return _engines.size();
}

View File

@ -48,7 +48,8 @@ class TraverserEngineRegistry {
/// It can be referred to by the returned
/// ID. If the returned ID is 0 something
/// internally went wrong.
TraverserEngineID createNew(TRI_vocbase_t*, arangodb::velocypack::Slice);
TraverserEngineID createNew(TRI_vocbase_t*, arangodb::velocypack::Slice,
double ttl = 600.0);
/// @brief Get the engine with the given ID.
/// TODO Test what happens if this pointer
@ -60,8 +61,16 @@ class TraverserEngineRegistry {
/// @brief Returns the engine with the given id.
/// NOTE: Caller is NOT allowed to use the
/// engine after this return.
void returnEngine(TraverserEngineID);
/// engine after this return. If the ttl
/// is negative (the default), then the old
/// one is taken again.
void returnEngine(TraverserEngineID, double ttl = -1.0);
/// @brief expireEngines, this deletes all expired engines from the registry
void expireEngines();
/// @brief return number of registered engines
size_t numberRegisteredEngines();
private:

View File

@ -0,0 +1,112 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
////////////////////////////////////////////////////////////////////////////////
#include "RestServer/AqlFeature.h"
#include "Aql/QueryList.h"
#include "Aql/QueryRegistry.h"
#include "Basics/MutexLocker.h"
#include "Cluster/TraverserEngineRegistry.h"
#include "Logger/Logger.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/TraverserEngineRegistryFeature.h"
using namespace arangodb;
using namespace arangodb::application_features;
AqlFeature* AqlFeature::_AQL = nullptr;
Mutex AqlFeature::_aqlFeatureMutex;
AqlFeature::AqlFeature(
application_features::ApplicationServer* server)
: ApplicationFeature(server, "Aql"), _numberLeases(0), _isStopped(false) {
setOptional(false);
requiresElevatedPrivileges(false);
startsAfter("Scheduler");
startsAfter("MMFilesLogfileManager");
startsAfter("Database");
startsAfter("Cluster");
startsAfter("V8Platform");
startsAfter("WorkMonitor");
startsAfter("QueryRegistry");
}
AqlFeature* AqlFeature::lease() {
MUTEX_LOCKER(locker, AqlFeature::_aqlFeatureMutex);
AqlFeature* aql = AqlFeature::_AQL;
if (aql == nullptr) {
return nullptr;
}
if (aql->_isStopped) {
return nullptr;
}
++aql->_numberLeases;
return aql;
}
void AqlFeature::unlease() {
MUTEX_LOCKER(locker, AqlFeature::_aqlFeatureMutex);
AqlFeature* aql = AqlFeature::_AQL;
if (aql == nullptr) {
return;
}
--aql->_numberLeases;
}
void AqlFeature::start() {
MUTEX_LOCKER(locker, AqlFeature::_aqlFeatureMutex);
_AQL = this;
LOG_TOPIC(DEBUG, Logger::QUERIES) << "AQL feature started";
}
void AqlFeature::stop() {
{
MUTEX_LOCKER(locker, AqlFeature::_aqlFeatureMutex);
_isStopped = true; // prevent new AQL queries from being launched
}
LOG_TOPIC(DEBUG, Logger::QUERIES) << "AQL feature stopped";
QueryRegistryFeature::QUERY_REGISTRY->destroyAll();
// Wait until all AQL queries are done
while (true) {
size_t m, n, o;
{
MUTEX_LOCKER(locker, AqlFeature::_aqlFeatureMutex);
m = _numberLeases;
n = QueryRegistryFeature::QUERY_REGISTRY->numberRegisteredQueries();
o = TraverserEngineRegistryFeature::TRAVERSER_ENGINE_REGISTRY
->numberRegisteredEngines();
}
if (n == 0 && m == 0 && o == 0) {
break;
}
LOG_TOPIC(INFO, Logger::QUERIES) << "AQLFeature shutdown, waiting for "
<< o << " registered traverser engines to terminate and for "
<< n << " registered queries to terminate and for "
<< m << " feature leases to be released";
usleep(500000);
}
MUTEX_LOCKER(locker, AqlFeature::_aqlFeatureMutex);
AqlFeature::_AQL = nullptr;
}

View File

@ -0,0 +1,49 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
////////////////////////////////////////////////////////////////////////////////
#ifndef APPLICATION_FEATURES_AQL_FEATURE_H
#define APPLICATION_FEATURES_AQL_FEATURE_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Basics/Mutex.h"
namespace arangodb {
class AqlFeature final : public application_features::ApplicationFeature {
static AqlFeature* _AQL;
static Mutex _aqlFeatureMutex;
public:
explicit AqlFeature(application_features::ApplicationServer*);
public:
static AqlFeature* lease();
static void unlease();
void start() override final;
void stop() override final;
private:
uint64_t _numberLeases;
bool _isStopped;
};
}
#endif

View File

@ -32,6 +32,7 @@
#include "Basics/StringUtils.h"
#include "Basics/files.h"
#include "Cluster/ServerState.h"
#include "Cluster/TraverserEngineRegistry.h"
#include "Cluster/v8-cluster.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "Logger/Logger.h"
@ -40,6 +41,7 @@
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/TraverserEngineRegistryFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "MMFiles/MMFilesLogfileManager.h"
#include "MMFiles/MMFilesPersistentIndex.h"
@ -195,11 +197,16 @@ void DatabaseManagerThread::run() {
// The following is only necessary after a wait:
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
if (queryRegistry != nullptr) {
queryRegistry->expireQueries();
}
auto engineRegistry
= TraverserEngineRegistryFeature::TRAVERSER_ENGINE_REGISTRY;
if (engineRegistry != nullptr) {
engineRegistry->expireEngines();
}
// on a coordinator, we have no cleanup threads for the databases
// so we have to do cursor cleanup here
if (++cleanupCycles >= 10 &&
@ -1131,13 +1138,8 @@ int DatabaseFeature::createApplicationDirectory(std::string const& name,
res = TRI_CreateRecursiveDirectory(path.c_str(), systemError, errorMessage);
if (res == TRI_ERROR_NO_ERROR) {
if (MMFilesLogfileManager::instance()->isInRecovery()) {
LOG(TRACE) << "created application directory '" << path
<< "' for database '" << name << "'";
} else {
LOG(INFO) << "created application directory '" << path
<< "' for database '" << name << "'";
}
LOG(TRACE) << "created application directory '" << path
<< "' for database '" << name << "'";
} else if (res == TRI_ERROR_FILE_EXISTS) {
LOG(INFO) << "unable to create application directory '" << path
<< "' for database '" << name << "': " << errorMessage;

View File

@ -46,7 +46,6 @@ DatabasePathFeature::DatabasePathFeature(ApplicationServer* server)
startsAfter("Random");
startsAfter("Temp");
startsAfter("WorkMonitor");
startsAfter("Statistics");
}
void DatabasePathFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {

View File

@ -53,6 +53,7 @@ UpgradeFeature::UpgradeFeature(
startsAfter("Database");
startsAfter("MMFilesWalRecovery");
startsAfter("V8Dealer");
startsAfter("Aql");
}
void UpgradeFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {

View File

@ -49,6 +49,7 @@
#include "Logger/LoggerFeature.h"
#include "ProgramOptions/ProgramOptions.h"
#include "Random/RandomFeature.h"
#include "RestServer/AqlFeature.h"
#include "RestServer/BootstrapFeature.h"
#include "RestServer/CheckVersionFeature.h"
#include "RestServer/ConsoleFeature.h"
@ -121,6 +122,7 @@ static int runServer(int argc, char** argv) {
server.addFeature(new AgencyFeature(&server));
server.addFeature(new aql::AqlFunctionFeature(&server));
server.addFeature(new AuthenticationFeature(&server));
server.addFeature(new AqlFeature(&server));
server.addFeature(new BootstrapFeature(&server));
server.addFeature(new CheckVersionFeature(&server, &ret, nonServerFeatures));
server.addFeature(new ClusterFeature(&server));

View File

@ -122,6 +122,7 @@ StatisticsFeature::StatisticsFeature(
application_features::ApplicationServer* server)
: ApplicationFeature(server, "Statistics"), _statistics(true) {
startsAfter("Logger");
startsAfter("Aql");
}
void StatisticsFeature::collectOptions(
@ -167,18 +168,19 @@ void StatisticsFeature::prepare() {
new StatisticsDistribution(TRI_BytesSentDistributionVectorStatistics);
TRI_BytesReceivedDistributionStatistics =
new StatisticsDistribution(TRI_BytesReceivedDistributionVectorStatistics);
}
void StatisticsFeature::start() {
STATISTICS = this;
if (!_statistics) {
return;
}
ServerStatistics::initialize();
ConnectionStatistics::initialize();
RequestStatistics::initialize();
}
void StatisticsFeature::start() {
if (!_statistics) {
return;
}
_statisticsThread.reset(new StatisticsThread);

View File

@ -1258,7 +1258,7 @@ function shutdownInstance (instanceInfo, options) {
let agentsKilled = false;
let nrAgents = n - nonagencies.length;
let timeout = 60;
let timeout = 666;
if (options.valgrind) {
timeout *= 10;
}
@ -1286,7 +1286,11 @@ function shutdownInstance (instanceInfo, options) {
if (arangod.exitStatus.status === 'RUNNING') {
if ((require('internal').time() - shutdownTime) > timeout) {
let localTimeout = timeout;
if (arangod.role === 'agent') {
localTimeout = localTimeout + 60;
}
if ((require('internal').time() - shutdownTime) > localTimeout) {
print('forcefully terminating ' + yaml.safeDump(arangod.pid) +
' after ' + timeout + 's grace period; marking crashy.');
serverCrashed = true;