1
0
Fork 0

added LogfileManager as feature

This commit is contained in:
Frank Celler 2016-04-22 21:15:14 -07:00
parent d6e8046992
commit c46eb8e65e
20 changed files with 764 additions and 505 deletions

View File

@ -1,15 +1,27 @@
!CHAPTER General Upgrade Information
!SUBSECTION Recommended upgrade procedure
!SUBSECTION Recommended major upgrade procedure
*TODO*
!SUBSECTION Recommended minor upgrade procedure
To upgrade an existing ArangoDB database to a newer version of ArangoDB
(e.g. 2.2 to 2.3, or 2.3 to 2.4), the following method is recommended:
(e.g. 3.0 to 3.1, or 3.3 to 3.4), the following method is recommended:
* Check the *CHANGELOG* and the [list of incompatible changes](../Upgrading/UpgradingChanges28.md) for API or other changes in the new version of ArangoDB and make sure your applications can deal with them
* Check the *CHANGELOG* and the
[list of incompatible changes](../Upgrading/UpgradingChanges28.md) for API or
other changes in the new version of ArangoDB and make sure your applications
can deal with them
* Stop the "old" arangod service or binary
* Copy the entire "old" data directory to a safe place (that is, a backup)
* Install the new version of ArangoDB and start the server with the *--database.upgrade* option once. This might write to the logfile of ArangoDB, so you may want to check the logs for any issues before going on.
* Start the "new" arangod service or binary regularly and check the logs for any issues. When you're confident everything went well, you may want to check the database directory for any files with the ending *.old*. These files are created by ArangoDB during upgrades and can be safely removed manually later.
* Install the new version of ArangoDB and start the server with
the *--database.upgrade* option once. This might write to the logfile of ArangoDB,
so you may want to check the logs for any issues before going on.
* Start the "new" arangod service or binary regularly and check the logs for any
issues. When you're confident everything went well, you may want to check the
database directory for any files with the ending *.old*. These files are
created by ArangoDB during upgrades and can be safely removed manually later.
If anything goes wrong during or shortly after the upgrade:
@ -17,4 +29,7 @@ If anything goes wrong during or shortly after the upgrade:
* Revert to the "old" arangod binary and restore the "old" data directory
* Start the "old" version again
It is not supported to use datafiles created or modified by a newer version of ArangoDB with an older ArangoDB version. For example, it is unsupported and is likely to cause problems when using 2.3 datafiles with an ArangoDB 2.2 instance.
It is not supported to use datafiles created or modified by a newer
version of ArangoDB with an older ArangoDB version. For example, it is
unsupported and is likely to cause problems when using 2.3 datafiles
with an ArangoDB 2.2 instance.

View File

@ -7,6 +7,8 @@
* [ARM](Installing/ARM.md)
* [Compiling](Installing/Compiling.md)
* [Upgrading](Installing/Upgrading.md)
* [Incompatible changes in 3.0](Upgrading/UpgradingChanges30.md)
* [Upgrading to 3.0](Upgrading/Upgrading30.md)
* [Incompatible changes in 2.8](Upgrading/UpgradingChanges28.md)
* [Upgrading to 2.8](Upgrading/Upgrading28.md)
* [Incompatible changes in 2.7](Upgrading/UpgradingChanges27.md)

View File

@ -0,0 +1,7 @@
!CHAPTER Upgrading to ArangoDB 3.0
Please read the following sections if you upgrade from a previous
version to ArangoDB 3.0. Please be sure that you have checked the list
of [changes in 3.0](../Upgrading/UpgradingChanges30.md) before
upgrading.

View File

@ -0,0 +1,37 @@
!CHAPTER Incompatible changes in ArangoDB 3.0
It is recommended to check the following list of incompatible changes **before**
upgrading to ArangoDB 2.8, and adjust any client programs if necessary.
!SECTION Command Line Options
Quite a few options in ArangoDB 2 where double negations (like
`disable-authentication`). In ArangoDB 3 these are now express as positives
(e. g. `authentication`). Also the options between the various programm have
being unified. For example, the logger options are now the same over all client
and server programs.
!SECTION Logging
Logging now supports log topics. You can controll these by specifying a log
topic in front of a log level or an output. For example
```
--log.level startup=trace --log.level info
```
will log messages concerning startup at trace level, everything else at info
level.
```
--log.output requests=file://requests.log --log.level requests=trace
```
will log all requests to a file called `requests.log`.
!SECTION log.file
`--log.file filename` ist still available for convenience. It is a shortcut
for `--log.output file://filename`.

View File

@ -23,16 +23,6 @@
#include "State.h"
#include "Aql/Query.h"
#include "Basics/VelocyPackHelper.h"
#include "RestServer/DatabaseFeature.h"
#include "Utils/OperationOptions.h"
#include "Utils/OperationResult.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/collection.h"
#include "VocBase/vocbase.h"
#include <velocypack/Buffer.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
@ -42,6 +32,16 @@
#include <sstream>
#include <thread>
#include "Aql/Query.h"
#include "Basics/VelocyPackHelper.h"
#include "RestServer/QueryRegistryFeature.h"
#include "Utils/OperationOptions.h"
#include "Utils/OperationResult.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/collection.h"
#include "VocBase/vocbase.h"
using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::consensus;
@ -240,10 +240,7 @@ bool State::loadCollection(std::string const& name) {
aql.c_str(), aql.size(), bindVars, nullptr,
arangodb::aql::PART_MAIN);
DatabaseFeature* database = dynamic_cast<DatabaseFeature*>(
ApplicationServer::lookupFeature("Database"));
auto queryResult = query.execute(database->queryRegistry());
auto queryResult = query.execute(QueryRegistryFeature::QUERY_REGISTRY);
if (queryResult.code != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details);

View File

@ -218,9 +218,11 @@ add_executable(${BIN_ARANGOD}
RestServer/ConsoleFeature.cpp
RestServer/ConsoleThread.cpp
RestServer/DatabaseFeature.cpp
RestServer/DatabaseServerFeature.cpp
RestServer/EndpointFeature.cpp
RestServer/FileDescriptorsFeature.cpp
RestServer/FrontendFeature.cpp
RestServer/QueryRegistryFeature.cpp
RestServer/RestServerFeature.cpp
RestServer/ServerFeature.cpp
RestServer/UpgradeFeature.cpp

View File

@ -37,7 +37,7 @@
#include "Logger/Logger.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabaseServerFeature.h"
#include "SimpleHttpClient/ConnectionManager.h"
#include "VocBase/server.h"
@ -398,7 +398,7 @@ void ClusterFeature::start() {
}
// start heartbeat thread
_heartbeatThread = new HeartbeatThread(DatabaseFeature::DATABASE->server(),
_heartbeatThread = new HeartbeatThread(DatabaseServerFeature::SERVER,
_agencyCallbackRegistry.get(),
_heartbeatInterval * 1000, 5);

View File

@ -26,6 +26,7 @@
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabaseServerFeature.h"
#include "V8Server/V8Context.h"
#include "V8Server/V8DealerFeature.h"
#include "V8Server/v8-query.h"
@ -132,7 +133,7 @@ void CheckVersionFeature::checkVersion() {
LOG(DEBUG) << "running database version check";
// can do this without a lock as this is the startup
auto server = DatabaseFeature::DATABASE->server();
auto server = DatabaseServerFeature::SERVER;
auto unuser = server->_databasesProtector.use();
auto theLists = server->_databasesLists.load();

View File

@ -22,16 +22,15 @@
#include "DatabaseFeature.h"
#include "Aql/QueryCache.h"
#include "Aql/QueryRegistry.h"
#include "Basics/StringUtils.h"
#include "Basics/ThreadPool.h"
#include "Cluster/v8-cluster.h"
#include "Logger/Logger.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "Rest/Version.h"
#include "RestServer/DatabaseServerFeature.h"
#include "RestServer/RestServerFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "V8Server/V8DealerFeature.h"
#include "V8Server/v8-query.h"
#include "V8Server/v8-vocbase.h"
@ -51,16 +50,11 @@ DatabaseFeature::DatabaseFeature(ApplicationServer* server)
: ApplicationFeature(server, "Database"),
_directory(""),
_maximalJournalSize(TRI_JOURNAL_DEFAULT_MAXIMAL_SIZE),
_queryTracking(true),
_queryCacheMode("off"),
_queryCacheEntries(128),
_indexThreads(2),
_defaultWaitForSync(false),
_forceSyncProperties(true),
_ignoreDatafileErrors(false),
_throwCollectionNotLoadedError(false),
_vocbase(nullptr),
_server(nullptr),
_isInitiallyEmpty(false),
_replicationApplier(true),
_disableCompactor(false),
@ -68,13 +62,8 @@ DatabaseFeature::DatabaseFeature(ApplicationServer* server)
_upgrade(false) {
setOptional(false);
requiresElevatedPrivileges(false);
startsAfter("FileDescriptors");
startsAfter("Language");
startsAfter("Logger");
startsAfter("Random");
startsAfter("Temp");
startsAfter("WorkMonitor");
startsAfter("Statistics");
startsAfter("DatabaseServer");
startsAfter("LogfileManager");
}
void DatabaseFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
@ -101,11 +90,6 @@ void DatabaseFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
"turned off",
new BooleanParameter(&_forceSyncProperties));
options->addHiddenOption(
"--database.index-threads",
"threads to start for parallel background index creation",
new UInt64Parameter(&_indexThreads));
options->addHiddenOption(
"--database.ignore-datafile-errors",
"load collections even if datafiles may contain errors",
@ -120,21 +104,6 @@ void DatabaseFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
"--database.replication-applier",
"switch to enable or disable the replication applier",
new BooleanParameter(&_replicationApplier));
options->addSection("query", "Configure queries");
options->addOption("--query.tracking", "wether to track queries",
new BooleanParameter(&_queryTracking));
options->addOption("--query.cache-mode",
"mode for the AQL query cache (on, off, demand)",
new StringParameter(&_queryCacheMode));
options->addOption("--query.cache-entries",
"maximum number of results in query cache per database",
new UInt64Parameter(&_queryCacheEntries));
wal::LogfileManager::collectOptions(options);
}
void DatabaseFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
@ -159,11 +128,6 @@ void DatabaseFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
// strip trailing separators
_databasePath = StringUtils::rTrim(_directory, TRI_DIR_SEPARATOR_STR);
// some arbitrary limit
if (_indexThreads > 128) {
_indexThreads = 128;
}
if (_maximalJournalSize < TRI_JOURNAL_MINIMAL_SIZE) {
LOG(FATAL) << "invalid value for '--database.maximal-journal-size'. "
"expected at least "
@ -172,18 +136,6 @@ void DatabaseFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
}
}
void DatabaseFeature::prepare() {
// set global query tracking flag
arangodb::aql::Query::DisableQueryTracking(!_queryTracking);
// configure the query cache
{
std::pair<std::string, size_t> cacheProperties{_queryCacheMode,
_queryCacheEntries};
arangodb::aql::QueryCache::instance()->setProperties(cacheProperties);
}
}
void DatabaseFeature::start() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::start";
@ -202,26 +154,7 @@ void DatabaseFeature::start() {
// set throw collection not loaded behavior
TRI_SetThrowCollectionNotLoadedVocBase(_throwCollectionNotLoadedError);
// create the server
TRI_InitServerGlobals();
_server.reset(new TRI_server_t());
// create the query registery
_queryRegistry.reset(new aql::QueryRegistry());
_server->_queryRegistry = _queryRegistry.get();
// start the WAL manager (but do not open it yet)
LOG(TRACE) << "starting WAL logfile manager";
wal::LogfileManager::initialize(&_databasePath, _server.get());
if (!wal::LogfileManager::instance()->prepare() ||
!wal::LogfileManager::instance()->start()) {
// unable to initialize & start WAL logfile manager
LOG(FATAL) << "unable to start WAL logfile manager";
FATAL_ERROR_EXIT();
}
// init key generator
KeyGenerator::Initialize();
// open all databases
@ -239,26 +172,16 @@ void DatabaseFeature::start() {
void DatabaseFeature::stop() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::stop";
// clear the query registery
_server->_queryRegistry = nullptr;
// close all databases
closeDatabases();
// delete the server
TRI_StopServer(_server.get());
// clear singleton
DATABASE = nullptr;
// turn off index threads
_indexPool.reset();
LOG(INFO) << "ArangoDB has been shut down";
}
void DatabaseFeature::updateContexts() {
_vocbase = TRI_UseDatabaseServer(_server.get(), TRI_VOC_SYSTEM_DATABASE);
_vocbase = TRI_UseDatabaseServer(DatabaseServerFeature::SERVER,
TRI_VOC_SYSTEM_DATABASE);
if (_vocbase == nullptr) {
LOG(FATAL)
@ -266,8 +189,8 @@ void DatabaseFeature::updateContexts() {
FATAL_ERROR_EXIT();
}
auto queryRegistry = _queryRegistry.get();
auto server = _server.get();
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
auto server = DatabaseServerFeature::SERVER;
auto vocbase = _vocbase;
V8DealerFeature* dealer = dynamic_cast<V8DealerFeature*>(
@ -287,8 +210,8 @@ void DatabaseFeature::updateContexts() {
}
void DatabaseFeature::shutdownCompactor() {
auto unuser = _server->_databasesProtector.use();
auto theLists = _server->_databasesLists.load();
auto unuser = DatabaseServerFeature::SERVER->_databasesProtector.use();
auto theLists = DatabaseServerFeature::SERVER->_databasesLists.load();
for (auto& p : theLists->_databases) {
TRI_vocbase_t* vocbase = p.second;
@ -312,7 +235,8 @@ void DatabaseFeature::openDatabases() {
TRI_vocbase_defaults_t defaults;
// override with command-line options
defaults.defaultMaximalSize = static_cast<TRI_voc_size_t>(_maximalJournalSize);
defaults.defaultMaximalSize =
static_cast<TRI_voc_size_t>(_maximalJournalSize);
defaults.defaultWaitForSync = _defaultWaitForSync;
defaults.forceSyncProperties = _forceSyncProperties;
@ -322,7 +246,8 @@ void DatabaseFeature::openDatabases() {
if (rest != nullptr) {
defaults.requireAuthentication = rest->authentication();
defaults.requireAuthenticationUnixSockets = rest->authenticationUnixSockets();
defaults.requireAuthenticationUnixSockets =
rest->authenticationUnixSockets();
defaults.authenticateSystemOnly = rest->authenticationSystemOnly();
} else {
defaults.requireAuthentication = true;
@ -332,23 +257,20 @@ void DatabaseFeature::openDatabases() {
TRI_ASSERT(_server != nullptr);
if (_indexThreads > 0) {
_indexPool.reset(new ThreadPool(_indexThreads, "IndexBuilder"));
}
bool const iterateMarkersOnOpen =
!wal::LogfileManager::instance()->hasFoundLastTick();
int res = TRI_InitServer(
_server.get(), _indexPool.get(), _databasePath.c_str(), &defaults,
!_replicationApplier, _disableCompactor, iterateMarkersOnOpen);
DatabaseServerFeature::SERVER, DatabaseServerFeature::INDEX_POOL,
_databasePath.c_str(), &defaults, !_replicationApplier, _disableCompactor,
iterateMarkersOnOpen);
if (res != TRI_ERROR_NO_ERROR) {
LOG(FATAL) << "cannot create server instance: out of memory";
FATAL_ERROR_EXIT();
}
res = TRI_StartServer(_server.get(), _checkVersion, _upgrade);
res = TRI_StartServer(DatabaseServerFeature::SERVER, _checkVersion, _upgrade);
if (res != TRI_ERROR_NO_ERROR) {
if (_checkVersion && res == TRI_ERROR_ARANGO_EMPTY_DATADIR) {
@ -367,10 +289,6 @@ void DatabaseFeature::closeDatabases() {
// stop the replication appliers so all replication transactions can end
if (_replicationApplier) {
TRI_StopReplicationAppliersServer(_server.get());
TRI_StopReplicationAppliersServer(DatabaseServerFeature::SERVER);
}
// enforce logfile manager shutdown so we are sure no one else will
// write to the logs
wal::LogfileManager::instance()->stop();
}

View File

@ -26,17 +26,8 @@
#include "ApplicationFeatures/ApplicationFeature.h"
struct TRI_vocbase_t;
struct TRI_server_t;
namespace arangodb {
namespace basics {
class ThreadPool;
}
namespace aql {
class QueryRegistry;
}
class DatabaseFeature final : public application_features::ApplicationFeature {
public:
static DatabaseFeature* DATABASE;
@ -47,14 +38,11 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
public:
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void stop() override final;
public:
TRI_vocbase_t* vocbase() const { return _vocbase; }
TRI_server_t* server() const { return _server.get(); }
aql::QueryRegistry* queryRegistry() const { return _queryRegistry.get(); }
bool ignoreDatafileErrors() const { return _ignoreDatafileErrors; }
bool isInitiallyEmpty() const { return _isInitiallyEmpty; }
@ -64,13 +52,11 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
void enableCheckVersion() { _checkVersion = true; }
void enableUpgrade() { _upgrade = true; }
std::string const& directory() { return _directory; }
private:
std::string _directory;
uint64_t _maximalJournalSize;
bool _queryTracking;
std::string _queryCacheMode;
uint64_t _queryCacheEntries;
uint64_t _indexThreads;
bool _defaultWaitForSync;
bool _forceSyncProperties;
bool _ignoreDatafileErrors;
@ -84,10 +70,7 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
private:
TRI_vocbase_t* _vocbase;
std::unique_ptr<TRI_server_t> _server;
std::unique_ptr<aql::QueryRegistry> _queryRegistry;
std::string _databasePath;
std::unique_ptr<basics::ThreadPool> _indexPool;
bool _isInitiallyEmpty;
bool _replicationApplier;
bool _disableCompactor;

View File

@ -0,0 +1,103 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
////////////////////////////////////////////////////////////////////////////////
#include "DatabaseServerFeature.h"
#include "Basics/ThreadPool.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "VocBase/server.h"
#include "Wal/LogfileManager.h"
using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::basics;
using namespace arangodb::options;
TRI_server_t* DatabaseServerFeature::SERVER;
basics::ThreadPool* DatabaseServerFeature::INDEX_POOL;
DatabaseServerFeature::DatabaseServerFeature(ApplicationServer* server)
: ApplicationFeature(server, "DatabaseServer"),
_indexThreads(2),
_server(nullptr) {
setOptional(false);
requiresElevatedPrivileges(false);
startsAfter("FileDescriptors");
startsAfter("Language");
startsAfter("Logger");
startsAfter("Random");
startsAfter("Temp");
startsAfter("WorkMonitor");
startsAfter("Statistics");
}
void DatabaseServerFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::collectOptions";
options->addSection("database", "Configure the database");
options->addHiddenOption(
"--database.index-threads",
"threads to start for parallel background index creation",
new UInt64Parameter(&_indexThreads));
}
void DatabaseServerFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::validateOptions";
// some arbitrary limit
if (_indexThreads > 128) {
_indexThreads = 128;
}
}
void DatabaseServerFeature::start() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::start";
// create the index thread pool
if (_indexThreads > 0) {
_indexPool.reset(new ThreadPool(_indexThreads, "IndexBuilder"));
INDEX_POOL = _indexPool.get();
}
// create the server
TRI_InitServerGlobals();
_server.reset(new TRI_server_t());
SERVER = _server.get();
}
void DatabaseServerFeature::stop() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::stop";
// turn off index threads
INDEX_POOL = nullptr;
_indexPool.reset();
// delete the server
TRI_StopServer(_server.get());
SERVER = nullptr;
_server.reset(nullptr);
// done
LOG(INFO) << "ArangoDB has been shut down";
}

View File

@ -0,0 +1,63 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
////////////////////////////////////////////////////////////////////////////////
#ifndef APPLICATION_FEATURES_DATABASE_SERVER_FEATURE_H
#define APPLICATION_FEATURES_DATABASE_SERVER_FEATURE_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
struct TRI_server_t;
namespace arangodb {
namespace basics {
class ThreadPool;
}
class DatabaseServerFeature final
: public application_features::ApplicationFeature {
public:
static TRI_server_t* SERVER;
static basics::ThreadPool* INDEX_POOL;
public:
explicit DatabaseServerFeature(
application_features::ApplicationServer* server);
public:
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void start() override final;
void stop() override final;
private:
uint64_t _indexThreads = 2;
public:
TRI_server_t* server() const { return _server.get(); }
private:
std::unique_ptr<TRI_server_t> _server;
std::unique_ptr<basics::ThreadPool> _indexPool;
};
}
#endif

View File

@ -146,7 +146,7 @@ void FileDescriptorsFeature::adjustFileDescriptors() {
LOG(INFO) << "file-descriptors (nofiles) new hard limit is "
<< StringifyLimitValue(rlim.rlim_max) << ", new soft limit is "
<< ", soft limit is " << StringifyLimitValue(rlim.rlim_cur);
<< StringifyLimitValue(rlim.rlim_cur);
}
// the select backend has more restrictions

View File

@ -0,0 +1,94 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
////////////////////////////////////////////////////////////////////////////////
#include "QueryRegistryFeature.h"
#include "Aql/QueryCache.h"
#include "Aql/QueryRegistry.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "RestServer/DatabaseServerFeature.h"
#include "VocBase/server.h"
using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::basics;
using namespace arangodb::options;
aql::QueryRegistry* QueryRegistryFeature::QUERY_REGISTRY = nullptr;
QueryRegistryFeature::QueryRegistryFeature(ApplicationServer* server)
: ApplicationFeature(server, "QueryRegistry") {
setOptional(false);
requiresElevatedPrivileges(false);
startsAfter("DatabaseServer");
}
void QueryRegistryFeature::collectOptions(
std::shared_ptr<ProgramOptions> options) {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::collectOptions";
options->addSection("query", "Configure queries");
options->addOption("--query.tracking", "wether to track queries",
new BooleanParameter(&_queryTracking));
options->addOption("--query.cache-mode",
"mode for the AQL query cache (on, off, demand)",
new StringParameter(&_queryCacheMode));
options->addOption("--query.cache-entries",
"maximum number of results in query cache per database",
new UInt64Parameter(&_queryCacheEntries));
}
void QueryRegistryFeature::validateOptions(
std::shared_ptr<ProgramOptions> options) {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::validateOptions";
}
void QueryRegistryFeature::prepare() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::prepare";
// set global query tracking flag
arangodb::aql::Query::DisableQueryTracking(!_queryTracking);
// configure the query cache
std::pair<std::string, size_t> cacheProperties{_queryCacheMode,
_queryCacheEntries};
arangodb::aql::QueryCache::instance()->setProperties(cacheProperties);
}
void QueryRegistryFeature::start() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::start";
// create the query registery
_queryRegistry.reset(new aql::QueryRegistry());
DatabaseServerFeature::SERVER->_queryRegistry = _queryRegistry.get();
}
void QueryRegistryFeature::stop() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::stop";
// clear the query registery
DatabaseServerFeature::SERVER->_queryRegistry = nullptr;
}

View File

@ -0,0 +1,60 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
////////////////////////////////////////////////////////////////////////////////
#ifndef APPLICATION_FEATURES_QUERY_REGISTRY_FEATUREx_H
#define APPLICATION_FEATURES_QUERY_REGISTRY_FEATUREx_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
namespace arangodb {
namespace aql {
class QueryRegistry;
}
class QueryRegistryFeature final : public application_features::ApplicationFeature {
public:
static aql::QueryRegistry* QUERY_REGISTRY;
public:
explicit QueryRegistryFeature(application_features::ApplicationServer* server);
public:
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void stop() override final;
private:
bool _queryTracking = true;
std::string _queryCacheMode = "off";
uint64_t _queryCacheEntries = 128;
public:
aql::QueryRegistry* queryRegistry() const { return _queryRegistry.get(); }
private:
std::unique_ptr<aql::QueryRegistry> _queryRegistry;
};
}
#endif

View File

@ -60,6 +60,8 @@
#include "RestHandler/RestVersionHandler.h"
#include "RestHandler/WorkMonitorHandler.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabaseServerFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/EndpointFeature.h"
#include "RestServer/ServerFeature.h"
#include "Scheduler/SchedulerFeature.h"
@ -204,10 +206,9 @@ void RestServerFeature::start() {
V8DealerFeature::DEALER->loadJavascript(vocbase, "server/server.js");
_httpOptions._vocbase = vocbase;
auto server = DatabaseFeature::DATABASE->server();
_handlerFactory.reset(
new HttpHandlerFactory(_authenticationRealm, _defaultApiCompatibility,
_allowMethodOverride, &SetRequestContext, server));
_handlerFactory.reset(new HttpHandlerFactory(
_authenticationRealm, _defaultApiCompatibility, _allowMethodOverride,
&SetRequestContext, DatabaseServerFeature::SERVER));
defineHandlers();
buildServers();
@ -231,7 +232,7 @@ void RestServerFeature::start() {
<< (_authenticationUnixSockets ? "on" : "off");
#endif
}
LOG(INFO) << "ArangoDB (version " << ARANGODB_VERSION_FULL
<< ") is ready for business. Have fun!";
}
@ -255,8 +256,9 @@ void RestServerFeature::stop() {
}
void RestServerFeature::buildServers() {
EndpointFeature* endpoint =
application_features::ApplicationServer::getFeature<EndpointFeature>("Endpoint");
EndpointFeature* endpoint =
application_features::ApplicationServer::getFeature<EndpointFeature>(
"Endpoint");
// unencrypted HTTP endpoints
HttpServer* httpServer = new HttpServer(
@ -270,7 +272,7 @@ void RestServerFeature::buildServers() {
// ssl endpoints
if (endpointList.hasSsl()) {
SslFeature* ssl =
SslFeature* ssl =
application_features::ApplicationServer::getFeature<SslFeature>("Ssl");
// check the ssl context
@ -294,15 +296,17 @@ void RestServerFeature::buildServers() {
}
void RestServerFeature::defineHandlers() {
AgencyFeature* agency =
application_features::ApplicationServer::getFeature<AgencyFeature>("Agency");
AgencyFeature* agency =
application_features::ApplicationServer::getFeature<AgencyFeature>(
"Agency");
TRI_ASSERT(agency != nullptr);
ClusterFeature* cluster =
application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster");
application_features::ApplicationServer::getFeature<ClusterFeature>(
"Cluster");
TRI_ASSERT(cluster != nullptr);
auto queryRegistry = DatabaseFeature::DATABASE->queryRegistry();
auto queryRegistry = QueryRegistryFeature::QUERY_REGISTRY;
// ...........................................................................
// /_msg

View File

@ -26,6 +26,7 @@
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabaseServerFeature.h"
#include "V8/v8-globals.h"
#include "V8Server/V8Context.h"
#include "V8Server/V8DealerFeature.h"
@ -119,7 +120,7 @@ void UpgradeFeature::start() {
void UpgradeFeature::upgradeDatabase() {
LOG(TRACE) << "starting database init/upgrade";
auto* server = DatabaseFeature::DATABASE->server();
auto* server = DatabaseServerFeature::SERVER;
auto* systemVocbase = DatabaseFeature::DATABASE->vocbase();
// enter context and isolate

View File

@ -46,17 +46,21 @@
#include "RestServer/CheckVersionFeature.h"
#include "RestServer/ConsoleFeature.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabaseServerFeature.h"
#include "RestServer/EndpointFeature.h"
#include "RestServer/FileDescriptorsFeature.h"
#include "RestServer/FrontendFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/RestServerFeature.h"
#include "RestServer/ServerFeature.h"
#include "RestServer/UpgradeFeature.h"
#include "Scheduler/SchedulerFeature.h"
#include "Statistics/StatisticsFeature.h"
#include "V8Server/V8DealerFeature.h"
#include "Wal/LogfileManager.h"
using namespace arangodb;
using namespace arangodb::wal;
////////////////////////////////////////////////////////////////////////////////
/// @brief Hooks for OS-Specific functions
@ -104,14 +108,17 @@ int main(int argc, char* argv[]) {
server.addFeature(new ConfigFeature(&server, name));
server.addFeature(new ConsoleFeature(&server));
server.addFeature(new DatabaseFeature(&server));
server.addFeature(new DatabaseServerFeature(&server));
server.addFeature(new DispatcherFeature(&server));
server.addFeature(new EndpointFeature(&server));
server.addFeature(new FileDescriptorsFeature(&server));
server.addFeature(new FrontendFeature(&server));
server.addFeature(new LanguageFeature(&server));
server.addFeature(new LogfileManager(&server));
server.addFeature(new LoggerBufferFeature(&server));
server.addFeature(new LoggerFeature(&server, true));
server.addFeature(new NonceFeature(&server));
server.addFeature(new QueryRegistryFeature(&server));
server.addFeature(new RandomFeature(&server));
server.addFeature(new RestServerFeature(&server, "arangodb"));
server.addFeature(new SchedulerFeature(&server));

View File

@ -36,6 +36,8 @@
#include "Logger/Logger.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabaseServerFeature.h"
#include "VocBase/server.h"
#include "Wal/AllocatorThread.h"
#include "Wal/CollectorThread.h"
@ -44,21 +46,22 @@
#include "Wal/Slots.h"
#include "Wal/SynchronizerThread.h"
using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::basics;
using namespace arangodb::options;
using namespace arangodb::wal;
using namespace arangodb;
/// @brief the logfile manager singleton
static LogfileManager* Instance = nullptr;
// the logfile manager singleton
LogfileManager* LogfileManager::Instance = nullptr;
/// @brief minimum value for --wal.throttle-when-pending
// minimum value for --wal.throttle-when-pending
static inline uint64_t MinThrottleWhenPending() { return 1024 * 1024; }
/// @brief minimum value for --wal.sync-interval
// minimum value for --wal.sync-interval
static inline uint64_t MinSyncInterval() { return 5; }
/// @brief minimum value for --wal.logfile-size
// minimum value for --wal.logfile-size
static inline uint32_t MinFileSize() {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
// this allows testing with smaller logfile-sizes
@ -68,35 +71,21 @@ static inline uint32_t MinFileSize() {
#endif
}
/// @brief get the maximum size of a logfile entry
// get the maximum size of a logfile entry
static inline uint32_t MaxEntrySize() {
return 2 << 30; // 2 GB
}
/// @brief minimum number of slots
// minimum number of slots
static inline uint32_t MinSlots() { return 1024 * 8; }
/// @brief maximum number of slots
// maximum number of slots
static inline uint32_t MaxSlots() { return 1024 * 1024 * 16; }
//YYY #warning JAN should not be static
bool LogfileManager::_allowOversizeEntries = true;
std::string LogfileManager::_directory;
uint32_t LogfileManager::_historicLogfiles = 10;
bool LogfileManager::_ignoreLogfileErrors = false;
bool LogfileManager::_ignoreRecoveryErrors = false;
uint32_t LogfileManager::_filesize = 32 * 1024 * 1024;
uint32_t LogfileManager::_maxOpenLogfiles = 0;
uint32_t LogfileManager::_reserveLogfiles = 4;
uint32_t LogfileManager::_numberOfSlots = 1048576;
uint64_t LogfileManager::_syncInterval = 100;
uint64_t LogfileManager::_throttleWhenPending = 0;
uint64_t LogfileManager::_maxThrottleWait = 15000;
/// @brief create the logfile manager
LogfileManager::LogfileManager(TRI_server_t* server, std::string* databasePath)
: _server(server),
_databasePath(databasePath),
// create the logfile manager
LogfileManager::LogfileManager(ApplicationServer* server)
: ApplicationFeature(server, "LogfileManager"),
_server(nullptr),
_recoverState(nullptr),
_allowWrites(false), // start in read-only mode
_hasFoundLastTick(false),
@ -124,11 +113,16 @@ LogfileManager::LogfileManager(TRI_server_t* server, std::string* databasePath)
LOG(TRACE) << "creating WAL logfile manager";
TRI_ASSERT(!_allowWrites);
setOptional(false);
requiresElevatedPrivileges(false);
startsAfter("DatabaseServer");
startsAfter("QueryRegistry");
_transactions.reserve(32);
_failedTransactions.reserve(32);
}
/// @brief destroy the logfile manager
// destroy the logfile manager
LogfileManager::~LogfileManager() {
LOG(TRACE) << "shutting down WAL logfile manager";
@ -150,19 +144,6 @@ LogfileManager::~LogfileManager() {
}
}
/// @brief get the logfile manager instance
LogfileManager* LogfileManager::instance() {
TRI_ASSERT(Instance != nullptr);
return Instance;
}
/// @brief initialize the logfile manager instance
void LogfileManager::initialize(std::string* path, TRI_server_t* server) {
TRI_ASSERT(Instance == nullptr);
Instance = new LogfileManager(server, path);
}
void LogfileManager::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addSection(
Section("wal", "Configure the WAL", "wal options", false, false));
@ -222,53 +203,7 @@ void LogfileManager::collectOptions(std::shared_ptr<ProgramOptions> options) {
new UInt64Parameter(&_maxThrottleWait));
}
bool LogfileManager::prepare() {
static bool Prepared = false;
if (Prepared) {
return true;
}
Prepared = true;
if (_directory.empty()) {
// use global configuration variable
_directory = *_databasePath;
if (!basics::FileUtils::isDirectory(_directory)) {
std::string systemErrorStr;
long errorNo;
int res = TRI_CreateRecursiveDirectory(_directory.c_str(), errorNo,
systemErrorStr);
if (res == TRI_ERROR_NO_ERROR) {
LOG(INFO) << "created database directory '" << _directory << "'.";
} else {
LOG(FATAL) << "unable to create database directory: " << systemErrorStr;
FATAL_ERROR_EXIT();
}
}
// append "/journals"
if (_directory[_directory.size() - 1] != TRI_DIR_SEPARATOR_CHAR) {
// append a trailing slash to directory name
_directory.push_back(TRI_DIR_SEPARATOR_CHAR);
}
_directory.append("journals");
}
if (_directory.empty()) {
LOG(FATAL) << "no directory specified for WAL logfiles. Please use the "
"--wal.directory option";
FATAL_ERROR_EXIT();
}
if (_directory[_directory.size() - 1] != TRI_DIR_SEPARATOR_CHAR) {
// append a trailing slash to directory name
_directory.push_back(TRI_DIR_SEPARATOR_CHAR);
}
void LogfileManager::prepare() {
if (_filesize < MinFileSize()) {
// minimum filesize per logfile
LOG(FATAL) << "invalid value for --wal.logfile-size. Please use a value of "
@ -277,8 +212,6 @@ bool LogfileManager::prepare() {
FATAL_ERROR_EXIT();
}
_filesize = (uint32_t)(((_filesize + PageSize - 1) / PageSize) * PageSize);
if (_numberOfSlots < MinSlots() || _numberOfSlots > MaxSlots()) {
// invalid number of slots
LOG(FATAL) << "invalid value for --wal.slots. Please use a value between "
@ -304,30 +237,66 @@ bool LogfileManager::prepare() {
// sync interval is specified in milliseconds by the user, but internally
// we use microseconds
_syncInterval = _syncInterval * 1000;
}
void LogfileManager::start() {
_server = DatabaseServerFeature::SERVER;
_databasePath = DatabaseFeature::DATABASE->directory();
// needs server initialized
_filesize = (uint32_t)(((_filesize + PageSize - 1) / PageSize) * PageSize);
if (_directory.empty()) {
// use global configuration variable
_directory = _databasePath;
if (!basics::FileUtils::isDirectory(_directory)) {
std::string systemErrorStr;
long errorNo;
int res = TRI_CreateRecursiveDirectory(_directory.c_str(), errorNo,
systemErrorStr);
if (res == TRI_ERROR_NO_ERROR) {
LOG(INFO) << "created database directory '" << _directory << "'.";
} else {
LOG(FATAL) << "unable to create database directory: " << systemErrorStr;
FATAL_ERROR_EXIT();
}
}
// append "/journals"
if (_directory[_directory.size() - 1] != TRI_DIR_SEPARATOR_CHAR) {
// append a trailing slash to directory name
_directory.push_back(TRI_DIR_SEPARATOR_CHAR);
}
_directory.append("journals");
}
if (_directory.empty()) {
LOG(FATAL) << "no directory specified for WAL logfiles. Please use the "
"--wal.directory option";
FATAL_ERROR_EXIT();
}
if (_directory[_directory.size() - 1] != TRI_DIR_SEPARATOR_CHAR) {
// append a trailing slash to directory name
_directory.push_back(TRI_DIR_SEPARATOR_CHAR);
}
// initialize some objects
_slots = new Slots(this, _numberOfSlots, 0);
_recoverState = new RecoverState(_server, _ignoreRecoveryErrors);
return true;
}
bool LogfileManager::start() {
static bool started = false;
if (started) {
// we were already started
return true;
}
TRI_ASSERT(!_allowWrites);
int res = inventory();
if (res != TRI_ERROR_NO_ERROR) {
LOG(ERR) << "could not create WAL logfile inventory: "
<< TRI_errno_string(res);
return false;
LOG(FATAL) << "could not create WAL logfile inventory: "
<< TRI_errno_string(res);
FATAL_ERROR_EXIT();
}
std::string const shutdownFile = shutdownFilename();
@ -339,9 +308,9 @@ bool LogfileManager::start() {
res = readShutdownInfo();
if (res != TRI_ERROR_NO_ERROR) {
LOG(ERR) << "could not open shutdown file '" << shutdownFile
<< "': " << TRI_errno_string(res);
return false;
LOG(FATAL) << "could not open shutdown file '" << shutdownFile
<< "': " << TRI_errno_string(res);
FATAL_ERROR_EXIT();
}
} else {
LOG(TRACE) << "no shutdown file found";
@ -350,18 +319,14 @@ bool LogfileManager::start() {
res = inspectLogfiles();
if (res != TRI_ERROR_NO_ERROR) {
LOG(ERR) << "could not inspect WAL logfiles: " << TRI_errno_string(res);
return false;
LOG(FATAL) << "could not inspect WAL logfiles: " << TRI_errno_string(res);
FATAL_ERROR_EXIT();
}
started = true;
LOG(TRACE) << "WAL logfile manager configuration: historic logfiles: "
<< _historicLogfiles << ", reserve logfiles: " << _reserveLogfiles
<< ", filesize: " << _filesize
<< ", sync interval: " << _syncInterval;
return true;
}
bool LogfileManager::open() {
@ -517,7 +482,7 @@ void LogfileManager::stop() {
// notify slots that we're shutting down
_slots->shutdown();
// finalize allocator thread
// this prevents creating new (empty) WAL logfile once we flush
// the current logfile
@ -585,7 +550,7 @@ void LogfileManager::stop() {
}
}
/// @brief registers a transaction
// registers a transaction
int LogfileManager::registerTransaction(TRI_voc_tid_t transactionId) {
auto lastCollectedId = _lastCollectedId.load();
auto lastSealedId = _lastSealedId.load();
@ -610,7 +575,7 @@ int LogfileManager::registerTransaction(TRI_voc_tid_t transactionId) {
}
}
/// @brief unregisters a transaction
// unregisters a transaction
void LogfileManager::unregisterTransaction(TRI_voc_tid_t transactionId,
bool markAsFailed) {
WRITE_LOCKER(writeLocker, _transactionsLock);
@ -622,7 +587,7 @@ void LogfileManager::unregisterTransaction(TRI_voc_tid_t transactionId,
}
}
/// @brief return the set of failed transactions
// return the set of failed transactions
std::unordered_set<TRI_voc_tid_t> LogfileManager::getFailedTransactions() {
std::unordered_set<TRI_voc_tid_t> failedTransactions;
@ -634,7 +599,7 @@ std::unordered_set<TRI_voc_tid_t> LogfileManager::getFailedTransactions() {
return failedTransactions;
}
/// @brief return the set of dropped collections
// return the set of dropped collections
/// this is used during recovery and not used afterwards
std::unordered_set<TRI_voc_cid_t> LogfileManager::getDroppedCollections() {
std::unordered_set<TRI_voc_cid_t> droppedCollections;
@ -647,7 +612,7 @@ std::unordered_set<TRI_voc_cid_t> LogfileManager::getDroppedCollections() {
return droppedCollections;
}
/// @brief return the set of dropped databases
// return the set of dropped databases
/// this is used during recovery and not used afterwards
std::unordered_set<TRI_voc_tick_t> LogfileManager::getDroppedDatabases() {
std::unordered_set<TRI_voc_tick_t> droppedDatabases;
@ -660,7 +625,7 @@ std::unordered_set<TRI_voc_tick_t> LogfileManager::getDroppedDatabases() {
return droppedDatabases;
}
/// @brief unregister a list of failed transactions
// unregister a list of failed transactions
void LogfileManager::unregisterFailedTransactions(
std::unordered_set<TRI_voc_tid_t> const& failedTransactions) {
WRITE_LOCKER(writeLocker, _transactionsLock);
@ -669,7 +634,7 @@ void LogfileManager::unregisterFailedTransactions(
[&](TRI_voc_tid_t id) { _failedTransactions.erase(id); });
}
/// @brief whether or not it is currently allowed to create an additional
// whether or not it is currently allowed to create an additional
/// logfile
bool LogfileManager::logfileCreationAllowed(uint32_t size) {
if (size + DatafileHelper::JournalOverhead() > filesize()) {
@ -702,7 +667,7 @@ bool LogfileManager::logfileCreationAllowed(uint32_t size) {
return (numberOfLogfiles <= _maxOpenLogfiles);
}
/// @brief whether or not there are reserve logfiles
// whether or not there are reserve logfiles
bool LogfileManager::hasReserveLogfiles() {
uint32_t numberOfLogfiles = 0;
@ -726,12 +691,12 @@ bool LogfileManager::hasReserveLogfiles() {
return false;
}
/// @brief signal that a sync operation is required
void LogfileManager::signalSync(bool waitForSync) {
_synchronizerThread->signalSync(waitForSync);
// signal that a sync operation is required
void LogfileManager::signalSync(bool waitForSync) {
_synchronizerThread->signalSync(waitForSync);
}
/// @brief allocate space in a logfile for later writing
// allocate space in a logfile for later writing
SlotInfo LogfileManager::allocate(uint32_t size) {
if (!_allowWrites) {
// no writes allowed
@ -751,7 +716,7 @@ SlotInfo LogfileManager::allocate(uint32_t size) {
return _slots->nextUnused(size);
}
/// @brief allocate space in a logfile for later writing
// allocate space in a logfile for later writing
SlotInfo LogfileManager::allocate(TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId, uint32_t size) {
if (!_allowWrites) {
@ -772,7 +737,7 @@ SlotInfo LogfileManager::allocate(TRI_voc_tick_t databaseId,
return _slots->nextUnused(databaseId, collectionId, size);
}
/// @brief write data into the logfile, using database id and collection id
// write data into the logfile, using database id and collection id
/// this is a convenience function that combines allocate, memcpy and finalize
SlotInfoCopy LogfileManager::allocateAndWrite(TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId,
@ -803,7 +768,7 @@ SlotInfoCopy LogfileManager::allocateAndWrite(TRI_voc_tick_t databaseId,
}
}
/// @brief write data into the logfile
// write data into the logfile
/// this is a convenience function that combines allocate, memcpy and finalize
SlotInfoCopy LogfileManager::allocateAndWrite(void* src, uint32_t size,
bool waitForSync) {
@ -832,19 +797,19 @@ SlotInfoCopy LogfileManager::allocateAndWrite(void* src, uint32_t size,
}
}
/// @brief write data into the logfile
// write data into the logfile
/// this is a convenience function that combines allocate, memcpy and finalize
SlotInfoCopy LogfileManager::allocateAndWrite(Marker const& marker,
bool waitForSync) {
return allocateAndWrite(marker.mem(), marker.size(), waitForSync);
}
/// @brief finalize a log entry
// finalize a log entry
void LogfileManager::finalize(SlotInfo& slotInfo, bool waitForSync) {
_slots->returnUsed(slotInfo, waitForSync);
}
/// @brief wait for the collector queue to get cleared for the given collection
// wait for the collector queue to get cleared for the given collection
int LogfileManager::waitForCollectorQueue(TRI_voc_cid_t cid, double timeout) {
double const end = TRI_microtime() + timeout;
@ -859,7 +824,7 @@ int LogfileManager::waitForCollectorQueue(TRI_voc_cid_t cid, double timeout) {
return TRI_ERROR_NO_ERROR;
}
/// @brief finalize and seal the currently open logfile
// finalize and seal the currently open logfile
/// this is useful to ensure that any open writes up to this point have made
/// it into a logfile
int LogfileManager::flush(bool waitForSync, bool waitForCollector,
@ -956,7 +921,7 @@ bool LogfileManager::waitForSync(double maxWait) {
}
}
/// @brief re-inserts a logfile back into the inventory only
// re-inserts a logfile back into the inventory only
void LogfileManager::relinkLogfile(Logfile* logfile) {
Logfile::IdType const id = logfile->id();
@ -964,7 +929,7 @@ void LogfileManager::relinkLogfile(Logfile* logfile) {
_logfiles.emplace(id, logfile);
}
/// @brief remove a logfile from the inventory only
// remove a logfile from the inventory only
bool LogfileManager::unlinkLogfile(Logfile* logfile) {
Logfile::IdType const id = logfile->id();
@ -980,7 +945,7 @@ bool LogfileManager::unlinkLogfile(Logfile* logfile) {
return true;
}
/// @brief remove a logfile from the inventory only
// remove a logfile from the inventory only
Logfile* LogfileManager::unlinkLogfile(Logfile::IdType id) {
WRITE_LOCKER(writeLocker, _logfilesLock);
auto it = _logfiles.find(id);
@ -994,7 +959,7 @@ Logfile* LogfileManager::unlinkLogfile(Logfile::IdType id) {
return (*it).second;
}
/// @brief removes logfiles that are allowed to be removed
// removes logfiles that are allowed to be removed
bool LogfileManager::removeLogfiles() {
int iterations = 0;
bool worked = false;
@ -1013,7 +978,7 @@ bool LogfileManager::removeLogfiles() {
return worked;
}
/// @brief sets the status of a logfile to open
// sets the status of a logfile to open
void LogfileManager::setLogfileOpen(Logfile* logfile) {
TRI_ASSERT(logfile != nullptr);
@ -1021,7 +986,7 @@ void LogfileManager::setLogfileOpen(Logfile* logfile) {
logfile->setStatus(Logfile::StatusType::OPEN);
}
/// @brief sets the status of a logfile to seal-requested
// sets the status of a logfile to seal-requested
void LogfileManager::setLogfileSealRequested(Logfile* logfile) {
TRI_ASSERT(logfile != nullptr);
@ -1033,14 +998,14 @@ void LogfileManager::setLogfileSealRequested(Logfile* logfile) {
signalSync(true);
}
/// @brief sets the status of a logfile to sealed
// sets the status of a logfile to sealed
void LogfileManager::setLogfileSealed(Logfile* logfile) {
TRI_ASSERT(logfile != nullptr);
setLogfileSealed(logfile->id());
}
/// @brief sets the status of a logfile to sealed
// sets the status of a logfile to sealed
void LogfileManager::setLogfileSealed(Logfile::IdType id) {
{
WRITE_LOCKER(writeLocker, _logfilesLock);
@ -1060,7 +1025,7 @@ void LogfileManager::setLogfileSealed(Logfile::IdType id) {
}
}
/// @brief return the status of a logfile
// return the status of a logfile
Logfile::StatusType LogfileManager::getLogfileStatus(Logfile::IdType id) {
READ_LOCKER(readLocker, _logfilesLock);
@ -1073,7 +1038,7 @@ Logfile::StatusType LogfileManager::getLogfileStatus(Logfile::IdType id) {
return (*it).second->status();
}
/// @brief return the file descriptor of a logfile
// return the file descriptor of a logfile
int LogfileManager::getLogfileDescriptor(Logfile::IdType id) {
READ_LOCKER(readLocker, _logfilesLock);
@ -1091,7 +1056,7 @@ int LogfileManager::getLogfileDescriptor(Logfile::IdType id) {
return logfile->fd();
}
/// @brief get the current open region of a logfile
// get the current open region of a logfile
/// this uses the slots lock
void LogfileManager::getActiveLogfileRegion(Logfile* logfile,
char const*& begin,
@ -1099,7 +1064,7 @@ void LogfileManager::getActiveLogfileRegion(Logfile* logfile,
_slots->getActiveLogfileRegion(logfile, begin, end);
}
/// @brief garbage collect expired logfile barriers
// garbage collect expired logfile barriers
void LogfileManager::collectLogfileBarriers() {
auto now = TRI_microtime();
@ -1121,7 +1086,7 @@ void LogfileManager::collectLogfileBarriers() {
}
}
/// @brief returns a list of all logfile barrier ids
// returns a list of all logfile barrier ids
std::vector<TRI_voc_tick_t> LogfileManager::getLogfileBarriers() {
std::vector<TRI_voc_tick_t> result;
@ -1137,7 +1102,7 @@ std::vector<TRI_voc_tick_t> LogfileManager::getLogfileBarriers() {
return result;
}
/// @brief remove a specific logfile barrier
// remove a specific logfile barrier
bool LogfileManager::removeLogfileBarrier(TRI_voc_tick_t id) {
WRITE_LOCKER(barrierLock, _barriersLock);
@ -1157,7 +1122,7 @@ bool LogfileManager::removeLogfileBarrier(TRI_voc_tick_t id) {
return true;
}
/// @brief adds a barrier that prevents removal of logfiles
// adds a barrier that prevents removal of logfiles
TRI_voc_tick_t LogfileManager::addLogfileBarrier(TRI_voc_tick_t minTick,
double ttl) {
TRI_voc_tick_t id = TRI_NewTickServer();
@ -1178,7 +1143,7 @@ TRI_voc_tick_t LogfileManager::addLogfileBarrier(TRI_voc_tick_t minTick,
return id;
}
/// @brief extend the lifetime of a logfile barrier
// extend the lifetime of a logfile barrier
bool LogfileManager::extendLogfileBarrier(TRI_voc_tick_t id, double ttl,
TRI_voc_tick_t tick) {
WRITE_LOCKER(barrierLock, _barriersLock);
@ -1204,7 +1169,7 @@ bool LogfileManager::extendLogfileBarrier(TRI_voc_tick_t id, double ttl,
return true;
}
/// @brief get minimum tick value from all logfile barriers
// get minimum tick value from all logfile barriers
TRI_voc_tick_t LogfileManager::getMinBarrierTick() {
TRI_voc_tick_t value = 0;
@ -1224,7 +1189,7 @@ TRI_voc_tick_t LogfileManager::getMinBarrierTick() {
return value;
}
/// @brief get logfiles for a tick range
// get logfiles for a tick range
std::vector<Logfile*> LogfileManager::getLogfilesForTickRange(
TRI_voc_tick_t minTick, TRI_voc_tick_t maxTick, bool& minTickIncluded) {
std::vector<Logfile*> temp;
@ -1287,14 +1252,14 @@ std::vector<Logfile*> LogfileManager::getLogfilesForTickRange(
return matching;
}
/// @brief return logfiles for a tick range
// return logfiles for a tick range
void LogfileManager::returnLogfiles(std::vector<Logfile*> const& logfiles) {
for (auto& logfile : logfiles) {
logfile->release();
}
}
/// @brief get a logfile by id
// get a logfile by id
Logfile* LogfileManager::getLogfile(Logfile::IdType id) {
READ_LOCKER(readLocker, _logfilesLock);
@ -1307,7 +1272,7 @@ Logfile* LogfileManager::getLogfile(Logfile::IdType id) {
return nullptr;
}
/// @brief get a logfile and its status by id
// get a logfile and its status by id
Logfile* LogfileManager::getLogfile(Logfile::IdType id,
Logfile::StatusType& status) {
READ_LOCKER(readLocker, _logfilesLock);
@ -1324,7 +1289,7 @@ Logfile* LogfileManager::getLogfile(Logfile::IdType id,
return nullptr;
}
/// @brief get a logfile for writing. this may return nullptr
// get a logfile for writing. this may return nullptr
int LogfileManager::getWriteableLogfile(uint32_t size,
Logfile::StatusType& status,
Logfile*& result) {
@ -1405,7 +1370,7 @@ int LogfileManager::getWriteableLogfile(uint32_t size,
return TRI_ERROR_LOCK_TIMEOUT;
}
/// @brief get a logfile to collect. this may return nullptr
// get a logfile to collect. this may return nullptr
Logfile* LogfileManager::getCollectableLogfile() {
// iterate over all active readers and find their minimum used logfile id
Logfile::IdType minId = UINT64_MAX;
@ -1448,7 +1413,7 @@ Logfile* LogfileManager::getCollectableLogfile() {
return nullptr;
}
/// @brief get a logfile to remove. this may return nullptr
// get a logfile to remove. this may return nullptr
/// if it returns a logfile, the logfile is removed from the list of available
/// logfiles
Logfile* LogfileManager::getRemovableLogfile() {
@ -1513,17 +1478,17 @@ Logfile* LogfileManager::getRemovableLogfile() {
return nullptr;
}
/// @brief increase the number of collect operations for a logfile
// increase the number of collect operations for a logfile
void LogfileManager::increaseCollectQueueSize(Logfile* logfile) {
logfile->increaseCollectQueueSize();
}
/// @brief decrease the number of collect operations for a logfile
// decrease the number of collect operations for a logfile
void LogfileManager::decreaseCollectQueueSize(Logfile* logfile) {
logfile->decreaseCollectQueueSize();
}
/// @brief mark a file as being requested for collection
// mark a file as being requested for collection
void LogfileManager::setCollectionRequested(Logfile* logfile) {
TRI_ASSERT(logfile != nullptr);
@ -1545,7 +1510,7 @@ void LogfileManager::setCollectionRequested(Logfile* logfile) {
}
}
/// @brief mark a file as being done with collection
// mark a file as being done with collection
void LogfileManager::setCollectionDone(Logfile* logfile) {
TRI_ASSERT(logfile != nullptr);
Logfile::IdType id = logfile->id();
@ -1569,7 +1534,7 @@ void LogfileManager::setCollectionDone(Logfile* logfile) {
}
}
/// @brief force the status of a specific logfile
// force the status of a specific logfile
void LogfileManager::forceStatus(Logfile* logfile, Logfile::StatusType status) {
TRI_ASSERT(logfile != nullptr);
@ -1579,7 +1544,7 @@ void LogfileManager::forceStatus(Logfile* logfile, Logfile::StatusType status) {
}
}
/// @brief return the current state
// return the current state
LogfileManagerState LogfileManager::state() {
LogfileManagerState state;
@ -1591,7 +1556,7 @@ LogfileManagerState LogfileManager::state() {
return state;
}
/// @brief return the current available logfile ranges
// return the current available logfile ranges
LogfileRanges LogfileManager::ranges() {
LogfileRanges result;
@ -1617,7 +1582,7 @@ LogfileRanges LogfileManager::ranges() {
return result;
}
/// @brief get information about running transactions
// get information about running transactions
std::tuple<size_t, Logfile::IdType, Logfile::IdType>
LogfileManager::runningTransactions() {
size_t count = 0;
@ -1647,7 +1612,7 @@ LogfileManager::runningTransactions() {
count, lastCollectedId, lastSealedId);
}
/// @brief remove a logfile in the file system
// remove a logfile in the file system
void LogfileManager::removeLogfile(Logfile* logfile) {
// old filename
Logfile::IdType const id = logfile->id();
@ -1667,7 +1632,7 @@ void LogfileManager::removeLogfile(Logfile* logfile) {
}
}
/// @brief wait until a specific logfile has been collected
// wait until a specific logfile has been collected
int LogfileManager::waitForCollector(Logfile::IdType logfileId,
double maxWaitTime) {
static int64_t const SingleWaitPeriod = 50 * 1000;
@ -1709,7 +1674,7 @@ int LogfileManager::waitForCollector(Logfile::IdType logfileId,
return TRI_ERROR_LOCK_TIMEOUT;
}
/// @brief run the recovery procedure
// run the recovery procedure
/// this is called after the logfiles have been scanned completely and
/// recovery state has been build. additionally, all databases have been
/// opened already so we can use collections
@ -1751,7 +1716,7 @@ int LogfileManager::runRecovery() {
return TRI_ERROR_NO_ERROR;
}
/// @brief closes all logfiles
// closes all logfiles
void LogfileManager::closeLogfiles() {
WRITE_LOCKER(writeLocker, _logfilesLock);
@ -1766,7 +1731,7 @@ void LogfileManager::closeLogfiles() {
_logfiles.clear();
}
/// @brief reads the shutdown information
// reads the shutdown information
int LogfileManager::readShutdownInfo() {
std::string const filename = shutdownFilename();
std::shared_ptr<VPackBuilder> builder;
@ -1824,7 +1789,7 @@ int LogfileManager::readShutdownInfo() {
return TRI_ERROR_NO_ERROR;
}
/// @brief writes the shutdown information
// writes the shutdown information
/// this function is called at shutdown and at every logfile flush request
int LogfileManager::writeShutdownInfo(bool writeShutdownTime) {
TRI_IF_FAILURE("LogfileManagerWriteShutdown") { return TRI_ERROR_DEBUG; }
@ -1884,7 +1849,7 @@ int LogfileManager::writeShutdownInfo(bool writeShutdownTime) {
return TRI_ERROR_NO_ERROR;
}
/// @brief start the synchronizer thread
// start the synchronizer thread
int LogfileManager::startSynchronizerThread() {
_synchronizerThread = new SynchronizerThread(this, _syncInterval);
@ -1896,7 +1861,7 @@ int LogfileManager::startSynchronizerThread() {
return TRI_ERROR_NO_ERROR;
}
/// @brief stop the synchronizer thread
// stop the synchronizer thread
void LogfileManager::stopSynchronizerThread() {
if (_synchronizerThread != nullptr) {
LOG(TRACE) << "stopping WAL synchronizer thread";
@ -1905,7 +1870,7 @@ void LogfileManager::stopSynchronizerThread() {
}
}
/// @brief start the allocator thread
// start the allocator thread
int LogfileManager::startAllocatorThread() {
_allocatorThread = new AllocatorThread(this);
@ -1917,7 +1882,7 @@ int LogfileManager::startAllocatorThread() {
return TRI_ERROR_NO_ERROR;
}
/// @brief stop the allocator thread
// stop the allocator thread
void LogfileManager::stopAllocatorThread() {
if (_allocatorThread != nullptr) {
LOG(TRACE) << "stopping WAL allocator thread";
@ -1926,7 +1891,7 @@ void LogfileManager::stopAllocatorThread() {
}
}
/// @brief start the collector thread
// start the collector thread
int LogfileManager::startCollectorThread() {
_collectorThread = new CollectorThread(this, _server);
@ -1938,7 +1903,7 @@ int LogfileManager::startCollectorThread() {
return TRI_ERROR_NO_ERROR;
}
/// @brief stop the collector thread
// stop the collector thread
void LogfileManager::stopCollectorThread() {
if (_collectorThread != nullptr) {
LOG(TRACE) << "stopping WAL collector thread";
@ -1979,7 +1944,7 @@ void LogfileManager::stopCollectorThread() {
}
}
/// @brief start the remover thread
// start the remover thread
int LogfileManager::startRemoverThread() {
_removerThread = new RemoverThread(this);
@ -1991,7 +1956,7 @@ int LogfileManager::startRemoverThread() {
return TRI_ERROR_NO_ERROR;
}
/// @brief stop the remover thread
// stop the remover thread
void LogfileManager::stopRemoverThread() {
if (_removerThread != nullptr) {
LOG(TRACE) << "stopping WAL remover thread";
@ -2000,7 +1965,7 @@ void LogfileManager::stopRemoverThread() {
}
}
/// @brief check which logfiles are present in the log directory
// check which logfiles are present in the log directory
int LogfileManager::inventory() {
int res = ensureDirectory();
@ -2036,7 +2001,7 @@ int LogfileManager::inventory() {
return TRI_ERROR_NO_ERROR;
}
/// @brief inspect the logfiles in the log directory
// inspect the logfiles in the log directory
int LogfileManager::inspectLogfiles() {
LOG(TRACE) << "inspecting WAL logfiles";
@ -2142,7 +2107,7 @@ int LogfileManager::inspectLogfiles() {
return TRI_ERROR_NO_ERROR;
}
/// @brief allocates a new reserve logfile
// allocates a new reserve logfile
int LogfileManager::createReserveLogfile(uint32_t size) {
Logfile::IdType const id = nextId();
std::string const filename = logfileName(id);
@ -2174,12 +2139,12 @@ int LogfileManager::createReserveLogfile(uint32_t size) {
return TRI_ERROR_NO_ERROR;
}
/// @brief get an id for the next logfile
// get an id for the next logfile
Logfile::IdType LogfileManager::nextId() {
return static_cast<Logfile::IdType>(TRI_NewTickServer());
}
/// @brief ensure the wal logfiles directory is actually there
// ensure the wal logfiles directory is actually there
int LogfileManager::ensureDirectory() {
// strip directory separator from path
// this is required for Windows
@ -2211,18 +2176,18 @@ int LogfileManager::ensureDirectory() {
return TRI_ERROR_NO_ERROR;
}
/// @brief return the absolute name of the shutdown file
// return the absolute name of the shutdown file
std::string LogfileManager::shutdownFilename() const {
return (*_databasePath) + TRI_DIR_SEPARATOR_STR + std::string("SHUTDOWN");
return (_databasePath + TRI_DIR_SEPARATOR_STR) + "SHUTDOWN";
}
/// @brief return an absolute filename for a logfile id
// return an absolute filename for a logfile id
std::string LogfileManager::logfileName(Logfile::IdType id) const {
return _directory + std::string("logfile-") + basics::StringUtils::itoa(id) +
std::string(".db");
}
/// @brief return the current time as a string
// return the current time as a string
std::string LogfileManager::getTimeString() {
char buffer[32];
size_t len;

View File

@ -24,7 +24,7 @@
#ifndef ARANGOD_WAL_LOGFILE_MANAGER_H
#define ARANGOD_WAL_LOGFILE_MANAGER_H 1
#include "Basics/Common.h"
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Basics/Mutex.h"
#include "Basics/ReadWriteLock.h"
@ -87,7 +87,7 @@ struct LogfileBarrier {
TRI_voc_tick_t minTick;
};
class LogfileManager {
class LogfileManager final : public application_features::ApplicationFeature {
friend class AllocatorThread;
friend class CollectorThread;
@ -95,98 +95,103 @@ class LogfileManager {
LogfileManager& operator=(LogfileManager const&) = delete;
public:
LogfileManager(TRI_server_t*, std::string*);
explicit LogfileManager(application_features::ApplicationServer* server);
/// @brief destroy the logfile manager
// destroy the logfile manager
~LogfileManager();
/// @brief get the logfile manager instance
static LogfileManager* instance();
// get the logfile manager instance
static LogfileManager* instance() {
TRI_ASSERT(Instance != nullptr);
return Instance;
}
/// @brief initialize the logfile manager instance
static void initialize(std::string*, TRI_server_t*);
private:
static LogfileManager* Instance;
public:
static void collectOptions(std::shared_ptr<options::ProgramOptions> options);
bool prepare();
void collectOptions(
std::shared_ptr<options::ProgramOptions> options) override final;
void prepare() override final;
void start() override final;
void stop() override final;
bool open();
bool start();
void stop();
public:
/// @brief get the logfile directory
// get the logfile directory
inline std::string directory() const { return _directory; }
/// @brief get the logfile size
// get the logfile size
inline uint32_t filesize() const { return _filesize; }
/// @brief set the logfile size
// set the logfile size
inline void filesize(uint32_t value) { _filesize = value; }
/// @brief get the sync interval
// get the sync interval
inline uint64_t syncInterval() const { return _syncInterval / 1000; }
/// @brief set the sync interval
// set the sync interval
inline void syncInterval(uint64_t value) { _syncInterval = value * 1000; }
/// @brief get the number of reserve logfiles
// get the number of reserve logfiles
inline uint32_t reserveLogfiles() const { return _reserveLogfiles; }
/// @brief set the number of reserve logfiles
// set the number of reserve logfiles
inline void reserveLogfiles(uint32_t value) { _reserveLogfiles = value; }
/// @brief get the number of historic logfiles to keep
// get the number of historic logfiles to keep
inline uint32_t historicLogfiles() const { return _historicLogfiles; }
/// @brief set the number of historic logfiles
// set the number of historic logfiles
inline void historicLogfiles(uint32_t value) { _historicLogfiles = value; }
/// @brief whether or not there was a SHUTDOWN file with a tick value
// whether or not there was a SHUTDOWN file with a tick value
/// at server start
inline bool hasFoundLastTick() const { return _hasFoundLastTick; }
/// @brief whether or not we are in the recovery phase
// whether or not we are in the recovery phase
inline bool isInRecovery() const { return _inRecovery; }
/// @brief whether or not we are in the shutdown phase
// whether or not we are in the shutdown phase
inline bool isInShutdown() const { return (_shutdown != 0); }
/// @brief return the slots manager
// return the slots manager
Slots* slots() { return _slots; }
/// @brief whether or not oversize entries are allowed
// whether or not oversize entries are allowed
inline bool allowOversizeEntries() const { return _allowOversizeEntries; }
/// @brief sets the "allowOversizeEntries" value
// sets the "allowOversizeEntries" value
inline void allowOversizeEntries(bool value) {
_allowOversizeEntries = value;
}
/// @brief whether or not write-throttling can be enabled
// whether or not write-throttling can be enabled
inline bool canBeThrottled() const { return (_throttleWhenPending > 0); }
/// @brief maximum wait time when write-throttled (in milliseconds)
// maximum wait time when write-throttled (in milliseconds)
inline uint64_t maxThrottleWait() const { return _maxThrottleWait; }
/// @brief maximum wait time when write-throttled (in milliseconds)
// maximum wait time when write-throttled (in milliseconds)
inline void maxThrottleWait(uint64_t value) { _maxThrottleWait = value; }
/// @brief whether or not write-throttling is currently enabled
// whether or not write-throttling is currently enabled
inline bool isThrottled() { return (_writeThrottled != 0); }
/// @brief activate write-throttling
// activate write-throttling
void activateWriteThrottling() { _writeThrottled = 1; }
/// @brief deactivate write-throttling
// deactivate write-throttling
void deactivateWriteThrottling() { _writeThrottled = 0; }
/// @brief allow or disallow writes to the WAL
// allow or disallow writes to the WAL
inline void allowWrites(bool value) { _allowWrites = value; }
/// @brief get the value of --wal.throttle-when-pending
// get the value of --wal.throttle-when-pending
inline uint64_t throttleWhenPending() const { return _throttleWhenPending; }
/// @brief set the value of --wal.throttle-when-pending
// set the value of --wal.throttle-when-pending
inline void throttleWhenPending(uint64_t value) {
_throttleWhenPending = value;
@ -195,352 +200,347 @@ class LogfileManager {
}
}
/// @brief registers a transaction
// registers a transaction
int registerTransaction(TRI_voc_tid_t);
/// @brief unregisters a transaction
// unregisters a transaction
void unregisterTransaction(TRI_voc_tid_t, bool);
/// @brief return the set of failed transactions
// return the set of failed transactions
std::unordered_set<TRI_voc_tid_t> getFailedTransactions();
/// @brief return the set of dropped collections
// return the set of dropped collections
/// this is used during recovery and not used afterwards
std::unordered_set<TRI_voc_cid_t> getDroppedCollections();
/// @brief return the set of dropped databases
// return the set of dropped databases
/// this is used during recovery and not used afterwards
std::unordered_set<TRI_voc_tick_t> getDroppedDatabases();
/// @brief unregister a list of failed transactions
// unregister a list of failed transactions
void unregisterFailedTransactions(std::unordered_set<TRI_voc_tid_t> const&);
/// @brief whether or not it is currently allowed to create an additional
// whether or not it is currently allowed to create an additional
/// logfile
bool logfileCreationAllowed(uint32_t);
/// @brief whether or not there are reserve logfiles
// whether or not there are reserve logfiles
bool hasReserveLogfiles();
/// @brief signal that a sync operation is required
// signal that a sync operation is required
void signalSync(bool);
/// @brief reserve space in a logfile
// reserve space in a logfile
SlotInfo allocate(uint32_t);
/// @brief reserve space in a logfile
// reserve space in a logfile
SlotInfo allocate(TRI_voc_tick_t, TRI_voc_cid_t, uint32_t);
/// @brief finalize a log entry
// finalize a log entry
void finalize(SlotInfo&, bool);
/// @brief write data into the logfile, using database id and collection id
/// this is a convenience function that combines allocate, memcpy and finalize
SlotInfoCopy allocateAndWrite(TRI_voc_tick_t, TRI_voc_cid_t, void*, uint32_t, bool);
/// @brief write data into the logfile
// write data into the logfile, using database id and collection id
/// this is a convenience function that combines allocate, memcpy and finalize
SlotInfoCopy allocateAndWrite(TRI_voc_tick_t, TRI_voc_cid_t, void*, uint32_t,
bool);
// write data into the logfile
/// this is a convenience function that combines allocate, memcpy and finalize
SlotInfoCopy allocateAndWrite(void*, uint32_t, bool);
/// @brief write data into the logfile
// write data into the logfile
/// this is a convenience function that combines allocate, memcpy and finalize
SlotInfoCopy allocateAndWrite(Marker const&, bool);
/// @brief wait for the collector queue to get cleared for the given
// wait for the collector queue to get cleared for the given
/// collection
int waitForCollectorQueue(TRI_voc_cid_t, double);
/// @brief finalize and seal the currently open logfile
// finalize and seal the currently open logfile
/// this is useful to ensure that any open writes up to this point have made
/// it into a logfile
int flush(bool, bool, bool);
/// wait until all changes to the current logfile are synced
bool waitForSync(double);
/// @brief re-inserts a logfile back into the inventory only
// re-inserts a logfile back into the inventory only
void relinkLogfile(Logfile*);
/// @brief remove a logfile from the inventory only
// remove a logfile from the inventory only
bool unlinkLogfile(Logfile*);
/// @brief remove a logfile from the inventory only
// remove a logfile from the inventory only
Logfile* unlinkLogfile(Logfile::IdType);
/// @brief removes logfiles that are allowed to be removed
// removes logfiles that are allowed to be removed
bool removeLogfiles();
/// @brief sets the status of a logfile to open
// sets the status of a logfile to open
void setLogfileOpen(Logfile*);
/// @brief sets the status of a logfile to seal-requested
// sets the status of a logfile to seal-requested
void setLogfileSealRequested(Logfile*);
/// @brief sets the status of a logfile to sealed
// sets the status of a logfile to sealed
void setLogfileSealed(Logfile*);
/// @brief sets the status of a logfile to sealed
// sets the status of a logfile to sealed
void setLogfileSealed(Logfile::IdType);
/// @brief return the status of a logfile
// return the status of a logfile
Logfile::StatusType getLogfileStatus(Logfile::IdType);
/// @brief return the file descriptor of a logfile
// return the file descriptor of a logfile
int getLogfileDescriptor(Logfile::IdType);
/// @brief get the current open region of a logfile
// get the current open region of a logfile
/// this uses the slots lock
void getActiveLogfileRegion(Logfile*, char const*&, char const*&);
/// @brief garbage collect expires logfile barriers
// garbage collect expires logfile barriers
void collectLogfileBarriers();
/// @brief returns a list of all logfile barrier ids
// returns a list of all logfile barrier ids
std::vector<TRI_voc_tick_t> getLogfileBarriers();
/// @brief remove a specific logfile barrier
// remove a specific logfile barrier
bool removeLogfileBarrier(TRI_voc_tick_t);
/// @brief adds a barrier that prevents removal of logfiles
// adds a barrier that prevents removal of logfiles
TRI_voc_tick_t addLogfileBarrier(TRI_voc_tick_t, double);
/// @brief extend the lifetime of a logfile barrier
// extend the lifetime of a logfile barrier
bool extendLogfileBarrier(TRI_voc_tick_t, double, TRI_voc_tick_t);
/// @brief get minimum tick value from all logfile barriers
// get minimum tick value from all logfile barriers
TRI_voc_tick_t getMinBarrierTick();
/// @brief get logfiles for a tick range
// get logfiles for a tick range
std::vector<Logfile*> getLogfilesForTickRange(TRI_voc_tick_t, TRI_voc_tick_t,
bool& minTickIncluded);
/// @brief return logfiles for a tick range
// return logfiles for a tick range
void returnLogfiles(std::vector<Logfile*> const&);
/// @brief get a logfile by id
// get a logfile by id
Logfile* getLogfile(Logfile::IdType);
/// @brief get a logfile and its status by id
// get a logfile and its status by id
Logfile* getLogfile(Logfile::IdType, Logfile::StatusType&);
/// @brief get a logfile for writing. this may return nullptr
// get a logfile for writing. this may return nullptr
int getWriteableLogfile(uint32_t, Logfile::StatusType&, Logfile*&);
/// @brief get a logfile to collect. this may return nullptr
// get a logfile to collect. this may return nullptr
Logfile* getCollectableLogfile();
/// @brief get a logfile to remove. this may return nullptr
// get a logfile to remove. this may return nullptr
/// if it returns a logfile, the logfile is removed from the list of available
/// logfiles
Logfile* getRemovableLogfile();
/// @brief increase the number of collect operations for a logfile
// increase the number of collect operations for a logfile
void increaseCollectQueueSize(Logfile*);
/// @brief decrease the number of collect operations for a logfile
// decrease the number of collect operations for a logfile
void decreaseCollectQueueSize(Logfile*);
/// @brief mark a file as being requested for collection
// mark a file as being requested for collection
void setCollectionRequested(Logfile*);
/// @brief mark a file as being done with collection
// mark a file as being done with collection
void setCollectionDone(Logfile*);
/// @brief force the status of a specific logfile
// force the status of a specific logfile
void forceStatus(Logfile*, Logfile::StatusType);
/// @brief return the current state
// return the current state
LogfileManagerState state();
/// @brief return the current available logfile ranges
// return the current available logfile ranges
LogfileRanges ranges();
/// @brief get information about running transactions
// get information about running transactions
std::tuple<size_t, Logfile::IdType, Logfile::IdType> runningTransactions();
private:
/// @brief remove a logfile in the file system
// remove a logfile in the file system
void removeLogfile(Logfile*);
/// @brief wait for the collector thread to collect a specific logfile
// wait for the collector thread to collect a specific logfile
int waitForCollector(Logfile::IdType, double);
/// @brief run the recovery procedure
// run the recovery procedure
/// this is called after the logfiles have been scanned completely and
/// recovery state has been build. additionally, all databases have been
/// opened already so we can use collections
int runRecovery();
/// @brief closes all logfiles
// closes all logfiles
void closeLogfiles();
/// @brief reads the shutdown information
// reads the shutdown information
int readShutdownInfo();
/// @brief writes the shutdown information
// writes the shutdown information
int writeShutdownInfo(bool);
/// @brief start the synchronizer thread
// start the synchronizer thread
int startSynchronizerThread();
/// @brief stop the synchronizer thread
// stop the synchronizer thread
void stopSynchronizerThread();
/// @brief start the allocator thread
// start the allocator thread
int startAllocatorThread();
/// @brief stop the allocator thread
// stop the allocator thread
void stopAllocatorThread();
/// @brief start the collector thread
// start the collector thread
int startCollectorThread();
/// @brief stop the collector thread
// stop the collector thread
void stopCollectorThread();
/// @brief start the remover thread
// start the remover thread
int startRemoverThread();
/// @brief stop the remover thread
// stop the remover thread
void stopRemoverThread();
/// @brief check which logfiles are present in the log directory
// check which logfiles are present in the log directory
int inventory();
/// @brief inspect all found WAL logfiles
// inspect all found WAL logfiles
/// this searches for the max tick in the logfiles and builds up the initial
/// transaction state
int inspectLogfiles();
/// @brief allocate a new reserve logfile
// allocate a new reserve logfile
int createReserveLogfile(uint32_t);
/// @brief get an id for the next logfile
// get an id for the next logfile
Logfile::IdType nextId();
/// @brief ensure the wal logfiles directory is actually there
// ensure the wal logfiles directory is actually there
int ensureDirectory();
/// @brief return the absolute name of the shutdown file
// return the absolute name of the shutdown file
std::string shutdownFilename() const;
/// @brief return an absolute filename for a logfile id
// return an absolute filename for a logfile id
std::string logfileName(Logfile::IdType) const;
/// @brief return the current time as a string
// return the current time as a string
static std::string getTimeString();
private:
/// @brief pointer to the server
// pointer to the server
TRI_server_t* _server;
/// @brief the arangod config variable containing the database path
std::string* _databasePath;
// the arangod config variable containing the database path
std::string _databasePath;
/// @brief state during recovery
// state during recovery
RecoverState* _recoverState;
/// @brief maximum number of parallel open logfiles
bool _allowOversizeEntries = true;
std::string _directory = "";
uint32_t _historicLogfiles = 10;
bool _ignoreLogfileErrors = false;
bool _ignoreRecoveryErrors = false;
uint32_t _filesize = 32 * 1024 * 1024;
uint32_t _maxOpenLogfiles = 0;
uint32_t _reserveLogfiles = 4;
uint32_t _numberOfSlots = 1048576;
uint64_t _syncInterval = 100;
uint64_t _throttleWhenPending = 0;
uint64_t _maxThrottleWait = 15000;
/// @brief maximum wait time for write-throttling
//YYY #warning JAN this should be non-static, but the singleton cannot be created before 'start'
static bool _allowOversizeEntries;
static std::string _directory;
static uint32_t _historicLogfiles;
static bool _ignoreLogfileErrors;
static bool _ignoreRecoveryErrors;
static uint32_t _filesize;
static uint32_t _maxOpenLogfiles;
static uint32_t _reserveLogfiles;
static uint32_t _numberOfSlots;
static uint64_t _syncInterval;
static uint64_t _throttleWhenPending;
static uint64_t _maxThrottleWait;
/// @brief whether or not writes to the WAL are allowed
// whether or not writes to the WAL are allowed
bool _allowWrites;
/// @brief this is true if there was a SHUTDOWN file with a last tick at
// this is true if there was a SHUTDOWN file with a last tick at
/// server start
bool _hasFoundLastTick;
/// @brief whether or not the recovery procedure is running
// whether or not the recovery procedure is running
bool _inRecovery;
/// @brief whether or not the logfile manager was properly initialized and
// whether or not the logfile manager was properly initialized and
/// started
bool _startCalled;
/// @brief a lock protecting the _logfiles map and the logfiles' statuses
// a lock protecting the _logfiles map and the logfiles' statuses
basics::ReadWriteLock _logfilesLock;
/// @brief the logfiles
// the logfiles
std::map<Logfile::IdType, Logfile*> _logfiles;
/// @brief the slots manager
// the slots manager
Slots* _slots;
/// @brief the synchronizer thread
// the synchronizer thread
SynchronizerThread* _synchronizerThread;
/// @brief the allocator thread
// the allocator thread
AllocatorThread* _allocatorThread;
/// @brief the collector thread
// the collector thread
CollectorThread* _collectorThread;
/// @brief the logfile remover thread
// the logfile remover thread
RemoverThread* _removerThread;
/// @brief last opened logfile id. note: writing to this variable is protected
// last opened logfile id. note: writing to this variable is protected
/// by the _idLock
std::atomic<Logfile::IdType> _lastOpenedId;
/// @brief last fully collected logfile id. note: writing to this variable is
// last fully collected logfile id. note: writing to this variable is
/// protected by the_idLock
std::atomic<Logfile::IdType> _lastCollectedId;
/// @brief last fully sealed logfile id. note: writing to this variable is
// last fully sealed logfile id. note: writing to this variable is
/// protected by the _idLock
std::atomic<Logfile::IdType> _lastSealedId;
/// @brief a lock protecting the shutdown file
// a lock protecting the shutdown file
Mutex _shutdownFileLock;
/// @brief a lock protecting _transactions and _failedTransactions
// a lock protecting _transactions and _failedTransactions
basics::ReadWriteLock _transactionsLock;
/// @brief currently ongoing transactions
// currently ongoing transactions
std::unordered_map<TRI_voc_tid_t, std::pair<Logfile::IdType, Logfile::IdType>>
_transactions;
/// @brief set of failed transactions
// set of failed transactions
std::unordered_set<TRI_voc_tid_t> _failedTransactions;
/// @brief set of dropped collections
// set of dropped collections
/// this is populated during recovery and not used afterwards
std::unordered_set<TRI_voc_cid_t> _droppedCollections;
/// @brief set of dropped databases
// set of dropped databases
/// this is populated during recovery and not used afterwards
std::unordered_set<TRI_voc_tick_t> _droppedDatabases;
/// @brief a lock protecting the updates of _lastCollectedId, _lastSealedId,
// a lock protecting the updates of _lastCollectedId, _lastSealedId,
/// and _lastOpenedId
Mutex _idLock;
/// @brief whether or not write-throttling is currently enabled
// whether or not write-throttling is currently enabled
int _writeThrottled;
/// @brief whether or not we have been shut down already
// whether or not we have been shut down already
volatile sig_atomic_t _shutdown;
/// @brief a lock protecting _barriers
// a lock protecting _barriers
basics::ReadWriteLock _barriersLock;
/// @brief barriers that prevent WAL logfiles from being collected
// barriers that prevent WAL logfiles from being collected
std::unordered_map<TRI_voc_tick_t, LogfileBarrier*> _barriers;
};
}