mirror of https://gitee.com/bigwinds/arangodb
try to fix some undefined behavior on cluster startup
This commit is contained in:
parent
5ea01c02b0
commit
c8c8625727
|
@ -50,6 +50,7 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server)
|
||||||
startsAfter("Database");
|
startsAfter("Database");
|
||||||
startsAfter("Dispatcher");
|
startsAfter("Dispatcher");
|
||||||
startsAfter("Endpoint");
|
startsAfter("Endpoint");
|
||||||
|
startsAfter("QueryRegistry");
|
||||||
startsAfter("Scheduler");
|
startsAfter("Scheduler");
|
||||||
startsAfter("Server");
|
startsAfter("Server");
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
|
|
||||||
#include "Basics/ConditionLocker.h"
|
#include "Basics/ConditionLocker.h"
|
||||||
#include "RestServer/DatabaseFeature.h"
|
#include "RestServer/DatabaseFeature.h"
|
||||||
|
#include "RestServer/QueryRegistryFeature.h"
|
||||||
#include "VocBase/server.h"
|
#include "VocBase/server.h"
|
||||||
#include "VocBase/vocbase.h"
|
#include "VocBase/vocbase.h"
|
||||||
|
|
||||||
|
@ -279,13 +280,12 @@ append_entries_t Agent::sendAppendEntriesRPC (id_t follower_id) {
|
||||||
0, true);
|
0, true);
|
||||||
|
|
||||||
return append_entries_t(t, true);
|
return append_entries_t(t, true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// @brief load persistent state
|
// @brief load persistent state
|
||||||
bool Agent::load () {
|
bool Agent::load() {
|
||||||
DatabaseFeature* database = dynamic_cast<DatabaseFeature*>(
|
DatabaseFeature* database =
|
||||||
ApplicationServer::lookupFeature("Database"));
|
ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||||
|
|
||||||
auto vocbase = database->vocbase();
|
auto vocbase = database->vocbase();
|
||||||
|
|
||||||
|
@ -310,7 +310,9 @@ bool Agent::load () {
|
||||||
_readDB.start(this);
|
_readDB.start(this);
|
||||||
|
|
||||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality.";
|
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality.";
|
||||||
_constituent.start(vocbase);
|
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
|
||||||
|
TRI_ASSERT(queryRegistry != nullptr);
|
||||||
|
_constituent.start(vocbase, queryRegistry);
|
||||||
|
|
||||||
if (_config.supervision) {
|
if (_config.supervision) {
|
||||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting cluster sanity facilities";
|
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting cluster sanity facilities";
|
||||||
|
|
|
@ -370,8 +370,10 @@ void Constituent::beginShutdown() {
|
||||||
Thread::beginShutdown();
|
Thread::beginShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Constituent::start (TRI_vocbase_t* vocbase) {
|
bool Constituent::start(TRI_vocbase_t* vocbase,
|
||||||
|
aql::QueryRegistry* queryRegistry) {
|
||||||
_vocbase = vocbase;
|
_vocbase = vocbase;
|
||||||
|
_queryRegistry = queryRegistry;
|
||||||
return Thread::start();
|
return Thread::start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,9 +96,9 @@ public:
|
||||||
size_t size() const;
|
size_t size() const;
|
||||||
|
|
||||||
/// @brief Orderly shutdown of thread
|
/// @brief Orderly shutdown of thread
|
||||||
void beginShutdown () override;
|
void beginShutdown() override;
|
||||||
|
|
||||||
bool start (TRI_vocbase_t* vocbase);
|
bool start(TRI_vocbase_t* vocbase, aql::QueryRegistry*);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
|
|
@ -821,6 +821,8 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(
|
||||||
bool const isCoordinator =
|
bool const isCoordinator =
|
||||||
arangodb::ServerState::instance()->isCoordinator(role);
|
arangodb::ServerState::instance()->isCoordinator(role);
|
||||||
bool const isDBServer = arangodb::ServerState::instance()->isDBServer(role);
|
bool const isDBServer = arangodb::ServerState::instance()->isDBServer(role);
|
||||||
|
|
||||||
|
TRI_ASSERT(queryRegistry != nullptr);
|
||||||
|
|
||||||
ExecutionEngine* engine = nullptr;
|
ExecutionEngine* engine = nullptr;
|
||||||
|
|
||||||
|
@ -836,7 +838,6 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(
|
||||||
|
|
||||||
if (isCoordinator) {
|
if (isCoordinator) {
|
||||||
// instantiate the engine on the coordinator
|
// instantiate the engine on the coordinator
|
||||||
|
|
||||||
auto inst =
|
auto inst =
|
||||||
std::make_unique<CoordinatorInstanciator>(query, queryRegistry);
|
std::make_unique<CoordinatorInstanciator>(query, queryRegistry);
|
||||||
plan->root()->walk(inst.get());
|
plan->root()->walk(inst.get());
|
||||||
|
|
|
@ -391,6 +391,8 @@ void Query::registerWarning(int code, char const* details) {
|
||||||
/// to be able to only prepare a query from VelocyPack and then store it in the
|
/// to be able to only prepare a query from VelocyPack and then store it in the
|
||||||
/// QueryRegistry.
|
/// QueryRegistry.
|
||||||
QueryResult Query::prepare(QueryRegistry* registry) {
|
QueryResult Query::prepare(QueryRegistry* registry) {
|
||||||
|
TRI_ASSERT(registry != nullptr);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
init();
|
init();
|
||||||
enterState(PARSING);
|
enterState(PARSING);
|
||||||
|
@ -513,6 +515,8 @@ QueryResult Query::prepare(QueryRegistry* registry) {
|
||||||
|
|
||||||
/// @brief execute an AQL query
|
/// @brief execute an AQL query
|
||||||
QueryResult Query::execute(QueryRegistry* registry) {
|
QueryResult Query::execute(QueryRegistry* registry) {
|
||||||
|
TRI_ASSERT(registry != nullptr);
|
||||||
|
|
||||||
std::unique_ptr<AqlWorkStack> work;
|
std::unique_ptr<AqlWorkStack> work;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -62,6 +62,7 @@ ClusterFeature::ClusterFeature(application_features::ApplicationServer* server)
|
||||||
startsAfter("Dispatcher");
|
startsAfter("Dispatcher");
|
||||||
startsAfter("Scheduler");
|
startsAfter("Scheduler");
|
||||||
startsAfter("V8Dealer");
|
startsAfter("V8Dealer");
|
||||||
|
startsAfter("Database");
|
||||||
}
|
}
|
||||||
|
|
||||||
ClusterFeature::~ClusterFeature() {
|
ClusterFeature::~ClusterFeature() {
|
||||||
|
|
|
@ -68,18 +68,15 @@ void CheckVersionFeature::validateOptions(
|
||||||
|
|
||||||
ApplicationServer::forceDisableFeatures(_nonServerFeatures);
|
ApplicationServer::forceDisableFeatures(_nonServerFeatures);
|
||||||
|
|
||||||
LoggerFeature* logger =
|
LoggerFeature* logger = ApplicationServer::getFeature<LoggerFeature>("Logger");
|
||||||
dynamic_cast<LoggerFeature*>(ApplicationServer::lookupFeature("Logger"));
|
|
||||||
logger->disableThreaded();
|
logger->disableThreaded();
|
||||||
|
|
||||||
DatabaseFeature* database = dynamic_cast<DatabaseFeature*>(
|
DatabaseFeature* database = ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||||
ApplicationServer::lookupFeature("Database"));
|
|
||||||
database->disableReplicationApplier();
|
database->disableReplicationApplier();
|
||||||
database->disableCompactor();
|
database->disableCompactor();
|
||||||
database->enableCheckVersion();
|
database->enableCheckVersion();
|
||||||
|
|
||||||
V8DealerFeature* v8dealer = dynamic_cast<V8DealerFeature*>(
|
V8DealerFeature* v8dealer = ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
||||||
ApplicationServer::lookupFeature("V8Dealer"));
|
|
||||||
|
|
||||||
v8dealer->setNumberContexts(1);
|
v8dealer->setNumberContexts(1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,23 +182,25 @@ void DatabaseFeature::updateContexts() {
|
||||||
}
|
}
|
||||||
|
|
||||||
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
|
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
|
||||||
|
TRI_ASSERT(queryRegistry != nullptr);
|
||||||
|
|
||||||
auto server = DatabaseServerFeature::SERVER;
|
auto server = DatabaseServerFeature::SERVER;
|
||||||
|
TRI_ASSERT(server != nullptr);
|
||||||
|
|
||||||
auto vocbase = _vocbase;
|
auto vocbase = _vocbase;
|
||||||
|
|
||||||
V8DealerFeature* dealer = dynamic_cast<V8DealerFeature*>(
|
V8DealerFeature* dealer =
|
||||||
ApplicationServer::lookupFeature("V8Dealer"));
|
ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
||||||
|
|
||||||
if (dealer != nullptr) {
|
dealer->defineContextUpdate(
|
||||||
dealer->defineContextUpdate(
|
[queryRegistry, server, vocbase](
|
||||||
[queryRegistry, server, vocbase](
|
v8::Isolate* isolate, v8::Handle<v8::Context> context, size_t i) {
|
||||||
v8::Isolate* isolate, v8::Handle<v8::Context> context, size_t i) {
|
TRI_InitV8VocBridge(isolate, context, queryRegistry, server, vocbase,
|
||||||
TRI_InitV8VocBridge(isolate, context, queryRegistry, server, vocbase,
|
i);
|
||||||
i);
|
TRI_InitV8Queries(isolate, context);
|
||||||
TRI_InitV8Queries(isolate, context);
|
TRI_InitV8Cluster(isolate, context);
|
||||||
TRI_InitV8Cluster(isolate, context);
|
},
|
||||||
},
|
vocbase);
|
||||||
vocbase);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void DatabaseFeature::shutdownCompactor() {
|
void DatabaseFeature::shutdownCompactor() {
|
||||||
|
|
|
@ -71,15 +71,18 @@ void QueryRegistryFeature::prepare() {
|
||||||
std::pair<std::string, size_t> cacheProperties{_queryCacheMode,
|
std::pair<std::string, size_t> cacheProperties{_queryCacheMode,
|
||||||
_queryCacheEntries};
|
_queryCacheEntries};
|
||||||
arangodb::aql::QueryCache::instance()->setProperties(cacheProperties);
|
arangodb::aql::QueryCache::instance()->setProperties(cacheProperties);
|
||||||
|
|
||||||
|
// create the query registery
|
||||||
|
_queryRegistry.reset(new aql::QueryRegistry());
|
||||||
|
QUERY_REGISTRY = _queryRegistry.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryRegistryFeature::start() {
|
void QueryRegistryFeature::start() {
|
||||||
// create the query registery
|
|
||||||
_queryRegistry.reset(new aql::QueryRegistry());
|
|
||||||
DatabaseServerFeature::SERVER->_queryRegistry = _queryRegistry.get();
|
DatabaseServerFeature::SERVER->_queryRegistry = _queryRegistry.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryRegistryFeature::stop() {
|
void QueryRegistryFeature::stop() {
|
||||||
// clear the query registery
|
// clear the query registery
|
||||||
DatabaseServerFeature::SERVER->_queryRegistry = nullptr;
|
DatabaseServerFeature::SERVER->_queryRegistry = nullptr;
|
||||||
|
// TODO: reset QUERY_REGISTRY as well?
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,17 +130,17 @@ void ServerFeature::validateOptions(std::shared_ptr<ProgramOptions>) {
|
||||||
"RestServer", "Scheduler", "Ssl",
|
"RestServer", "Scheduler", "Ssl",
|
||||||
"Supervisor"});
|
"Supervisor"});
|
||||||
|
|
||||||
DatabaseFeature* database = dynamic_cast<DatabaseFeature*>(
|
DatabaseFeature* database =
|
||||||
ApplicationServer::lookupFeature("Database"));
|
ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||||
database->disableReplicationApplier();
|
database->disableReplicationApplier();
|
||||||
|
|
||||||
StatisticsFeature* statistics = dynamic_cast<StatisticsFeature*>(
|
StatisticsFeature* statistics =
|
||||||
ApplicationServer::lookupFeature("Statistics"));
|
ApplicationServer::getFeature<StatisticsFeature>("Statistics");
|
||||||
statistics->disableStatistics();
|
statistics->disableStatistics();
|
||||||
}
|
}
|
||||||
|
|
||||||
V8DealerFeature* v8dealer = dynamic_cast<V8DealerFeature*>(
|
V8DealerFeature* v8dealer =
|
||||||
ApplicationServer::lookupFeature("V8Dealer"));
|
ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
||||||
|
|
||||||
if (_operationMode == OperationMode::MODE_SCRIPT ||
|
if (_operationMode == OperationMode::MODE_SCRIPT ||
|
||||||
_operationMode == OperationMode::MODE_UNITTESTS) {
|
_operationMode == OperationMode::MODE_UNITTESTS) {
|
||||||
|
@ -162,8 +162,8 @@ void ServerFeature::validateOptions(std::shared_ptr<ProgramOptions>) {
|
||||||
|
|
||||||
void ServerFeature::start() {
|
void ServerFeature::start() {
|
||||||
if (_operationMode != OperationMode::MODE_CONSOLE && _restServer) {
|
if (_operationMode != OperationMode::MODE_CONSOLE && _restServer) {
|
||||||
auto scheduler = dynamic_cast<SchedulerFeature*>(
|
auto scheduler =
|
||||||
ApplicationServer::lookupFeature("Scheduler"));
|
ApplicationServer::getFeature<SchedulerFeature>("Scheduler");
|
||||||
|
|
||||||
if (scheduler != nullptr) {
|
if (scheduler != nullptr) {
|
||||||
scheduler->buildControlCHandler();
|
scheduler->buildControlCHandler();
|
||||||
|
@ -236,8 +236,8 @@ std::string ServerFeature::operationModeString(OperationMode mode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int ServerFeature::runUnitTests() {
|
int ServerFeature::runUnitTests() {
|
||||||
DatabaseFeature* database = dynamic_cast<DatabaseFeature*>(
|
DatabaseFeature* database =
|
||||||
ApplicationServer::lookupFeature("Database"));
|
ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||||
V8Context* context =
|
V8Context* context =
|
||||||
V8DealerFeature::DEALER->enterContext(database->vocbase(), true);
|
V8DealerFeature::DEALER->enterContext(database->vocbase(), true);
|
||||||
|
|
||||||
|
@ -295,8 +295,8 @@ int ServerFeature::runUnitTests() {
|
||||||
int ServerFeature::runScript() {
|
int ServerFeature::runScript() {
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
|
|
||||||
DatabaseFeature* database = dynamic_cast<DatabaseFeature*>(
|
DatabaseFeature* database =
|
||||||
ApplicationServer::lookupFeature("Database"));
|
ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||||
V8Context* context =
|
V8Context* context =
|
||||||
V8DealerFeature::DEALER->enterContext(database->vocbase(), true);
|
V8DealerFeature::DEALER->enterContext(database->vocbase(), true);
|
||||||
|
|
||||||
|
|
|
@ -101,6 +101,7 @@ V8DealerFeature::V8DealerFeature(
|
||||||
startsAfter("Action");
|
startsAfter("Action");
|
||||||
startsAfter("Database");
|
startsAfter("Database");
|
||||||
startsAfter("Dispatcher");
|
startsAfter("Dispatcher");
|
||||||
|
startsAfter("QueryRegistry");
|
||||||
startsAfter("Scheduler");
|
startsAfter("Scheduler");
|
||||||
startsAfter("V8Platform");
|
startsAfter("V8Platform");
|
||||||
startsAfter("WorkMonitor");
|
startsAfter("WorkMonitor");
|
||||||
|
@ -178,8 +179,8 @@ void V8DealerFeature::start() {
|
||||||
|
|
||||||
// try to guess a suitable number of contexts
|
// try to guess a suitable number of contexts
|
||||||
if (0 == _nrContexts && 0 == _forceNrContexts) {
|
if (0 == _nrContexts && 0 == _forceNrContexts) {
|
||||||
DispatcherFeature* dispatcher = dynamic_cast<DispatcherFeature*>(
|
DispatcherFeature* dispatcher =
|
||||||
ApplicationServer::lookupFeature("Dispatcher"));
|
ApplicationServer::getFeature<DispatcherFeature>("Dispatcher");
|
||||||
|
|
||||||
if (dispatcher != nullptr) {
|
if (dispatcher != nullptr) {
|
||||||
_nrContexts = dispatcher->concurrency();
|
_nrContexts = dispatcher->concurrency();
|
||||||
|
@ -215,8 +216,8 @@ void V8DealerFeature::start() {
|
||||||
|
|
||||||
applyContextUpdates();
|
applyContextUpdates();
|
||||||
|
|
||||||
DatabaseFeature* database = dynamic_cast<DatabaseFeature*>(
|
DatabaseFeature* database =
|
||||||
ApplicationServer::lookupFeature("Database"));
|
ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||||
|
|
||||||
loadJavascript(database->vocbase(), "server/initialize.js");
|
loadJavascript(database->vocbase(), "server/initialize.js");
|
||||||
|
|
||||||
|
|
|
@ -3447,6 +3447,7 @@ void TRI_InitV8VocBridge(v8::Isolate* isolate, v8::Handle<v8::Context> context,
|
||||||
static_cast<V8TransactionContext*>(v8g->_transactionContext)->makeGlobal();
|
static_cast<V8TransactionContext*>(v8g->_transactionContext)->makeGlobal();
|
||||||
|
|
||||||
// register the query registry
|
// register the query registry
|
||||||
|
TRI_ASSERT(queryRegistry != nullptr);
|
||||||
v8g->_queryRegistry = queryRegistry;
|
v8g->_queryRegistry = queryRegistry;
|
||||||
|
|
||||||
// register the server
|
// register the server
|
||||||
|
|
|
@ -1071,8 +1071,7 @@ static void DatabaseManager(void* data) {
|
||||||
|
|
||||||
usleep(DATABASE_MANAGER_INTERVAL);
|
usleep(DATABASE_MANAGER_INTERVAL);
|
||||||
// The following is only necessary after a wait:
|
// The following is only necessary after a wait:
|
||||||
auto queryRegistry =
|
auto queryRegistry = server->_queryRegistry.load();
|
||||||
static_cast<arangodb::aql::QueryRegistry*>(server->_queryRegistry);
|
|
||||||
|
|
||||||
if (queryRegistry != nullptr) {
|
if (queryRegistry != nullptr) {
|
||||||
queryRegistry->expireQueries();
|
queryRegistry->expireQueries();
|
||||||
|
@ -1092,8 +1091,7 @@ static void DatabaseManager(void* data) {
|
||||||
for (auto& p : theLists->_coordinatorDatabases) {
|
for (auto& p : theLists->_coordinatorDatabases) {
|
||||||
TRI_vocbase_t* vocbase = p.second;
|
TRI_vocbase_t* vocbase = p.second;
|
||||||
TRI_ASSERT(vocbase != nullptr);
|
TRI_ASSERT(vocbase != nullptr);
|
||||||
auto cursorRepository = static_cast<arangodb::CursorRepository*>(
|
auto cursorRepository = vocbase->_cursorRepository;
|
||||||
vocbase->_cursorRepository);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
cursorRepository->garbageCollect(false);
|
cursorRepository->garbageCollect(false);
|
||||||
|
|
Loading…
Reference in New Issue