
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Kaveh Vahedipour 2016-09-02 09:28:15 +02:00
commit fc60183490
18 changed files with 313 additions and 263 deletions

View File

@ -1,6 +1,9 @@
devel
-----
* added --cluster.system-replication-factor in order to adjust the
replication factor for new system collections
* fixed issue #2012
* added a memory exception in case V8 memory gets too low

View File

@ -18,13 +18,13 @@ attribute instead. The result object will also contain an attribute *warnings*,
is an array of warnings that occurred during optimization or execution plan creation.
Each plan in the result is an object with the following attributes:
- *nodes*: the array of execution nodes of the plan. The list of available node types
can be found [here](Optimizer.md)
- *nodes*: the array of execution nodes of the plan. [The list of available node types
can be found here](Optimizer.md)
- *estimatedCost*: the total estimated cost for the plan. If there are multiple
plans, the optimizer will choose the plan with the lowest total cost.
- *collections*: an array of collections used in the query
- *rules*: an array of rules the optimizer applied. The list of rules can be
found [here](Optimizer.md)
- *rules*: an array of rules the optimizer applied. [The list of rules can be
found here](Optimizer.md)
- *variables*: array of variables used in the query (note: this may contain
internal variables created by the optimizer)

View File

@ -276,6 +276,32 @@ the `warnings` attribute of the `explain` result:
There is an upper bound on the number of warnings a query may produce. If that
bound is reached, no further warnings will be returned.
!SUBSECTION Things to consider for optimizing queries
While the optimizer can fix some things in queries, it is not allowed to make assumptions
that you, the user, can make because you know what the query is intended to do. It may pull
calculations to the front of the execution, but it may not cross certain borders.
So in certain cases you may want to rearrange calculations in your query yourself so they become cheaper.
This is even more important for calculations that are executed in JavaScript:
@startDocuBlockInline AQLEXP_11_explainjs
@EXAMPLE_ARANGOSH_OUTPUT{AQLEXP_11_explainjs}
db._explain('FOR x IN 1..10 LET then=DATE_NOW() FOR y IN 1..10 LET now=DATE_NOW() LET nowstr=CONCAT(now, x, y, then) RETURN nowstr', {}, {colors: false})
db._explain('LET now=DATE_NOW() FOR x IN 1..10 FOR y IN 1..10 LET nowstr=CONCAT(now, x, y, now) RETURN nowstr', {}, {colors: false})
@END_EXAMPLE_ARANGOSH_OUTPUT
@endDocuBlock AQLEXP_11_explainjs
You can see that the optimizer noticed `1..10` is specified twice and evaluates it only once.
While you may see time passing by during the execution of the query and its calls to `DATE_NOW()`,
this may not be desired in the first place. The query's V8 expressions also use
significant resources, since they are executed 10 x 10 = 100 times. If we don't care
about the time ticking by during query execution, we can fetch the time once at the start
of the query, which leaves us with only one V8 expression at the very start of the query.
Next to bringing better performance, this also obeys the [DRY principle](https://en.wikipedia.org/wiki/Don't_repeat_yourself).
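The transformation described above is ordinary loop-invariant hoisting: fetch the expensive value once before the nested loops rather than once per iteration. A minimal sketch of the same idea outside of AQL (all names here are illustrative, not ArangoDB API; `dateNow` stands in for `DATE_NOW()`):

```cpp
#include <cassert>
#include <chrono>
#include <string>
#include <vector>

// Stand-in for AQL's DATE_NOW(): an "expensive" call we want to hoist.
static long long dateNow() {
  using namespace std::chrono;
  return duration_cast<milliseconds>(
             system_clock::now().time_since_epoch()).count();
}

// Shape of the second (cheaper) query: the expensive call happens once,
// before the two nested loops; the 100 inner iterations only concatenate.
std::vector<std::string> buildStrings() {
  long long now = dateNow();  // evaluated once, like LET now before the FORs
  std::vector<std::string> out;
  for (int x = 1; x <= 10; ++x) {
    for (int y = 1; y <= 10; ++y) {
      out.push_back(std::to_string(now) + "-" + std::to_string(x) + "-" +
                    std::to_string(y));
    }
  }
  return out;
}
```

The first query's shape would call `dateNow()` inside the inner loop, paying the cost 100 times for a value that (if we accept a single timestamp) never needs to change.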
!SUBSECTION Optimization in a cluster
When you're running AQL in the cluster, the parsing of the query is done on the

View File

@ -0,0 +1,50 @@
arangosh> db._explain('FOR x IN 1..10 LET then=DATE_NOW() FOR y IN 1..10 LET now=DATE_NOW() LET nowstr=CONCAT(now, x, y, then) RETURN nowstr', {}, {colors: false})
Query string:
FOR x IN 1..10 LET then=DATE_NOW() FOR y IN 1..10 LET now=DATE_NOW() LET nowstr=CONCAT(now, x, y,
then) RETURN nowstr
Execution plan:
Id NodeType Est. Comment
1 SingletonNode 1 * ROOT
2 CalculationNode 1 - LET #5 = 1 .. 10 /* range */ /* simple expression */
3 EnumerateListNode 10 - FOR x IN #5 /* list iteration */
4 CalculationNode 10 - LET then = DATE_NOW() /* v8 expression */
6 EnumerateListNode 100 - FOR y IN #5 /* list iteration */
7 CalculationNode 100 - LET now = DATE_NOW() /* v8 expression */
8 CalculationNode 100 - LET nowstr = CONCAT(now, x, y, then) /* simple expression */
9 ReturnNode 100 - RETURN nowstr
Indexes used:
none
Optimization rules applied:
Id RuleName
1 move-calculations-up
2 remove-redundant-calculations
3 remove-unnecessary-calculations
arangosh> db._explain('LET now=DATE_NOW() FOR x IN 1..10 FOR y IN 1..10 LET nowstr=CONCAT(now, x, y, now) RETURN nowstr', {}, {colors: false})
Query string:
LET now=DATE_NOW() FOR x IN 1..10 FOR y IN 1..10 LET nowstr=CONCAT(now, x, y, now) RETURN nowstr
Execution plan:
Id NodeType Est. Comment
1 SingletonNode 1 * ROOT
3 CalculationNode 1 - LET #4 = 1 .. 10 /* range */ /* simple expression */
2 CalculationNode 1 - LET now = DATE_NOW() /* v8 expression */
4 EnumerateListNode 10 - FOR x IN #4 /* list iteration */
6 EnumerateListNode 100 - FOR y IN #4 /* list iteration */
7 CalculationNode 100 - LET nowstr = CONCAT(now, x, y, now) /* simple expression */
8 ReturnNode 100 - RETURN nowstr
Indexes used:
none
Optimization rules applied:
Id RuleName
1 move-calculations-up
2 remove-redundant-calculations
3 remove-unnecessary-calculations

View File

@ -38,6 +38,7 @@
#include "ProgramOptions/Section.h"
#include "RestServer/DatabaseServerFeature.h"
#include "SimpleHttpClient/ConnectionManager.h"
#include "V8Server/V8DealerFeature.h"
#include "VocBase/server.h"
using namespace arangodb;
@ -67,7 +68,7 @@ ClusterFeature::ClusterFeature(application_features::ApplicationServer* server)
ClusterFeature::~ClusterFeature() {
delete _heartbeatThread;
if (_enableCluster) {
AgencyComm::cleanup();
}
@ -126,6 +127,10 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addOption("--cluster.coordinator-config",
"path to the coordinator configuration",
new StringParameter(&_coordinatorConfig));
options->addOption("--cluster.system-replication-factor",
"replication factor for system collections",
new UInt32Parameter(&_systemReplicationFactor));
}
void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
@ -180,6 +185,12 @@ void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
FATAL_ERROR_EXIT();
}
}
// validate system-replication-factor
if (_systemReplicationFactor == 0) {
LOG(FATAL) << "system replication factor must be greater than 0";
FATAL_ERROR_EXIT();
}
}
void ClusterFeature::prepare() {
@ -190,6 +201,12 @@ void ClusterFeature::prepare() {
ServerState::instance()->setDBserverConfig(_dbserverConfig);
ServerState::instance()->setCoordinatorConfig(_coordinatorConfig);
V8DealerFeature* v8Dealer =
ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
v8Dealer->defineDouble("SYS_DEFAULT_REPLICATION_FACTOR_SYSTEM",
_systemReplicationFactor);
// create callback registry
_agencyCallbackRegistry.reset(
new AgencyCallbackRegistry(agencyCallbacksPath()));
@ -203,8 +220,9 @@ void ClusterFeature::prepare() {
// create an instance (this will not yet create a thread)
ClusterComm::instance();
AgencyFeature* agency =
application_features::ApplicationServer::getFeature<AgencyFeature>("Agency");
AgencyFeature* agency =
application_features::ApplicationServer::getFeature<AgencyFeature>(
"Agency");
if (agency->isEnabled() || _enableCluster) {
// initialize ClusterComm library, must call initialize only once
@ -335,12 +353,11 @@ void ClusterFeature::prepare() {
<< "' specified for --cluster.my-address";
FATAL_ERROR_EXIT();
}
}
//YYY #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
//YYY #warning FRANK split into methods
//YYY #endif
// YYY #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
// YYY #warning FRANK split into methods
// YYY #endif
void ClusterFeature::start() {
// return if cluster is disabled
@ -371,53 +388,49 @@ void ClusterFeature::start() {
AgencyCommResult result = comm.getValues("Sync/HeartbeatIntervalMs");
if (result.successful()) {
velocypack::Slice HeartbeatIntervalMs =
result.slice()[0].get(std::vector<std::string>(
{AgencyComm::prefix(), "Sync", "HeartbeatIntervalMs"}));
result.slice()[0].get(std::vector<std::string>(
{AgencyComm::prefix(), "Sync", "HeartbeatIntervalMs"}));
if (HeartbeatIntervalMs.isInteger()) {
try {
_heartbeatInterval = HeartbeatIntervalMs.getUInt();
LOG(INFO) << "using heartbeat interval value '" << _heartbeatInterval
<< " ms' from agency";
}
catch (...) {
} catch (...) {
// Ignore if it is not a small int or uint
}
}
}
// no value set in agency. use default
if (_heartbeatInterval == 0) {
_heartbeatInterval = 5000; // 1/s
LOG(WARN) << "unable to read heartbeat interval from agency. Using "
<< "default value '" << _heartbeatInterval << " ms'";
}
// start heartbeat thread
_heartbeatThread = new HeartbeatThread(DatabaseServerFeature::SERVER,
_agencyCallbackRegistry.get(),
_heartbeatInterval * 1000, 5);
if (!_heartbeatThread->init() || !_heartbeatThread->start()) {
LOG(FATAL) << "heartbeat could not connect to agency endpoints ("
<< endpoints << ")";
FATAL_ERROR_EXIT();
}
while (!_heartbeatThread->isReady()) {
// wait until heartbeat is ready
usleep(10000);
}
}
AgencyCommResult result;
while (true) {
VPackBuilder builder;
try {
VPackObjectBuilder b(&builder);
@ -429,7 +442,7 @@ void ClusterFeature::start() {
result = comm.setValue("Current/ServersRegistered/" + _myId,
builder.slice(), 0.0);
if (!result.successful()) {
LOG(FATAL) << "unable to register server in agency: http code: "
<< result.httpCode() << ", body: " << result.body();
@ -437,7 +450,7 @@ void ClusterFeature::start() {
} else {
break;
}
sleep(1);
}
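The registration loop above is a plain retry pattern: attempt the agency write, and on failure sleep and try again. A minimal sketch of that pattern (hypothetical names; unlike the original endless loop, this sketch takes a maximum attempt count so it can terminate, and the sleep between attempts is elided):

```cpp
#include <cassert>
#include <functional>

// Retry an operation until it reports success or we give up.
// Returns the number of attempts used, or -1 on giving up.
int retryUntilSuccess(std::function<bool()> op, int maxAttempts) {
  int attempts = 0;
  while (attempts < maxAttempts) {
    ++attempts;
    if (op()) {
      return attempts;  // success, as when setValue() is successful()
    }
    // sleep(1) would go here, as in ClusterFeature::start()
  }
  return -1;  // exhausted all attempts
}
```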
@ -449,7 +462,7 @@ void ClusterFeature::start() {
ServerState::instance()->setState(ServerState::STATE_SYNCING);
}
DispatcherFeature* dispatcher =
DispatcherFeature* dispatcher =
ApplicationServer::getFeature<DispatcherFeature>("Dispatcher");
dispatcher->buildAqlQueue();
@ -460,13 +473,13 @@ void ClusterFeature::unprepare() {
if (_heartbeatThread != nullptr) {
_heartbeatThread->beginShutdown();
}
// change into shutdown state
ServerState::instance()->setState(ServerState::STATE_SHUTDOWN);
AgencyComm comm;
comm.sendServerState(0.0);
if (_heartbeatThread != nullptr) {
int counter = 0;
while (_heartbeatThread->isRunning()) {
@ -493,32 +506,30 @@ void ClusterFeature::unprepare() {
AgencyComm comm;
comm.sendServerState(0.0);
// Try only once to unregister because maybe the agencycomm
// is shutting down as well...
ServerState::RoleEnum role = ServerState::instance()->getRole();
AgencyWriteTransaction unreg;
// Remove from role
if (role == ServerState::ROLE_PRIMARY) {
unreg.operations.push_back(
AgencyOperation("Current/DBServers/" + _myId,
AgencySimpleOperationType::DELETE_OP));
unreg.operations.push_back(AgencyOperation(
"Current/DBServers/" + _myId, AgencySimpleOperationType::DELETE_OP));
} else if (role == ServerState::ROLE_COORDINATOR) {
unreg.operations.push_back(
AgencyOperation("Current/Coordinators/" + _myId,
AgencySimpleOperationType::DELETE_OP));
unreg.operations.push_back(AgencyOperation(
"Current/Coordinators/" + _myId, AgencySimpleOperationType::DELETE_OP));
}
// Unregister
// Unregister
unreg.operations.push_back(
AgencyOperation("Current/ServersRegistered/" + _myId,
AgencySimpleOperationType::DELETE_OP));
AgencyOperation("Current/ServersRegistered/" + _myId,
AgencySimpleOperationType::DELETE_OP));
comm.sendTransactionWithFailover(unreg, 120.0);
while (_heartbeatThread->isRunning()) {
usleep(50000);
}

View File

@ -58,6 +58,7 @@ class ClusterFeature : public application_features::ApplicationFeature {
std::string _arangodPath;
std::string _dbserverConfig;
std::string _coordinatorConfig;
uint32_t _systemReplicationFactor = 2;
public:
AgencyCallbackRegistry* agencyCallbackRegistry() const {

View File

@ -128,7 +128,7 @@ DocumentDitch* TransactionContext::orderDitch(TRI_document_collection_t* documen
if (it != _ditches.end()) {
// tell everyone else this ditch is still in use,
// at least until the transaction is over
(*it).second->setUsedByTransaction();
TRI_ASSERT((*it).second->usedByTransaction());
// ditch already exists, return it
return (*it).second;
}

View File

@ -48,29 +48,10 @@ TRI_document_collection_t* Ditch::collection() const {
DocumentDitch::DocumentDitch(Ditches* ditches, bool usedByTransaction,
char const* filename, int line)
: Ditch(ditches, filename, line),
_usedByExternal(0),
_usedByTransaction(usedByTransaction) {}
DocumentDitch::~DocumentDitch() {}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the _usedByTransaction flag, using the required lock
////////////////////////////////////////////////////////////////////////////////
void DocumentDitch::setUsedByTransaction() {
auto callback = [this]() -> void { _usedByTransaction = true; };
_ditches->executeProtected(callback);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief increases the _usedByExternal value, using the required lock
////////////////////////////////////////////////////////////////////////////////
void DocumentDitch::setUsedByExternal() {
auto callback = [this]() -> void { ++_usedByExternal; };
_ditches->executeProtected(callback);
}
ReplicationDitch::ReplicationDitch(Ditches* ditches, char const* filename,
int line)
: Ditch(ditches, filename, line) {}
@ -328,41 +309,23 @@ void Ditches::freeDitch(Ditch* ditch) {
void Ditches::freeDocumentDitch(DocumentDitch* ditch, bool fromTransaction) {
TRI_ASSERT(ditch != nullptr);
// First see who might still be using the ditch:
if (fromTransaction) {
TRI_ASSERT(ditch->usedByTransaction() == true);
}
bool shouldFree = false;
{
// Really free it:
MUTEX_LOCKER(mutexLocker, _lock); // FIX_MUTEX
// First see who might still be using the ditch:
if (fromTransaction) {
TRI_ASSERT(ditch->_usedByTransaction == true);
ditch->_usedByTransaction = false;
} else {
// note: _usedByExternal may or may not be set when we get here
// the reason is that there are ditches not linked at all
// (when a ditch is created ahead of operations but the operations are
// not executed etc.)
if (ditch->_usedByExternal > 0) {
--ditch->_usedByExternal;
}
}
unlink(ditch);
if (ditch->_usedByTransaction == false && ditch->_usedByExternal == 0) {
// Really free it:
unlink(ditch);
// decrease counter
--_numDocumentDitches;
// free the ditch
shouldFree = true;
}
// decrease counter
--_numDocumentDitches;
}
if (shouldFree) {
delete ditch;
}
delete ditch;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -133,12 +133,10 @@ class DocumentDitch : public Ditch {
char const* typeName() const override final { return "document-reference"; }
void setUsedByTransaction();
void setUsedByExternal();
bool usedByTransaction() const { return _usedByTransaction; }
private:
uint32_t _usedByExternal;
bool _usedByTransaction;
bool const _usedByTransaction;
};
////////////////////////////////////////////////////////////////////////////////

View File

@ -101,9 +101,6 @@ LogfileManager::LogfileManager(ApplicationServer* server)
_lastCollectedId(0),
_lastSealedId(0),
_shutdownFileLock(),
_transactionsLock(),
_transactions(),
_failedTransactions(),
_droppedCollections(),
_droppedDatabases(),
_idLock(),
@ -116,9 +113,6 @@ LogfileManager::LogfileManager(ApplicationServer* server)
requiresElevatedPrivileges(false);
startsAfter("DatabaseServer");
startsAfter("QueryRegistry");
_transactions.reserve(32);
_failedTransactions.reserve(32);
}
// destroy the logfile manager
@ -340,12 +334,14 @@ bool LogfileManager::open() {
// note all failed transactions that we found plus the list
// of collections and databases that we can ignore
{
WRITE_LOCKER(writeLocker, _transactionsLock);
_failedTransactions.reserve(_recoverState->failedTransactions.size());
WRITE_LOCKER(allTransactionsLocker, _allTransactionsLock);
for (auto const& it : _recoverState->failedTransactions) {
_failedTransactions.emplace(it.first);
size_t bucket = getBucket(it.first);
WRITE_LOCKER(locker, _transactions[bucket]._lock);
_transactions[bucket]._failedTransactions.emplace(it.first);
}
_droppedDatabases = _recoverState->droppedDatabases;
@ -551,12 +547,13 @@ int LogfileManager::registerTransaction(TRI_voc_tid_t transactionId) {
}
try {
auto p = std::make_pair(lastCollectedId, lastSealedId);
WRITE_LOCKER(writeLocker, _transactionsLock);
size_t bucket = getBucket(transactionId);
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
WRITE_LOCKER(writeLocker, _transactions[bucket]._lock);
// insert into currently running list of transactions
_transactions.emplace(transactionId, std::move(p));
_transactions[bucket]._activeTransactions.emplace(transactionId, std::make_pair(lastCollectedId, lastSealedId));
TRI_ASSERT(lastCollectedId <= lastSealedId);
return TRI_ERROR_NO_ERROR;
@ -568,12 +565,15 @@ int LogfileManager::registerTransaction(TRI_voc_tid_t transactionId) {
// unregisters a transaction
void LogfileManager::unregisterTransaction(TRI_voc_tid_t transactionId,
bool markAsFailed) {
WRITE_LOCKER(writeLocker, _transactionsLock);
size_t bucket = getBucket(transactionId);
READ_LOCKER(allTransactionsLocker, _allTransactionsLock);
WRITE_LOCKER(writeLocker, _transactions[bucket]._lock);
_transactions.erase(transactionId);
_transactions[bucket]._activeTransactions.erase(transactionId);
if (markAsFailed) {
_failedTransactions.emplace(transactionId);
_transactions[bucket]._failedTransactions.emplace(transactionId);
}
}
@ -582,8 +582,15 @@ std::unordered_set<TRI_voc_tid_t> LogfileManager::getFailedTransactions() {
std::unordered_set<TRI_voc_tid_t> failedTransactions;
{
READ_LOCKER(readLocker, _transactionsLock);
failedTransactions = _failedTransactions;
WRITE_LOCKER(allTransactionsLocker, _allTransactionsLock);
for (size_t bucket = 0; bucket < numBuckets; ++bucket) {
READ_LOCKER(locker, _transactions[bucket]._lock);
for (auto const& it : _transactions[bucket]._failedTransactions) {
failedTransactions.emplace(it);
}
}
}
return failedTransactions;
@ -618,10 +625,15 @@ std::unordered_set<TRI_voc_tick_t> LogfileManager::getDroppedDatabases() {
// unregister a list of failed transactions
void LogfileManager::unregisterFailedTransactions(
std::unordered_set<TRI_voc_tid_t> const& failedTransactions) {
WRITE_LOCKER(writeLocker, _transactionsLock);
WRITE_LOCKER(allTransactionsLocker, _allTransactionsLock);
std::for_each(failedTransactions.begin(), failedTransactions.end(),
[&](TRI_voc_tid_t id) { _failedTransactions.erase(id); });
for (size_t bucket = 0; bucket < numBuckets; ++bucket) {
READ_LOCKER(locker, _transactions[bucket]._lock);
std::for_each(failedTransactions.begin(), failedTransactions.end(),
[&](TRI_voc_tid_t id) { _transactions[bucket]._failedTransactions.erase(id); });
}
}
// whether or not it is currently allowed to create an additional
@ -1371,16 +1383,20 @@ Logfile* LogfileManager::getCollectableLogfile() {
// iterate over all active readers and find their minimum used logfile id
Logfile::IdType minId = UINT64_MAX;
{
READ_LOCKER(readLocker, _transactionsLock);
{
WRITE_LOCKER(allTransactionsLocker, _allTransactionsLock);
// iterate over all active transactions and find their minimum used logfile
// id
for (auto const& it : _transactions) {
Logfile::IdType lastWrittenId = it.second.second;
for (size_t bucket = 0; bucket < numBuckets; ++bucket) {
READ_LOCKER(locker, _transactions[bucket]._lock);
if (lastWrittenId < minId) {
minId = lastWrittenId;
for (auto const& it : _transactions[bucket]._activeTransactions) {
Logfile::IdType lastWrittenId = it.second.second;
if (lastWrittenId < minId) {
minId = lastWrittenId;
}
}
}
}
@ -1421,14 +1437,18 @@ Logfile* LogfileManager::getRemovableLogfile() {
Logfile::IdType minId = UINT64_MAX;
{
READ_LOCKER(readLocker, _transactionsLock);
WRITE_LOCKER(allTransactionsLocker, _allTransactionsLock);
// iterate over all active readers and find their minimum used logfile id
for (auto const& it : _transactions) {
Logfile::IdType lastCollectedId = it.second.first;
for (size_t bucket = 0; bucket < numBuckets; ++bucket) {
READ_LOCKER(locker, _transactions[bucket]._lock);
if (lastCollectedId < minId) {
minId = lastCollectedId;
for (auto const& it : _transactions[bucket]._activeTransactions) {
Logfile::IdType lastCollectedId = it.second.first;
if (lastCollectedId < minId) {
minId = lastCollectedId;
}
}
}
}
@ -1591,19 +1611,23 @@ LogfileManager::runningTransactions() {
{
Logfile::IdType value;
READ_LOCKER(readLocker, _transactionsLock);
WRITE_LOCKER(readLocker, _allTransactionsLock);
for (auto const& it : _transactions) {
++count;
for (size_t bucket = 0; bucket < numBuckets; ++bucket) {
READ_LOCKER(locker, _transactions[bucket]._lock);
value = it.second.first;
if (value < lastCollectedId) {
lastCollectedId = value;
}
count += _transactions[bucket]._activeTransactions.size();
for (auto const& it : _transactions[bucket]._activeTransactions) {
value = it.second.second;
if (value < lastSealedId) {
lastSealedId = value;
value = it.second.first;
if (value < lastCollectedId) {
lastCollectedId = value;
}
value = it.second.second;
if (value < lastSealedId) {
lastSealedId = value;
}
}
}
}

View File

@ -95,6 +95,8 @@ class LogfileManager final : public application_features::ApplicationFeature {
LogfileManager(LogfileManager const&) = delete;
LogfileManager& operator=(LogfileManager const&) = delete;
static constexpr size_t numBuckets = 8;
public:
explicit LogfileManager(application_features::ApplicationServer* server);
@ -376,6 +378,10 @@ class LogfileManager final : public application_features::ApplicationFeature {
std::tuple<size_t, Logfile::IdType, Logfile::IdType> runningTransactions();
private:
// hashes the transaction id into a bucket
size_t getBucket(TRI_voc_tid_t id) const { return std::hash<TRI_voc_cid_t>()(id) % numBuckets; }
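The bucket selection above is just a hash of the transaction id reduced modulo the bucket count, so any given id always lands in the same bucket while different ids spread across all eight. A standalone sketch of the same computation (simplified to a plain `uint64_t` id type):

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

static constexpr size_t numBuckets = 8;  // mirrors LogfileManager::numBuckets

// Hash the transaction id into one of numBuckets buckets. Deterministic:
// the same id always maps to the same bucket, so all operations on one
// transaction contend only on that bucket's lock.
size_t getBucket(uint64_t transactionId) {
  return std::hash<uint64_t>()(transactionId) % numBuckets;
}
```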
// memcpy the data into the WAL region and return the filled slot
// to the WAL logfile manager
SlotInfoCopy writeSlot(SlotInfo& slotInfo,
@ -515,15 +521,20 @@ class LogfileManager final : public application_features::ApplicationFeature {
// a lock protecting the shutdown file
Mutex _shutdownFileLock;
// a lock protecting _transactions and _failedTransactions
basics::ReadWriteLock _transactionsLock;
// a lock protecting ALL buckets in _transactions
basics::ReadWriteLock _allTransactionsLock;
// currently ongoing transactions
std::unordered_map<TRI_voc_tid_t, std::pair<Logfile::IdType, Logfile::IdType>>
_transactions;
struct {
// a lock protecting _activeTransactions and _failedTransactions
basics::ReadWriteLock _lock;
// set of failed transactions
std::unordered_set<TRI_voc_tid_t> _failedTransactions;
// currently ongoing transactions
std::unordered_map<TRI_voc_tid_t, std::pair<Logfile::IdType, Logfile::IdType>>
_activeTransactions;
// set of failed transactions
std::unordered_set<TRI_voc_tid_t> _failedTransactions;
} _transactions[numBuckets];
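The two-level locking scheme introduced here works as follows: per-transaction operations take the outer lock in read (shared) mode plus exactly one bucket's lock, so unrelated transactions rarely contend; whole-table operations take the outer lock in write (exclusive) mode, which keeps all buckets quiescent. A minimal sketch using the standard library (`std::shared_mutex` stands in for `basics::ReadWriteLock`; all names are illustrative):

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <unordered_set>

static constexpr size_t kBuckets = 8;

struct StripedFailedSet {
  std::shared_mutex allLock;  // outer lock protecting ALL buckets

  struct Bucket {
    std::mutex lock;                      // per-bucket lock
    std::unordered_set<uint64_t> failed;  // failed transaction ids
  } buckets[kBuckets];

  // Per-id operation: outer lock shared, plus one bucket's lock.
  void markFailed(uint64_t id) {
    std::shared_lock<std::shared_mutex> outer(allLock);
    Bucket& b = buckets[id % kBuckets];
    std::lock_guard<std::mutex> inner(b.lock);
    b.failed.insert(id);
  }

  // Whole-table operation: outer lock exclusive, scan every bucket.
  size_t countFailed() {
    std::unique_lock<std::shared_mutex> outer(allLock);
    size_t n = 0;
    for (Bucket& b : buckets) {
      n += b.failed.size();
    }
    return n;
  }
};
```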
// set of dropped collections
/// this is populated during recovery and not used afterwards

View File

@ -475,6 +475,24 @@ function analyzeCoreDumpWindows (instanceInfo) {
// //////////////////////////////////////////////////////////////////////////////
function analyzeServerCrash (arangod, options, checkStr) {
serverCrashed = true;
var cpf = "/proc/sys/kernel/core_pattern";
if (fs.isFile(cpf)) {
var matchApport = /.*apport.*/;
var matchVarTmp = /\/var\/tmp/;
var corePattern = fs.readBuffer(cpf);
var cp = corePattern.asciiSlice(0, corePattern.length);
if (matchApport.exec(cp) != null) {
print(RED + "apport handles corefiles on your system. Uninstall it if you want us to get corefiles for analysis.");
return;
}
if (matchVarTmp.exec(cp) == null) {
print(RED + "Don't know how to locate corefiles on your system. '" + cpf + "' contains: '" + cp + "'");
return;
}
}
const storeArangodPath = arangod.rootDir + '/arangod_' + arangod.pid;
print(RED +

View File

@ -44,6 +44,8 @@ var mountAppRegEx = /\/APP(\/|$)/i;
var mountNumberRegEx = /^\/[\d\-%]/;
var pathRegex = /^((\.{0,2}(\/|\\))|(~\/)|[a-zA-Z]:\\)/;
const DEFAULT_REPLICATION_FACTOR_SYSTEM = internal.DEFAULT_REPLICATION_FACTOR_SYSTEM;
var getReadableName = function (name) {
return name.split(/([-_]|\s)+/).map(function (token) {
return token.slice(0, 1).toUpperCase() + token.slice(1);
@ -53,7 +55,7 @@ var getReadableName = function (name) {
var getStorage = function () {
var c = db._collection('_apps');
if (c === null) {
c = db._create('_apps', {isSystem: true, replicationFactor: 2,
c = db._create('_apps', {isSystem: true, replicationFactor: DEFAULT_REPLICATION_FACTOR_SYSTEM,
distributeShardsLike: '_graphs', journalSize: 4 * 1024 * 1024});
c.ensureIndex({ type: 'hash', fields: [ 'mount' ], unique: true });
}

View File

@ -667,7 +667,8 @@ var bindEdgeCollections = function (self, edgeCollections) {
var err = new ArangoError();
err.errorNum = arangodb.errors.ERROR_GRAPH_INVALID_EDGE.code;
err.errorMessage =
arangodb.errors.ERROR_GRAPH_INVALID_EDGE.message + ' between ' + from + ' and ' + to + '.';
arangodb.errors.ERROR_GRAPH_INVALID_EDGE.message +
' between ' + from + ' and ' + to + '. Doesn\'t conform to any edge definition';
throw err;
}
}

View File

@ -346,4 +346,13 @@
exports.sendChunk = global.SYS_SEND_CHUNK;
delete global.SYS_SEND_CHUNK;
}
// //////////////////////////////////////////////////////////////////////////////
// / @brief default replication factor
// //////////////////////////////////////////////////////////////////////////////
if (global.SYS_DEFAULT_REPLICATION_FACTOR_SYSTEM) {
exports.DEFAULT_REPLICATION_FACTOR_SYSTEM = global.SYS_DEFAULT_REPLICATION_FACTOR_SYSTEM;
delete global.SYS_DEFAULT_REPLICATION_FACTOR_SYSTEM;
}
}());

View File

@ -27,9 +27,10 @@
// / @author Copyright 2014, triAGENS GmbH, Cologne, Germany
// //////////////////////////////////////////////////////////////////////////////
var internal = require('internal');
var cluster = require('@arangodb/cluster');
var db = internal.db;
const internal = require('internal');
const cluster = require('@arangodb/cluster');
const db = internal.db;
const DEFAULT_REPLICATION_FACTOR_SYSTEM = internal.DEFAULT_REPLICATION_FACTOR_SYSTEM;
// //////////////////////////////////////////////////////////////////////////////
// / @brief initialized
@ -51,7 +52,7 @@ function createStatisticsCollection (name) {
try {
r = db._create(name, { isSystem: true, waitForSync: false,
replicationFactor: 2,
replicationFactor: DEFAULT_REPLICATION_FACTOR_SYSTEM,
journalSize: 8 * 1024 * 1024,
distributeShardsLike: '_graphs' });
} catch (err) {}

View File

@ -48,7 +48,7 @@
function upgrade () {
// default replication factor for system collections
const DEFAULT_REPLICATION_FACTOR_SYSTEM = 2;
const DEFAULT_REPLICATION_FACTOR_SYSTEM = internal.DEFAULT_REPLICATION_FACTOR_SYSTEM;
// system database only
const DATABASE_SYSTEM = 1000;

View File

@ -42,10 +42,7 @@ class DeadlockDetector {
DeadlockDetector& operator=(DeadlockDetector const&) = delete;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief add a thread to the list of blocked threads
////////////////////////////////////////////////////////////////////////////////
int detectDeadlock(T const* value, bool isWrite) {
auto tid = Thread::currentThreadId();
@ -53,81 +50,48 @@ class DeadlockDetector {
return detectDeadlock(value, tid, isWrite);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a reader to the list of blocked readers
////////////////////////////////////////////////////////////////////////////////
int setReaderBlocked(T const* value) { return setBlocked(value, false); }
////////////////////////////////////////////////////////////////////////////////
/// @brief add a writer to the list of blocked writers
////////////////////////////////////////////////////////////////////////////////
int setWriterBlocked(T const* value) { return setBlocked(value, true); }
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a reader from the list of blocked readers
////////////////////////////////////////////////////////////////////////////////
void unsetReaderBlocked(T const* value) { unsetBlocked(value, false); }
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a writer from the list of blocked writers
////////////////////////////////////////////////////////////////////////////////
void unsetWriterBlocked(T const* value) { unsetBlocked(value, true); }
////////////////////////////////////////////////////////////////////////////////
/// @brief add a reader to the list of active readers
////////////////////////////////////////////////////////////////////////////////
void addReader(T const* value, bool wasBlockedBefore) {
addActive(value, false, wasBlockedBefore);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a writer to the list of active writers
////////////////////////////////////////////////////////////////////////////////
void addWriter(T const* value, bool wasBlockedBefore) {
addActive(value, true, wasBlockedBefore);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief unregister a reader from the list of active readers
////////////////////////////////////////////////////////////////////////////////
void unsetReader(T const* value) { unsetActive(value, false); }
////////////////////////////////////////////////////////////////////////////////
/// @brief unregister a writer from the list of active writers
////////////////////////////////////////////////////////////////////////////////
void unsetWriter(T const* value) { unsetActive(value, true); }
////////////////////////////////////////////////////////////////////////////////
/// @brief enable / disable
////////////////////////////////////////////////////////////////////////////////
void enabled(bool value) {
MUTEX_LOCKER(mutexLocker, _lock);
_enabled = value;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the enabled status
////////////////////////////////////////////////////////////////////////////////
bool enabled() {
MUTEX_LOCKER(mutexLocker, _lock);
return _enabled;
}
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief add a thread to the list of blocked threads
////////////////////////////////////////////////////////////////////////////////
int detectDeadlock(T const* value, TRI_tid_t tid, bool isWrite) const {
if (!_enabled) {
return TRI_ERROR_NO_ERROR;
@ -142,57 +106,51 @@ class DeadlockDetector {
};
std::unordered_set<TRI_tid_t> visited;
std::vector<StackValue> stack;
stack.emplace_back(StackValue(value, tid, isWrite));
std::vector<StackValue> stack{ StackValue(value, tid, isWrite) };
while (!stack.empty()) {
StackValue top = stack.back(); // intentionally copy StackValue
stack.pop_back();
auto it = _active.find(top.value);
if (it != _active.end()) {
if (!top.isWrite) {
// we are a reader
bool other = (*it).second.second;
if (!top.isWrite) {
// we are a reader
auto it = _active.find(top.value);
if (other) {
// other is a writer
TRI_tid_t otherTid = *((*it).second.first.begin());
if (it != _active.end()) {
bool other = (*it).second.second;
if (visited.find(otherTid) != visited.end()) {
return TRI_ERROR_DEADLOCK;
}
if (other) {
// other is a writer
TRI_tid_t otherTid = *((*it).second.first.begin());
auto it2 = _blocked.find(otherTid);
if (visited.find(otherTid) != visited.end()) {
return TRI_ERROR_DEADLOCK;
}
if (it2 != _blocked.end()) {
// writer thread is blocking...
stack.emplace_back((*it2).second.first, otherTid, other);
}
}
} else {
// we are a writer
auto it2 = _blocked.find(otherTid);
// other is either a reader or a writer
for (auto const& otherTid : (*it).second.first) {
if (visited.find(otherTid) != visited.end()) {
return TRI_ERROR_DEADLOCK;
}
if (it2 != _blocked.end()) {
// writer thread is blocking...
stack.emplace_back(
StackValue((*it2).second.first, otherTid, other));
}
}
}
} else {
// we are a writer
auto it = _active.find(top.value);
auto it2 = _blocked.find(otherTid);
if (it != _active.end()) {
// other is either a reader or a writer
for (auto const& otherTid : (*it).second.first) {
if (visited.find(otherTid) != visited.end()) {
return TRI_ERROR_DEADLOCK;
}
auto it2 = _blocked.find(otherTid);
if (it2 != _blocked.end()) {
// writer thread is blocking...
stack.emplace_back(StackValue((*it2).second.first, otherTid,
(*it).second.second));
}
}
}
if (it2 != _blocked.end()) {
// writer thread is blocking...
stack.emplace_back((*it2).second.first, otherTid, (*it).second.second);
}
}
}
}
visited.emplace(top.tid);
@ -202,10 +160,7 @@ class DeadlockDetector {
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a thread to the list of blocked threads
////////////////////////////////////////////////////////////////////////////////
int setBlocked(T const* value, bool isWrite) {
auto tid = Thread::currentThreadId();
@ -215,15 +170,13 @@ class DeadlockDetector {
return TRI_ERROR_NO_ERROR;
}
auto it = _blocked.find(tid);
bool const inserted = _blocked.emplace(tid, std::make_pair(value, isWrite)).second;
if (it != _blocked.end()) {
// we're already blocking. should never happend
return TRI_ERROR_DEADLOCK;
if (!inserted) {
// we're already blocking. should never happen
return TRI_ERROR_DEADLOCK;
}
_blocked.emplace(tid, std::make_pair(value, isWrite));
try {
int res = detectDeadlock(value, tid, isWrite);
@ -252,10 +205,7 @@ class DeadlockDetector {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a thread from the list of blocked threads
////////////////////////////////////////////////////////////////////////////////
void unsetBlocked(T const* value, bool isWrite) {
auto tid = Thread::currentThreadId();
@ -274,14 +224,11 @@ class DeadlockDetector {
#endif
}
////////////////////////////////////////////////////////////////////////////////
/// @brief unregister a thread from the list of active threads
////////////////////////////////////////////////////////////////////////////////
void unsetActive(T const* value, bool isWrite) {
auto tid = Thread::currentThreadId();
MUTEX_LOCKER(mutexLocker, _lock);
MUTEX_LOCKER(mutexLocker, _lock); // note: this lock is expensive when many threads compete
if (!_enabled) {
return;
@ -348,14 +295,11 @@ class DeadlockDetector {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a reader/writer to the list of active threads
////////////////////////////////////////////////////////////////////////////////
void addActive(T const* value, bool isWrite, bool wasBlockedBefore) {
auto tid = Thread::currentThreadId();
MUTEX_LOCKER(mutexLocker, _lock);
MUTEX_LOCKER(mutexLocker, _lock); // note: this lock is expensive when many threads compete
if (!_enabled) {
return;
@ -398,29 +342,17 @@ class DeadlockDetector {
}
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief lock for managing the data structures
////////////////////////////////////////////////////////////////////////////////
arangodb::Mutex _lock;
////////////////////////////////////////////////////////////////////////////////
/// @brief threads currently blocked
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<TRI_tid_t, std::pair<T const*, bool>> _blocked;
////////////////////////////////////////////////////////////////////////////////
/// @brief threads currently holding locks
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<T const*, std::pair<std::unordered_set<TRI_tid_t>, bool>>
_active;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the detector is enabled
////////////////////////////////////////////////////////////////////////////////
bool _enabled;
};
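The core of `detectDeadlock` above is an iterative depth-first walk over the wait-for graph: starting from the blocked thread, follow "lock holder is itself blocked" edges, and if the walk ever revisits a thread, there is a cycle and therefore a deadlock. A stripped-down sketch of that check (simplified to a single wait-for edge per thread; names are illustrative, not the class's actual members):

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <unordered_set>
#include <vector>

using Tid = uint64_t;

// waitsFor maps a blocked thread to the thread it is waiting on.
// Returns true if following those edges from `start` ever revisits a
// thread -- i.e. the wait-for graph contains a cycle (deadlock).
bool hasDeadlock(Tid start, std::unordered_map<Tid, Tid> const& waitsFor) {
  std::unordered_set<Tid> visited;
  std::vector<Tid> stack{start};
  while (!stack.empty()) {
    Tid t = stack.back();
    stack.pop_back();
    if (!visited.insert(t).second) {
      return true;  // already seen this thread: cycle found
    }
    auto it = waitsFor.find(t);
    if (it != waitsFor.end()) {
      stack.push_back(it->second);  // t is blocked on another thread
    }
  }
  return false;  // walk terminated without revisiting anyone
}
```

The real detector handles multiple holders per lock (readers) and tracks blocked threads in `_blocked` and lock holders in `_active`, but the cycle-detection skeleton is the same.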