1
0
Fork 0

Bug fix/scheduler delete (#3077)

* removed delete call

* cleanup

* lower cpu activity of log thread too

* fix log messages

* do not enter threads into unordered_set, as it is unneeded

* do not compile in calls to disabled plan cache

* moved AQL regex cache from thread local variables to a class of its own

* more sensible thread creation and destruction
This commit is contained in:
Frank Celler 2017-08-25 12:00:17 +02:00 committed by Jan
parent 896b32daf3
commit 6d08d4f4aa
27 changed files with 644 additions and 575 deletions

View File

@ -155,15 +155,7 @@ void CalculationBlock::doEvaluation(AqlItemBlock* result) {
if (!_expression->isV8()) {
// an expression that does not require V8
Functions::InitializeThreadContext();
try {
executeExpression(result);
Functions::DestroyThreadContext();
} catch (...) {
Functions::DestroyThreadContext();
throw;
}
executeExpression(result);
} else {
bool const isRunningInCluster = transaction()->state()->isRunningInCluster();

View File

@ -31,6 +31,7 @@
#include "ApplicationFeatures/ApplicationServer.h"
#include "Aql/Function.h"
#include "Aql/Query.h"
#include "Aql/RegexCache.h"
#include "Basics/Exceptions.h"
#include "Basics/ScopeGuard.h"
#include "Basics/StringBuffer.h"
@ -52,13 +53,6 @@
using namespace arangodb;
using namespace arangodb::aql;
/// @brief thread-local cache for compiled regexes (REGEX function)
thread_local std::unordered_map<std::string, RegexMatcher*>* RegexCache =
nullptr;
/// @brief thread-local cache for compiled regexes (LIKE function)
thread_local std::unordered_map<std::string, RegexMatcher*>* LikeCache =
nullptr;
/// @brief convert a number value into an AqlValue
static AqlValue NumberValue(transaction::Methods* trx, int value) {
transaction::BuilderLeaser builder(trx);
@ -99,110 +93,6 @@ void Functions::ValidateParameters(VPackFunctionParameters const& parameters,
static_cast<int>(Function::MaxArguments));
}
/// @brief release the calling thread's REGEX matcher cache
/// deletes every cached matcher, then the map itself, and resets the
/// thread-local pointer to nullptr. does nothing if no cache exists
static void ClearRegexCache() {
  if (RegexCache == nullptr) {
    // no cache was ever created in this thread
    return;
  }

  for (auto& entry : *RegexCache) {
    // the map stores raw owning pointers (entries may be nullptr)
    delete entry.second;
  }

  delete RegexCache;
  RegexCache = nullptr;
}
/// @brief release the calling thread's LIKE matcher cache
/// deletes every cached matcher, then the map itself, and resets the
/// thread-local pointer to nullptr. does nothing if no cache exists
static void ClearLikeCache() {
  if (LikeCache == nullptr) {
    // no cache was ever created in this thread
    return;
  }

  for (auto& entry : *LikeCache) {
    // the map stores raw owning pointers (entries may be nullptr)
    delete entry.second;
  }

  delete LikeCache;
  LikeCache = nullptr;
}
/// @brief compile a LIKE pattern from a string
///
/// Translates a LIKE pattern into an anchored regular expression:
///  - `%` becomes "any sequence of characters" (including line breaks)
///  - `_` becomes "any single character" (including line breaks)
///  - a backslash escapes the following `%`, `_` or backslash
///  - characters that are regex metacharacters are escaped so that they
///    only match themselves
///
/// @param ptr              start of the LIKE pattern (need not be terminated)
/// @param length           number of bytes to read from ptr
/// @param caseInsensitive  if true, "(?i)" is inserted after the start anchor
/// @return the regular expression, always starting with '^' and ending in '$'
static std::string BuildLikePattern(char const* ptr, size_t length,
                                    bool caseInsensitive) {
  std::string pattern;
  pattern.reserve(length + 8);  // reserve some room

  // pattern is always anchored
  pattern.push_back('^');
  if (caseInsensitive) {
    pattern.append("(?i)");
  }

  // true if the previous character was an unconsumed backslash
  bool escaped = false;

  for (size_t i = 0; i < length; ++i) {
    char const c = ptr[i];

    if (c == '\\') {
      if (escaped) {
        // literal backslash
        pattern.append("\\\\");
      }
      escaped = !escaped;
      continue;
    }

    switch (c) {
      case '%':
        if (escaped) {
          // literal %
          pattern.push_back('%');
        } else {
          // wildcard
          pattern.append("(.|[\r\n])*");
        }
        break;

      case '_':
        if (escaped) {
          // literal underscore
          pattern.push_back('_');
        } else {
          // wildcard character
          pattern.append("(.|[\r\n])");
        }
        break;

      case '?':
      case '+':
      case '*':  // must be escaped too, otherwise it acts as a quantifier
      case '[':
      case '(':
      case ')':
      case '{':
      case '}':
      case '^':
      case '$':
      case '|':
      case '.':
        // character with special meaning in a regex
        pattern.push_back('\\');
        pattern.push_back(c);
        break;

      default:
        if (escaped) {
          // found a backslash followed by no special character
          pattern.append("\\\\");
        }
        // literal character
        pattern.push_back(c);
        break;
    }

    escaped = false;
  }

  // always anchor the pattern
  pattern.push_back('$');

  return pattern;
}
/// @brief compile a REGEX pattern from a string
/// the input is copied verbatim; for case-insensitive matching it is
/// prefixed with the inline flag "(?i)"
static std::string BuildRegexPattern(char const* ptr, size_t length,
                                     bool caseInsensitive) {
  std::string result;

  // allocate the final size up front (4 extra bytes for the flag prefix)
  result.reserve(caseInsensitive ? length + 4 : length);

  if (caseInsensitive) {
    result.append("(?i)");
  }
  result.append(ptr, length);

  return result;
}
/// @brief extract a function parameter from the arguments
AqlValue Functions::ExtractFunctionParameterValue(
transaction::Methods*, VPackFunctionParameters const& parameters,
@ -691,14 +581,6 @@ static void FlattenList(VPackSlice const& array, size_t maxDepth,
}
}
/// @brief called before a query starts
/// has the chance to set up any thread-local storage
/// the body is intentionally empty: nothing is set up here
void Functions::InitializeThreadContext() {}
/// @brief called when a query ends
/// releases the thread-local storage of the calling thread, i.e. the
/// REGEX matcher cache followed by the LIKE matcher cache
void Functions::DestroyThreadContext() {
  ClearRegexCache();
  ClearLikeCache();
}
/// @brief function IS_NULL
AqlValue Functions::IsNull(arangodb::aql::Query* query,
transaction::Methods* trx,
@ -1243,36 +1125,9 @@ AqlValue Functions::Like(arangodb::aql::Query* query,
// build pattern from parameter #1
AqlValue regex = ExtractFunctionParameterValue(trx, parameters, 1);
AppendAsString(trx, adapter, regex);
std::string const pattern =
BuildLikePattern(buffer->c_str(), buffer->length(), caseInsensitive);
RegexMatcher* matcher = nullptr;
if (LikeCache != nullptr) {
auto it = LikeCache->find(pattern);
// check regex cache
if (it != LikeCache->end()) {
matcher = (*it).second;
}
}
if (matcher == nullptr) {
matcher =
arangodb::basics::Utf8Helper::DefaultUtf8Helper.buildMatcher(pattern);
try {
if (LikeCache == nullptr) {
LikeCache = new std::unordered_map<std::string, RegexMatcher*>();
}
// insert into cache, no matter if pattern is valid or not
LikeCache->emplace(pattern, matcher);
} catch (...) {
delete matcher;
ClearLikeCache();
throw;
}
}
// the matcher is owned by the query!
::RegexMatcher* matcher = query->regexCache()->buildLikeMatcher(buffer->c_str(), buffer->length(), caseInsensitive);
if (matcher == nullptr) {
// compiling regular expression failed
@ -1311,35 +1166,8 @@ AqlValue Functions::RegexTest(arangodb::aql::Query* query,
AqlValue regex = ExtractFunctionParameterValue(trx, parameters, 1);
AppendAsString(trx, adapter, regex);
std::string const pattern =
BuildRegexPattern(buffer->c_str(), buffer->length(), caseInsensitive);
RegexMatcher* matcher = nullptr;
if (RegexCache != nullptr) {
auto it = RegexCache->find(pattern);
// check regex cache
if (it != RegexCache->end()) {
matcher = (*it).second;
}
}
if (matcher == nullptr) {
matcher =
arangodb::basics::Utf8Helper::DefaultUtf8Helper.buildMatcher(pattern);
try {
if (RegexCache == nullptr) {
RegexCache = new std::unordered_map<std::string, RegexMatcher*>();
}
// insert into cache, no matter if pattern is valid or not
RegexCache->emplace(pattern, matcher);
} catch (...) {
delete matcher;
ClearRegexCache();
throw;
}
}
// the matcher is owned by the query!
::RegexMatcher* matcher = query->regexCache()->buildRegexMatcher(buffer->c_str(), buffer->length(), caseInsensitive);
if (matcher == nullptr) {
// compiling regular expression failed
@ -1378,35 +1206,8 @@ AqlValue Functions::RegexReplace(arangodb::aql::Query* query,
AqlValue regex = ExtractFunctionParameterValue(trx, parameters, 1);
AppendAsString(trx, adapter, regex);
std::string const pattern =
BuildRegexPattern(buffer->c_str(), buffer->length(), caseInsensitive);
RegexMatcher* matcher = nullptr;
if (RegexCache != nullptr) {
auto it = RegexCache->find(pattern);
// check regex cache
if (it != RegexCache->end()) {
matcher = (*it).second;
}
}
if (matcher == nullptr) {
matcher =
arangodb::basics::Utf8Helper::DefaultUtf8Helper.buildMatcher(pattern);
try {
if (RegexCache == nullptr) {
RegexCache = new std::unordered_map<std::string, RegexMatcher*>();
}
// insert into cache, no matter if pattern is valid or not
RegexCache->emplace(pattern, matcher);
} catch (...) {
delete matcher;
ClearRegexCache();
throw;
}
}
// the matcher is owned by the query!
::RegexMatcher* matcher = query->regexCache()->buildRegexMatcher(buffer->c_str(), buffer->length(), caseInsensitive);
if (matcher == nullptr) {
// compiling regular expression failed

View File

@ -86,14 +86,6 @@ struct Functions {
char const* funcName, bool recursive);
public:
/// @brief called before a query starts
/// has the chance to set up any thread-local storage
static void InitializeThreadContext();
/// @brief called when a query ends
/// its responsibility is to clear any thread-local storage
static void DestroyThreadContext();
/// @brief helper function. not callable as a "normal" AQL function
static void Stringify(transaction::Methods* trx,
arangodb::basics::VPackStringBufferAdapter& buffer,

View File

@ -301,17 +301,9 @@ bool IndexBlock::initIndexes() {
}
} else {
// no V8 context required!
Functions::InitializeThreadContext();
try {
executeExpressions();
TRI_IF_FAILURE("IndexBlock::executeExpression") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
Functions::DestroyThreadContext();
} catch (...) {
Functions::DestroyThreadContext();
throw;
executeExpressions();
TRI_IF_FAILURE("IndexBlock::executeExpression") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
}
}

View File

@ -36,6 +36,7 @@
#include "Aql/QueryResources.h"
#include "Aql/QueryResultV8.h"
#include "Aql/QueryString.h"
#include "Aql/RegexCache.h"
#include "Aql/ResourceUsage.h"
#include "Aql/types.h"
#include "Basics/Common.h"
@ -194,6 +195,9 @@ class Query {
/// @brief get v8 executor
V8Executor* executor();
/// @brief cache for regular expressions constructed by the query
/// returns the address of the query's own _regexCache member; the caller
/// does not take ownership of the returned object
RegexCache* regexCache() { return &_regexCache; }
/// @brief return the engine, if prepared
ExecutionEngine* engine() const { return _engine.get(); }
@ -339,6 +343,9 @@ class Query {
/// @brief warnings collected during execution
std::vector<std::pair<int, std::string>> _warnings;
/// @brief cache for regular expressions constructed by the query
RegexCache _regexCache;
/// @brief query start time
double _startTime;

157
arangod/Aql/RegexCache.cpp Normal file
View File

@ -0,0 +1,157 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "RegexCache.h"
#include "Basics/Utf8Helper.h"
using namespace arangodb::aql;
/// @brief destroy the cache, deleting all matchers it still holds
RegexCache::~RegexCache() { this->clear(); }
void RegexCache::clear() noexcept {
clear(_regexCache);
clear(_likeCache);
}
/// @brief return the matcher for a REGEX pattern, building it on first use
/// the regular expression string is generated into the reusable member
/// _temp and then looked up in (or added to) the REGEX cache
icu::RegexMatcher* RegexCache::buildRegexMatcher(char const* ptr, size_t length,
                                                 bool caseInsensitive) {
  buildRegexPattern(_temp, ptr, length, caseInsensitive);

  icu::RegexMatcher* matcher = fromCache(_temp, _regexCache);
  return matcher;
}
/// @brief return the matcher for a LIKE pattern, building it on first use
/// the LIKE pattern is translated into a regular expression in the reusable
/// member _temp and then looked up in (or added to) the LIKE cache
icu::RegexMatcher* RegexCache::buildLikeMatcher(char const* ptr, size_t length,
                                                bool caseInsensitive) {
  buildLikePattern(_temp, ptr, length, caseInsensitive);

  icu::RegexMatcher* matcher = fromCache(_temp, _likeCache);
  return matcher;
}
/// @brief delete all matchers stored in the specified cache and empty it
/// any exception raised while doing so is swallowed, as the function is
/// declared noexcept
void RegexCache::clear(std::unordered_map<std::string, icu::RegexMatcher*>& cache) noexcept {
  try {
    auto const end = cache.end();
    for (auto entry = cache.begin(); entry != end; ++entry) {
      // values are raw owning pointers and may be nullptr
      delete entry->second;
    }
    cache.clear();
  } catch (...) {
  }
}
/// @brief get matcher from cache, or insert a new matcher for the specified pattern
///
/// @param pattern  the regular expression string, used as the cache key
/// @param cache    the cache to search and, on a miss, to insert into
/// @return the cached or newly built matcher. the pointer remains owned by
///         the cache. it is whatever buildMatcher() returned for the pattern,
///         so callers must be prepared to handle nullptr
icu::RegexMatcher* RegexCache::fromCache(std::string const& pattern,
                                         std::unordered_map<std::string, icu::RegexMatcher*>& cache) {
  auto it = cache.find(pattern);

  if (it != cache.end()) {
    return (*it).second;
  }

  icu::RegexMatcher* matcher = arangodb::basics::Utf8Helper::DefaultUtf8Helper.buildMatcher(pattern);

  try {
    // insert into cache, no matter if pattern is valid or not
    // the key must be the pattern that was looked up above, not the
    // member _temp, so the function is correct for any caller-supplied string
    cache.emplace(pattern, matcher);
    return matcher;
  } catch (...) {
    // the cache did not take ownership, so free the matcher here
    delete matcher;
    throw;
  }
}
/// @brief compile a REGEX pattern from a string
/// overwrites out with the input copied verbatim; for case-insensitive
/// matching the inline flag "(?i)" is put in front of it
void RegexCache::buildRegexPattern(std::string& out,
                                   char const* ptr, size_t length,
                                   bool caseInsensitive) {
  // empty prefix unless case-insensitive matching was requested
  char const* const prefix = caseInsensitive ? "(?i)" : "";

  out.clear();
  if (caseInsensitive) {
    // room for the input plus the 4 prefix bytes
    out.reserve(length + 4);
  }
  out.append(prefix);
  out.append(ptr, length);
}
/// @brief compile a LIKE pattern from a string
///
/// Overwrites out with an anchored regular expression equivalent to the
/// LIKE pattern:
///  - `%` becomes "any sequence of characters" (including line breaks)
///  - `_` becomes "any single character" (including line breaks)
///  - a backslash escapes the following `%`, `_` or backslash
///  - characters that are regex metacharacters are escaped so that they
///    only match themselves
///
/// @param out              receives the result; previous contents are discarded
/// @param ptr              start of the LIKE pattern (need not be terminated)
/// @param length           number of bytes to read from ptr
/// @param caseInsensitive  if true, "(?i)" is inserted after the start anchor
void RegexCache::buildLikePattern(std::string& out,
                                  char const* ptr, size_t length,
                                  bool caseInsensitive) {
  out.clear();
  out.reserve(length + 8);  // reserve some room

  // pattern is always anchored
  out.push_back('^');
  if (caseInsensitive) {
    out.append("(?i)");
  }

  // true if the previous character was an unconsumed backslash
  bool escaped = false;

  for (size_t i = 0; i < length; ++i) {
    char const c = ptr[i];

    if (c == '\\') {
      if (escaped) {
        // literal backslash
        out.append("\\\\");
      }
      escaped = !escaped;
      continue;
    }

    switch (c) {
      case '%':
        if (escaped) {
          // literal %
          out.push_back('%');
        } else {
          // wildcard
          out.append("(.|[\r\n])*");
        }
        break;

      case '_':
        if (escaped) {
          // literal underscore
          out.push_back('_');
        } else {
          // wildcard character
          out.append("(.|[\r\n])");
        }
        break;

      case '?':
      case '+':
      case '*':  // must be escaped too, otherwise it acts as a quantifier
      case '[':
      case '(':
      case ')':
      case '{':
      case '}':
      case '^':
      case '$':
      case '|':
      case '.':
        // character with special meaning in a regex
        out.push_back('\\');
        out.push_back(c);
        break;

      default:
        if (escaped) {
          // found a backslash followed by no special character
          out.append("\\\\");
        }
        // literal character
        out.push_back(c);
        break;
    }

    escaped = false;
  }

  // always anchor the pattern
  out.push_back('$');
}

70
arangod/Aql/RegexCache.h Normal file
View File

@ -0,0 +1,70 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_REGEX_CACHE_H
#define ARANGOD_AQL_REGEX_CACHE_H 1
#include "Basics/Common.h"
#include <unicode/regex.h>
namespace arangodb {
namespace aql {
/// @brief cache of compiled ICU regex matchers, keyed by pattern string
/// keeps two separate maps, one for REGEX patterns and one for LIKE patterns.
/// the cache owns the matchers it hands out: they are deleted by clear()
/// and by the destructor, so callers must not delete them.
/// NOTE(review): no locking is visible in this class; presumably an instance
/// is only used by one thread at a time - confirm against callers
class RegexCache {
public:
// not copyable, as the maps hold raw owning pointers
RegexCache(RegexCache const&) = delete;
RegexCache& operator=(RegexCache const&) = delete;
RegexCache() = default;
/// @brief deletes all cached matchers
~RegexCache();
/// @brief delete all cached matchers of both caches
void clear() noexcept;
/// @brief get the matcher for a REGEX pattern, building and caching it if
/// it is not yet present. the result is cached even if it is a nullptr;
/// callers in this code base treat nullptr as "compiling failed"
icu::RegexMatcher* buildRegexMatcher(char const* ptr, size_t length, bool caseInsensitive);
/// @brief get the matcher for a LIKE pattern, building and caching it if
/// it is not yet present. the result is cached even if it is a nullptr;
/// callers in this code base treat nullptr as "compiling failed"
icu::RegexMatcher* buildLikeMatcher(char const* ptr, size_t length, bool caseInsensitive);
private:
/// @brief clear the specified cache
void clear(std::unordered_map<std::string, icu::RegexMatcher*>& cache) noexcept;
/// @brief get matcher from cache, or insert a new matcher for the specified pattern
icu::RegexMatcher* fromCache(std::string const& pattern,
std::unordered_map<std::string, icu::RegexMatcher*>& cache);
/// @brief write the regular expression string for a REGEX pattern into out
static void buildRegexPattern(std::string& out, char const* ptr, size_t length, bool caseInsensitive);
/// @brief translate a LIKE pattern into a regular expression string in out
static void buildLikePattern(std::string& out, char const* ptr, size_t length, bool caseInsensitive);
private:
/// @brief cache for compiled regexes (REGEX function)
std::unordered_map<std::string, icu::RegexMatcher*> _regexCache;
/// @brief cache for compiled regexes (LIKE function)
std::unordered_map<std::string, icu::RegexMatcher*> _likeCache;
/// @brief a reusable string object for pattern generation
/// it is overwritten by every buildRegexMatcher() / buildLikeMatcher() call
std::string _temp;
};
}
}
#endif

View File

@ -157,6 +157,7 @@ SET(ARANGOD_SOURCES
Aql/QueryResources.cpp
Aql/QueryString.cpp
Aql/Range.cpp
Aql/RegexCache.cpp
Aql/RestAqlHandler.cpp
Aql/Scopes.cpp
Aql/ShortStringStorage.cpp

View File

@ -35,7 +35,7 @@
#include "Basics/asio-helper.h"
#include "Cache/CacheManagerFeatureThreads.h"
#include "Cache/Manager.h"
#include "Logger/LogAppender.h"
#include "Logger/Logger.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "Scheduler/Scheduler.h"

View File

@ -2167,8 +2167,10 @@ std::shared_ptr<Index> MMFilesCollection::createIndex(transaction::Methods* trx,
THROW_ARANGO_EXCEPTION(res);
}
#if USE_PLAN_CACHE
arangodb::aql::PlanCache::instance()->invalidate(
_logicalCollection->vocbase());
#endif
// Until here no harm is done if sth fails. The shared ptr will clean up. if
// left before

View File

@ -521,26 +521,21 @@ static inline node_t* FindDirectSubNodeLinear(const node_t* const node,
/// the caller must ensure the node actually has sub-nodes!
static node_t* FindDirectSubNodeBinary(const node_t* const node,
const node_char_t c) {
node_char_t* followerKeys;
node_t** followerNodes;
uint32_t numFollowers;
uint32_t l, r;
#if TRI_FULLTEXT_DEBUG
TRI_ASSERT(node != nullptr);
#endif
numFollowers = NodeNumFollowers(node);
uint32_t numFollowers = NodeNumFollowers(node);
#if TRI_FULLTEXT_DEBUG
TRI_ASSERT(numFollowers >= 1);
#endif
followerKeys = NodeFollowersKeys(node);
followerNodes = NodeFollowersNodes(node);
node_char_t* followerKeys = NodeFollowersKeys(node);
node_t** followerNodes = NodeFollowersNodes(node);
l = 0;
uint32_t l = 0;
// this is safe (look at the function documentation)
r = numFollowers - 1;
uint32_t r = numFollowers - 1;
while (true) {
node_char_t followerKey;
@ -574,13 +569,11 @@ static node_t* FindDirectSubNodeBinary(const node_t* const node,
/// @brief find a node's sub-node, identified by its start character
static inline node_t* FindDirectSubNode(const node_t* const node,
const node_char_t c) {
uint32_t numFollowers;
#if TRI_FULLTEXT_DEBUG
TRI_ASSERT(node != nullptr);
#endif
numFollowers = NodeNumFollowers(node);
uint32_t numFollowers = NodeNumFollowers(node);
if (numFollowers >= 8) {
return FindDirectSubNodeBinary(node, c);
@ -596,17 +589,13 @@ static inline node_t* FindDirectSubNode(const node_t* const node,
/// @brief find a node by its key, starting from the index root
static node_t* FindNode(const index__t* idx, char const* const key,
size_t const keyLength) {
node_t* node;
node_char_t* p;
size_t i;
node = (node_t*)idx->_root;
node_t* node = idx->_root;
#if TRI_FULLTEXT_DEBUG
TRI_ASSERT(node != nullptr);
#endif
p = (node_char_t*)key;
node_char_t* p = (node_char_t*)key;
for (i = 0; i < keyLength; ++i) {
for (size_t i = 0; i < keyLength; ++i) {
node = FindDirectSubNode(node, *(p++));
if (node == nullptr) {
return nullptr;

View File

@ -742,7 +742,9 @@ int DatabaseFeature::dropDatabase(std::string const& name, bool waitForDeletion,
vocbase->setIsOwnAppsDirectory(removeAppsDirectory);
// invalidate all entries for the database
#if USE_PLAN_CACHE
arangodb::aql::PlanCache::instance()->invalidate(vocbase);
#endif
arangodb::aql::QueryCache::instance()->invalidate(vocbase);
engine->prepareDropDatabase(vocbase, !engine->inRecovery(), res);

View File

@ -456,8 +456,10 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
THROW_ARANGO_EXCEPTION(res);
}
#if USE_PLAN_CACHE
arangodb::aql::PlanCache::instance()->invalidate(
_logicalCollection->vocbase());
#endif
// Until here no harm is done if sth fails. The shared ptr will clean up. if
// left before
{

View File

@ -76,9 +76,12 @@ class JobQueueThread final
try {
job->_callback(std::move(job->_handler));
} catch (std::exception& e) {
} catch (std::exception const& e) {
LOG_TOPIC(WARN, Logger::THREADS)
<< "Exception caught in a dangereous place! " << e.what();
<< "caught exception while executing job callback: " << e.what();
} catch (...) {
LOG_TOPIC(WARN, Logger::THREADS)
<< "caught unknown exception while executing job callback";
}
this->_jobQueue->wakeup();

View File

@ -22,32 +22,33 @@
/// @author Achim Brandt
////////////////////////////////////////////////////////////////////////////////
#ifdef _WIN32
#include "Basics/win-utils.h"
#endif
#include "Scheduler.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include "Basics/MutexLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/Thread.h"
#include "Basics/WorkMonitor.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Random/RandomGenerator.h"
#include "Rest/GeneralResponse.h"
#include "Scheduler/JobGuard.h"
#include "Scheduler/JobQueue.h"
#include "Scheduler/Task.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include <thread>
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::rest;
namespace {
constexpr double MIN_SECONDS = 30.0;
}
// -----------------------------------------------------------------------------
// --SECTION-- SchedulerManagerThread
// -----------------------------------------------------------------------------
@ -56,7 +57,7 @@ namespace {
class SchedulerManagerThread : public Thread {
public:
SchedulerManagerThread(Scheduler* scheduler, boost::asio::io_service* service)
: Thread("SchedulerManager"), _scheduler(scheduler), _service(service) {}
: Thread("SchedulerManager", true), _scheduler(scheduler), _service(service) {}
~SchedulerManagerThread() { shutdown(); }
@ -65,14 +66,11 @@ class SchedulerManagerThread : public Thread {
while (!_scheduler->isStopping()) {
try {
_service->run_one();
_scheduler->deleteOldThreads();
} catch (...) {
LOG_TOPIC(ERR, Logger::THREADS)
<< "manager loop caught an error, restarting";
}
}
_scheduler->threadDone(this);
}
private:
@ -89,73 +87,67 @@ namespace {
class SchedulerThread : public Thread {
public:
SchedulerThread(Scheduler* scheduler, boost::asio::io_service* service)
: Thread("Scheduler"), _scheduler(scheduler), _service(service) {}
: Thread("Scheduler", true), _scheduler(scheduler), _service(service) {}
~SchedulerThread() { shutdown(); }
public:
void run() {
try {
_scheduler->incRunning();
TRI_DEFER(_scheduler->threadDone(this));
constexpr size_t EVERY_LOOP = size_t(MIN_SECONDS);
// when we enter this method,
// _nrRunning has already been increased for this thread
LOG_TOPIC(DEBUG, Logger::THREADS) << "started thread ("
<< _scheduler->infoStatus() << ")";
// some random delay value to avoid all initial threads checking for
// their deletion at the very same time
double const randomWait =
static_cast<double>(RandomGenerator::interval(int64_t(0), static_cast<int64_t>(MIN_SECONDS * 0.5)));
LOG_TOPIC(DEBUG, Logger::THREADS) << "started thread ("
<< _scheduler->infoStatus() << ")";
auto start = std::chrono::steady_clock::now();
double start = TRI_microtime() + randomWait;
size_t counter = 0;
bool doDecrement = true;
while (!_scheduler->isStopping()) {
try {
static size_t EVERY_LOOP = 1000;
static double MIN_SECONDS = 30;
size_t counter = 0;
bool doDecrement = true;
while (!_scheduler->isStopping()) {
_service->run_one();
if (++counter > EVERY_LOOP) {
counter = 0;
auto now = std::chrono::steady_clock::now();
std::chrono::duration<double> diff = now - start;
if (diff.count() > MIN_SECONDS) {
start = std::chrono::steady_clock::now();
if (_scheduler->shouldStopThread()) {
auto n = _scheduler->decRunning();
if (n <= _scheduler->minimum()) {
_scheduler->incRunning();
} else {
doDecrement = false;
break;
}
}
}
}
}
if (doDecrement) {
_scheduler->decRunning();
}
LOG_TOPIC(DEBUG, Logger::THREADS) << "stopped ("
<< _scheduler->infoStatus() << ")";
_service->run_one();
} catch (std::exception const& ex) {
LOG_TOPIC(ERR, Logger::THREADS)
<< "restarting scheduler loop after caught exception: " << ex.what();
_scheduler->decRunning();
_scheduler->startNewThread();
<< "scheduler loop caught exception: " << ex.what();
} catch (...) {
LOG_TOPIC(ERR, Logger::THREADS)
<< "restarting scheduler loop after unknown exception";
_scheduler->decRunning();
_scheduler->startNewThread();
<< "scheduler loop caught unknown exception";
}
} catch (...) {
// better not throw from here, as this is a thread main loop
if (++counter > EVERY_LOOP) {
counter = 0;
double const now = TRI_microtime();
if (now - start > MIN_SECONDS) {
// test if we should stop this thread
// if this returns true, nrRunning will have been
// decremented by one already
if (_scheduler->stopThreadIfTooMany(now)) {
// nrRunning was decremented already. now exit thread
// main loop
doDecrement = false;
break;
}
// use new start time
start = now;
}
}
}
LOG_TOPIC(DEBUG, Logger::THREADS) << "stopped ("
<< _scheduler->infoStatus() << ")";
if (doDecrement) {
// only decrement here if this wasn't already done above
_scheduler->stopThread();
}
}
@ -169,49 +161,31 @@ class SchedulerThread : public Thread {
// --SECTION-- Scheduler
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
Scheduler::Scheduler(uint64_t nrMinimum, uint64_t /*nrDesired*/,
uint64_t nrMaximum, uint64_t maxQueueSize)
: _stopping(false),
_maxQueueSize(maxQueueSize),
: _maxQueueSize(maxQueueSize),
_nrMinimum(nrMinimum),
_nrMaximum(nrMaximum),
_nrWorking(0),
_counters(0),
_nrQueued(0),
_nrBlocked(0),
_nrRunning(0) {
_lastAllBusyStamp(0.0) {
// setup signal handlers
initializeSignalHandlers();
}
Scheduler::~Scheduler() {
stopRebalancer();
try {
deleteOldThreads();
} catch (...) {
// probably out of memory here...
// must not throw in the dtor
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "unable to delete old scheduler threads";
}
}
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
void Scheduler::post(std::function<void()> callback) {
++_nrQueued;
_ioService.get()->post([this, callback]() {
--_nrQueued;
JobGuard guard(this);
guard.work();
--_nrQueued;
callback();
});
}
@ -224,7 +198,15 @@ bool Scheduler::start(ConditionVariable* cv) {
TRI_ASSERT(_nrMinimum <= _nrMaximum);
for (uint64_t i = 0; i < _nrMinimum; ++i) {
startNewThread();
MUTEX_LOCKER(locker, _threadCreateLock);
incRunning();
try {
startNewThread();
} catch (...) {
decRunning();
throw;
}
}
startManagerThread();
@ -278,49 +260,68 @@ void Scheduler::stopRebalancer() noexcept {
}
void Scheduler::startManagerThread() {
MUTEX_LOCKER(guard, _threadsLock);
auto thread = new SchedulerManagerThread(this, _managerService.get());
try {
_threads.emplace(thread);
} catch (...) {
delete thread;
throw;
}
thread->start();
}
void Scheduler::startNewThread() {
MUTEX_LOCKER(guard, _threadsLock);
auto thread = new SchedulerThread(this, _ioService.get());
try {
_threads.emplace(thread);
} catch (...) {
delete thread;
throw;
}
thread->start();
}
// decrements the nrRunning counter for the thread
// the decrement is performed while holding _threadCreateLock
void Scheduler::stopThread() {
MUTEX_LOCKER(locker, _threadCreateLock);
decRunning();
}
bool Scheduler::shouldStopThread() const {
if (_nrRunning <= _nrWorking + _nrQueued + _nrMinimum) {
// check if the current thread should be stopped
// returns true if yes, otherwise false. when the function returns
// true, it has already decremented the nrRunning counter!
bool Scheduler::stopThreadIfTooMany(double now) {
// make sure no extra threads are created while we check the timestamp
// and while we modify nrRunning
MUTEX_LOCKER(locker, _threadCreateLock);
// fetch all counters in one atomic operation
uint64_t counters = _counters.load();
uint64_t const nrRunning = numRunning(counters);
uint64_t const nrBlocked = numBlocked(counters);
uint64_t const nrWorking = numWorking(counters);
uint64_t const nrQueued = _nrQueued;
if (nrRunning <= _nrMinimum + nrBlocked) {
// don't stop a thread if we already reached the minimum
// number of threads
_lastAllBusyStamp = now;
return false;
}
if (_nrMinimum + _nrBlocked < _nrRunning) {
return true;
if (nrRunning <= nrWorking + nrQueued) {
return false;
}
if (_lastAllBusyStamp + 1.25 * MIN_SECONDS >= now) {
// last time all threads were busy is less than x seconds ago
return false;
}
// set the all busy stamp. this avoids that we shut down all threads
// at the same time
if (_lastAllBusyStamp < MIN_SECONDS / 2.0) {
_lastAllBusyStamp = now - MIN_SECONDS / 2.0;
}
return false;
// decrement nrRunning by one already in here while holding the lock
decRunning();
return true;
}
bool Scheduler::shouldQueueMore() const {
if (_nrWorking + _nrQueued + _nrMinimum < _nrMaximum) {
uint64_t const counters = _counters.load();
uint64_t const nrWorking = numWorking(counters);
if (nrWorking + _nrQueued < _nrMaximum) {
return true;
}
@ -328,7 +329,7 @@ bool Scheduler::shouldQueueMore() const {
}
bool Scheduler::hasQueueCapacity() const {
if (_nrWorking + _nrQueued + _nrMinimum >= _nrMaximum) {
if (!shouldQueueMore()) {
return false;
}
@ -350,67 +351,73 @@ std::string Scheduler::infoStatus() {
auto jobQueue = _jobQueue.get();
auto queueSize = (jobQueue == nullptr) ? 0 : jobQueue->queueSize();
return "working: " + std::to_string(_nrWorking) + ", queued: " +
uint64_t const counters = _counters.load();
return "working: " + std::to_string(numWorking(counters)) + ", queued: " +
std::to_string(_nrQueued) + ", blocked: " +
std::to_string(_nrBlocked) + ", running: " +
std::to_string(_nrRunning) + ", outstanding: " +
std::to_string(numBlocked(counters)) + ", running: " +
std::to_string(numRunning(counters)) + ", outstanding: " +
std::to_string(queueSize) + ", min/max: " +
std::to_string(_nrMinimum) + "/" + std::to_string(_nrMaximum);
}
void Scheduler::threadDone(Thread* thread) {
MUTEX_LOCKER(guard, _threadsLock);
_threads.erase(thread);
_deadThreads.insert(thread);
}
void Scheduler::deleteOldThreads() {
// delete old thread objects
std::unordered_set<Thread*> deadThreads;
{
MUTEX_LOCKER(guard, _threadsLock);
if (_deadThreads.empty()) {
return;
}
deadThreads.swap(_deadThreads);
}
for (auto thread : deadThreads) {
try {
delete thread;
} catch (...) {
LOG_TOPIC(ERR, Logger::THREADS) << "cannot delete thread";
}
}
}
void Scheduler::rebalanceThreads() {
static uint64_t count = 0;
++count;
if ((count % 5) == 0) {
if (count % 50 == 0) {
LOG_TOPIC(DEBUG, Logger::THREADS) << "rebalancing threads: " << infoStatus();
} else {
} else if (count % 5 == 0) {
LOG_TOPIC(TRACE, Logger::THREADS) << "rebalancing threads: " << infoStatus();
}
while (_nrRunning < _nrWorking + _nrQueued + _nrMinimum) {
if (_stopping) {
// do not start any new threads in case we are already shutting down
break;
while (true) {
{
double const now = TRI_microtime();
MUTEX_LOCKER(locker, _threadCreateLock);
uint64_t const counters = _counters.load();
uint64_t const nrRunning = numRunning(counters);
uint64_t const nrWorking = numWorking(counters);
uint64_t const nrQueued = _nrQueued;
if (nrRunning >= std::max(_nrMinimum, nrWorking + nrQueued)) {
if (nrWorking == nrRunning) {
// all threads maxed out
_lastAllBusyStamp = now;
}
break;
}
if (isStopping(counters)) {
// do not start any new threads in case we are already shutting down
break;
}
// all threads maxed out
_lastAllBusyStamp = now;
// increase nrRunning by one here already, while holding the lock
incRunning();
}
startNewThread();
// create thread and sleep without holding the mutex
try {
// actually start the new thread
startNewThread();
} catch (...) {
// if it fails, we have to rollback the increase of nrRunning again
MUTEX_LOCKER(locker, _threadCreateLock);
decRunning();
throw;
}
usleep(5000);
}
}
void Scheduler::beginShutdown() {
if (_stopping) {
if (isStopping()) {
return;
}
@ -426,22 +433,20 @@ void Scheduler::beginShutdown() {
_ioService->stop();
// set the flag AFTER stopping the threads
_stopping = true;
setStopping();
}
void Scheduler::shutdown() {
bool done = false;
while (!done) {
{
MUTEX_LOCKER(guard, _threadsLock);
done = _threads.empty();
while (true) {
uint64_t const counters = _counters.load();
if (numRunning(counters) == 0 && numWorking(counters) == 0) {
break;
}
std::this_thread::yield();
}
deleteOldThreads();
// remove all queued work descriptions in the work monitor first
// before freeing the io service a few lines later
// this is required because the work descriptions may have captured

View File

@ -30,10 +30,8 @@
#include <boost/asio/steady_timer.hpp>
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Basics/asio-helper.h"
#include "Basics/socket-utils.h"
#include "Logger/Logger.h"
#include "Scheduler/EventLoop.h"
#include "Scheduler/Job.h"
@ -76,17 +74,22 @@ class Scheduler {
void post(std::function<void()> callback);
bool start(basics::ConditionVariable*);
bool isRunning() { return _nrRunning.load() > 0; }
bool isRunning() const { return numRunning(_counters) > 0; }
void beginShutdown();
bool isStopping() { return _stopping; }
void stopRebalancer() noexcept;
bool isStopping() { return (_counters & (1ULL << 63)) != 0; }
void shutdown();
private:
static void initializeSignalHandlers();
public:
bool shouldStopThread() const;
// decrements the nrRunning counter for the thread
void stopThread();
// check if the current thread should be stopped
// returns true if yes, otherwise false. when the function returns
// true, it has already decremented the nrRunning counter!
bool stopThreadIfTooMany(double now);
bool shouldQueueMore() const;
bool hasQueueCapacity() const;
@ -94,23 +97,28 @@ class Scheduler {
uint64_t minimum() const { return _nrMinimum; }
uint64_t incRunning() { return ++_nrRunning; }
uint64_t decRunning() { return --_nrRunning; }
std::string infoStatus();
private:
void startNewThread();
void threadDone(Thread*);
void deleteOldThreads();
void stopRebalancer() noexcept;
static void initializeSignalHandlers();
private:
void workThread() { ++_nrWorking; }
void unworkThread() { --_nrWorking; }
void setStopping() { _counters |= (1ULL << 63); }
inline void incRunning() { _counters += 1ULL << 0; }
inline void decRunning() { _counters -= 1ULL << 0; }
void blockThread() { ++_nrBlocked; }
void unblockThread() { --_nrBlocked; }
// Helpers for the packed _counters word. Field layout as used by the shifts
// and masks below:
//   bits  0..15  number of running threads
//   bits 16..31  number of working threads
//   bits 32..47  number of blocked threads
//   bit  63      "stopping" flag
// NOTE(review): nothing here guards a 16 bit field against overflowing or
// underflowing into its neighbour - confirm callers keep the counts in range.

// adds one to the "working" field (bits 16..31)
inline void workThread() { _counters += 1ULL << 16; }
// subtracts one from the "working" field (bits 16..31)
inline void unworkThread() { _counters -= 1ULL << 16; }
// adds one to the "blocked" field (bits 32..47)
inline void blockThread() { _counters += 1ULL << 32; }
// subtracts one from the "blocked" field (bits 32..47)
inline void unblockThread() { _counters -= 1ULL << 32; }
// The decoders below work on a caller-supplied snapshot of the counters
// word, so several fields can be read from one consistent value.
// extracts the number of running threads (bits 0..15) from `value`
inline uint64_t numRunning(uint64_t value) const { return value & 0xFFFFULL; }
// extracts the number of working threads (bits 16..31) from `value`
inline uint64_t numWorking(uint64_t value) const { return (value >> 16) & 0xFFFFULL; }
// extracts the number of blocked threads (bits 32..47) from `value`
inline uint64_t numBlocked(uint64_t value) const { return (value >> 32) & 0xFFFFULL; }
// returns true if the "stopping" flag (bit 63) is set in `value`
inline bool isStopping(uint64_t value) { return (value & (1ULL << 63)) != 0; }
void startIoService();
void startRebalancer();
@ -118,8 +126,6 @@ class Scheduler {
void rebalanceThreads();
private:
std::atomic<bool> _stopping;
// maximal number of outstanding user requests
uint64_t const _maxQueueSize;
@ -129,19 +135,15 @@ class Scheduler {
// maximal number of outstanding user requests
uint64_t const _nrMaximum;
// number of jobs currently been worked on
// use signed values just in case we have an underflow
std::atomic<uint64_t> _nrWorking;
// current counters
// - the lowest 16 bits contain the number of running threads
// - the next 16 bits contain the number of working threads
// - the next 16 bits contain the number of blocked threads
std::atomic<uint64_t> _counters;
// number of jobs that are currently queued, but not yet being worked on
std::atomic<uint64_t> _nrQueued;
// number of jobs that entered a potentially blocking situation
std::atomic<uint64_t> _nrBlocked;
// number of SchedulerThread that are running
std::atomic<uint64_t> _nrRunning;
std::unique_ptr<JobQueue> _jobQueue;
boost::shared_ptr<boost::asio::io_service::work> _serviceGuard;
@ -153,9 +155,8 @@ class Scheduler {
std::unique_ptr<boost::asio::steady_timer> _threadManager;
std::function<void(const boost::system::error_code&)> _threadHandler;
Mutex _threadsLock;
std::unordered_set<Thread*> _threads;
std::unordered_set<Thread*> _deadThreads;
mutable Mutex _threadCreateLock;
double _lastAllBusyStamp;
};
}
}

View File

@ -31,6 +31,7 @@
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ArangoGlobalContext.h"
#include "Basics/WorkMonitor.h"
#include "Logger/Logger.h"
#include "Logger/LogAppender.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
@ -54,6 +55,7 @@ SchedulerFeature::SchedulerFeature(
requiresElevatedPrivileges(false);
startsAfter("FileDescriptors");
startsAfter("Logger");
startsAfter("Random");
startsAfter("WorkMonitor");
}

View File

@ -35,6 +35,7 @@
#include "Basics/TimedAction.h"
#include "Basics/WorkMonitor.h"
#include "Cluster/ServerState.h"
#include "Logger/Logger.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "Random/RandomGenerator.h"

View File

@ -1026,7 +1026,9 @@ std::shared_ptr<Index> LogicalCollection::createIndex(transaction::Methods* trx,
/// @brief drops an index, including index file removal and replication
bool LogicalCollection::dropIndex(TRI_idx_iid_t iid) {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
#if USE_PLAN_CACHE
arangodb::aql::PlanCache::instance()->invalidate(_vocbase);
#endif
arangodb::aql::QueryCache::instance()->invalidate(_vocbase, name());
return _physical->dropIndex(iid);
}

View File

@ -585,7 +585,9 @@ int TRI_vocbase_t::dropCollectionWorker(arangodb::LogicalCollection* collection,
TRI_ASSERT(writeLocker.isLocked());
TRI_ASSERT(locker.isLocked());
#if USE_PLAN_CACHE
arangodb::aql::PlanCache::instance()->invalidate(this);
#endif
arangodb::aql::QueryCache::instance()->invalidate(this);
switch (collection->status()) {

View File

@ -330,6 +330,18 @@ function ahuacatlStringFunctionsTestSuite () {
});
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test regex function, cache
////////////////////////////////////////////////////////////////////////////////
testRegexCache : function () {
var actual = getQueryResults("FOR i IN 1..100 RETURN REGEX_TEST(CONCAT('test', i), 'test')");
assertEqual(100, actual.length);
for (var i = 0; i < actual.length; ++i) {
assertTrue(actual[i]);
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test REGEX_REPLACE, invalid arguments
////////////////////////////////////////////////////////////////////////////////
@ -524,6 +536,18 @@ function ahuacatlStringFunctionsTestSuite () {
});
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test like function, cache
////////////////////////////////////////////////////////////////////////////////
testLikeCache : function () {
var actual = getQueryResults("FOR i IN 1..100 RETURN LIKE(CONCAT('test', i), 'test%')");
assertEqual(100, actual.length);
for (var i = 0; i < actual.length; ++i) {
assertTrue(actual[i]);
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test like function, invalid arguments
////////////////////////////////////////////////////////////////////////////////

View File

@ -79,27 +79,37 @@ void Thread::startThread(void* arg) {
ptr->_threadNumber = LOCAL_THREAD_NUMBER;
if (0 <= ptr->_affinity) {
TRI_SetProcessorAffinity(&ptr->_thread, ptr->_affinity);
}
bool pushed = WorkMonitor::pushThread(ptr);
try {
ptr->runMe();
} catch (std::exception const& ex) {
LOG_TOPIC(WARN, Logger::THREADS) << "caught exception in thread '" << ptr->_name
<< "': " << ex.what();
LOG_TOPIC(WARN, Logger::THREADS)
<< "caught exception in thread '" << ptr->_name << "': " << ex.what();
if (pushed) {
WorkMonitor::popThread(ptr);
}
ptr->cleanupMe();
throw;
} catch (...) {
if (pushed) {
WorkMonitor::popThread(ptr);
}
ptr->cleanupMe();
throw;
}
if (pushed) {
WorkMonitor::popThread(ptr);
}
ptr->cleanupMe();
}
////////////////////////////////////////////////////////////////////////////////
@ -156,8 +166,9 @@ std::string Thread::stringify(ThreadState state) {
/// @brief constructs a thread
////////////////////////////////////////////////////////////////////////////////
Thread::Thread(std::string const& name)
: _name(name),
Thread::Thread(std::string const& name, bool deleteOnExit)
: _deleteOnExit(deleteOnExit),
_name(name),
_thread(),
_threadNumber(0),
_threadId(),
@ -166,8 +177,8 @@ Thread::Thread(std::string const& name)
_affinity(-1),
_workDescription(nullptr) {
TRI_InitThread(&_thread);
// allow failing memory allocations for all threads by default
// allow failing memory allocations for all threads by default
TRI_AllowMemoryFailures();
}
@ -177,14 +188,16 @@ Thread::Thread(std::string const& name)
Thread::~Thread() {
auto state = _state.load();
LOG_TOPIC(TRACE, Logger::THREADS) << "delete(" << _name
<< "), state: " << stringify(state);
LOG_TOPIC(TRACE, Logger::THREADS)
<< "delete(" << _name << "), state: " << stringify(state);
if (state == ThreadState::STOPPED) {
int res = TRI_JoinThread(&_thread);
if (res != 0) {
LOG_TOPIC(INFO, Logger::THREADS) << "cannot detach thread";
if (TRI_IsSelfThread(&_thread)) {
// we must ignore any errors here, but TRI_DetachThread will log them
TRI_DetachThread(&_thread);
} else {
// we must ignore any errors here, but TRI_JoinThread will log them
TRI_JoinThread(&_thread);
}
_state.store(ThreadState::DETACHED);
@ -194,8 +207,9 @@ Thread::~Thread() {
state = _state.load();
if (state != ThreadState::DETACHED && state != ThreadState::CREATED) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "thread '" << _name << "' is not detached but " << stringify(state)
<< ". shutting down hard";
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "thread '" << _name << "' is not detached but " << stringify(state)
<< ". shutting down hard";
FATAL_ERROR_ABORT();
}
}
@ -219,9 +233,9 @@ void Thread::beginShutdown() {
_state.compare_exchange_strong(state, ThreadState::STOPPING);
}
LOG_TOPIC(TRACE, Logger::THREADS) << "beginShutdown(" << _name
<< ") reached state "
<< stringify(_state.load());
LOG_TOPIC(TRACE, Logger::THREADS)
<< "beginShutdown(" << _name << ") reached state "
<< stringify(_state.load());
}
////////////////////////////////////////////////////////////////////////////////
@ -244,12 +258,11 @@ void Thread::shutdown() {
if (_state.load() == ThreadState::STARTED) {
beginShutdown();
if (!isSilent() &&
_state.load() != ThreadState::STOPPING &&
if (!isSilent() && _state.load() != ThreadState::STOPPING &&
_state.load() != ThreadState::STOPPED) {
LOG_TOPIC(WARN, Logger::THREADS) << "forcefully shutting down thread '"
<< _name << "' in state "
<< stringify(_state.load());
LOG_TOPIC(WARN, Logger::THREADS)
<< "forcefully shutting down thread '" << _name << "' in state "
<< stringify(_state.load());
}
}
@ -264,7 +277,8 @@ void Thread::shutdown() {
}
if (_state.load() != ThreadState::STOPPED) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "cannot shutdown thread, giving up";
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "cannot shutdown thread, giving up";
FATAL_ERROR_ABORT();
}
}
@ -286,11 +300,12 @@ bool Thread::isStopping() const {
bool Thread::start(ConditionVariable* finishedCondition) {
if (!isSystem() && !ApplicationServer::isPrepared()) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "trying to start a thread '" << _name
<< "' before prepare has finished, current state: "
<< (ApplicationServer::server == nullptr
? -1
: (int)ApplicationServer::server->state());
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "trying to start a thread '" << _name
<< "' before prepare has finished, current state: "
<< (ApplicationServer::server == nullptr
? -1
: (int)ApplicationServer::server->state());
FATAL_ERROR_ABORT();
}
@ -317,16 +332,14 @@ bool Thread::start(ConditionVariable* finishedCondition) {
bool ok =
TRI_StartThread(&_thread, &_threadId, _name.c_str(), &startThread, this);
if (ok) {
if (0 <= _affinity) {
TRI_SetProcessorAffinity(&_thread, _affinity);
}
} else {
if (!ok) {
// could not start the thread
_state.store(ThreadState::STOPPED);
LOG_TOPIC(ERR, Logger::THREADS) << "could not start thread '" << _name
<< "': " << TRI_last_error();
LOG_TOPIC(ERR, Logger::THREADS)
<< "could not start thread '" << _name << "': " << TRI_last_error();
return false;
// must cleanup to prevent memleaks
cleanupMe();
}
return ok;
@ -384,34 +397,40 @@ void Thread::addStatus(VPackBuilder* b) {
}
}
void Thread::runMe() {
try {
run();
_state.store(ThreadState::STOPPED);
} catch (Exception const& ex) {
LOG_TOPIC(ERR, Logger::THREADS) << "exception caught in thread '" << _name
<< "': " << ex.what();
Logger::flush();
_state.store(ThreadState::STOPPED);
throw;
} catch (std::exception const& ex) {
LOG_TOPIC(ERR, Logger::THREADS) << "exception caught in thread '" << _name
<< "': " << ex.what();
Logger::flush();
_state.store(ThreadState::STOPPED);
throw;
} catch (...) {
if (!isSilent()) {
LOG_TOPIC(ERR, Logger::THREADS) << "exception caught in thread '" << _name
<< "'";
Logger::flush();
}
_state.store(ThreadState::STOPPED);
throw;
}
void Thread::markAsStopped() {
_state.store(ThreadState::STOPPED);
if (_finishedCondition != nullptr) {
CONDITION_LOCKER(locker, *_finishedCondition);
locker.broadcast();
}
}
void Thread::runMe() {
// make sure the thread is marked as stopped under all circumstances
TRI_DEFER(markAsStopped());
try {
run();
} catch (std::exception const& ex) {
if (!isSilent()) {
LOG_TOPIC(ERR, Logger::THREADS)
<< "exception caught in thread '" << _name << "': " << ex.what();
Logger::flush();
}
throw;
} catch (...) {
if (!isSilent()) {
LOG_TOPIC(ERR, Logger::THREADS)
<< "unknown exception caught in thread '" << _name << "'";
Logger::flush();
}
throw;
}
}
// Destroys this thread object if it was flagged as delete-on-exit;
// otherwise does nothing.
void Thread::cleanupMe() {
  if (!_deleteOnExit) {
    return;
  }

  // the object is gone after this statement; no member may be touched
  // afterwards
  delete this;
}

View File

@ -138,16 +138,7 @@ class Thread {
}
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief constructs a thread
//////////////////////////////////////////////////////////////////////////////
explicit Thread(std::string const& name);
//////////////////////////////////////////////////////////////////////////////
/// @brief deletes the thread
//////////////////////////////////////////////////////////////////////////////
Thread(std::string const& name, bool deleteOnExit = false);
virtual ~Thread();
public:
@ -265,59 +256,28 @@ class Thread {
static void startThread(void* arg);
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief runner
//////////////////////////////////////////////////////////////////////////////
void markAsStopped();
void runMe();
void cleanupMe();
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief name of the thread
//////////////////////////////////////////////////////////////////////////////
bool const _deleteOnExit;
// name of the thread
std::string const _name;
//////////////////////////////////////////////////////////////////////////////
/// @brief thread pointer
//////////////////////////////////////////////////////////////////////////////
// internal thread information
thread_t _thread;
//////////////////////////////////////////////////////////////////////////////
/// @brief the thread number
//////////////////////////////////////////////////////////////////////////////
uint64_t _threadNumber;
//////////////////////////////////////////////////////////////////////////////
/// @brief thread identifier
//////////////////////////////////////////////////////////////////////////////
TRI_tid_t _threadId;
//////////////////////////////////////////////////////////////////////////////
/// @brief condition variable for done
//////////////////////////////////////////////////////////////////////////////
basics::ConditionVariable* _finishedCondition;
//////////////////////////////////////////////////////////////////////////////
/// @brief thread has started
//////////////////////////////////////////////////////////////////////////////
std::atomic<ThreadState> _state;
//////////////////////////////////////////////////////////////////////////////
/// @brief processor affinity
//////////////////////////////////////////////////////////////////////////////
// processor affinity
int _affinity;
//////////////////////////////////////////////////////////////////////////////
/// @brief current work description
//////////////////////////////////////////////////////////////////////////////
std::atomic<WorkDescription*> _workDescription;
};
}

View File

@ -128,7 +128,26 @@ bool TRI_StartThread(TRI_thread_t* thread, TRI_tid_t* threadId,
////////////////////////////////////////////////////////////////////////////////
int TRI_JoinThread(TRI_thread_t* thread) {
return pthread_join(*thread, nullptr);
TRI_ASSERT(!TRI_IsSelfThread(thread));
int res = pthread_join(*thread, nullptr);
if (res != TRI_ERROR_NO_ERROR) {
LOG_TOPIC(WARN, arangodb::Logger::THREADS) << "cannot join thread: " << strerror(res);
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief detaches a thread
////////////////////////////////////////////////////////////////////////////////
int TRI_DetachThread(TRI_thread_t* thread) {
  // pthread_detach reports failure through its return value
  int const rc = pthread_detach(*thread);

  if (rc == TRI_ERROR_NO_ERROR) {
    return rc;
  }

  // failure is only logged here; the error code is handed back to the caller
  LOG_TOPIC(WARN, arangodb::Logger::THREADS) << "cannot detach thread: " << strerror(rc);
  return rc;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -24,7 +24,6 @@
#include "threads.h"
#include "Logger/Logger.h"
#include "Basics/tri-strings.h"
////////////////////////////////////////////////////////////////////////////////
/// @brief data block for thread starter
@ -77,7 +76,7 @@ bool TRI_StartThread(TRI_thread_t* thread, TRI_tid_t* threadId,
try {
d.reset(new thread_data_t(starter, data, name));
} catch (...) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "could not start thread: out of memory";
LOG_TOPIC(ERR, arangodb::Logger::THREADS) << "could not start thread: out of memory";
return false;
}
@ -90,7 +89,7 @@ bool TRI_StartThread(TRI_thread_t* thread, TRI_tid_t* threadId,
threadId); // returns the thread identifier
if (*thread == 0) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "could not start thread: " << strerror(errno) << " ";
LOG_TOPIC(ERR, arangodb::Logger::THREADS) << "could not start thread: " << strerror(errno) << " ";
return false;
}
@ -106,7 +105,8 @@ int TRI_JoinThread(TRI_thread_t* thread) {
switch (result) {
case WAIT_ABANDONED: {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "threads-win32.c:TRI_JoinThread:could not join thread --> WAIT_ABANDONED"; FATAL_ERROR_EXIT();
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "could not join thread --> WAIT_ABANDONED";
break;
}
case WAIT_OBJECT_0: {
@ -115,18 +115,38 @@ int TRI_JoinThread(TRI_thread_t* thread) {
}
case WAIT_TIMEOUT: {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "threads-win32.c:TRI_JoinThread:could not joint thread --> WAIT_TIMEOUT"; FATAL_ERROR_EXIT();
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "could not join thread --> WAIT_TIMEOUT";
break;
}
case WAIT_FAILED: {
result = GetLastError();
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "threads-win32.c:TRI_JoinThread:could not join thread --> WAIT_FAILED - reason -->" << result; FATAL_ERROR_EXIT();
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "could not join thread --> WAIT_FAILED - reason -->" << result;
break;
}
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief detaches a thread
////////////////////////////////////////////////////////////////////////////////
int TRI_DetachThread(TRI_thread_t* thread) {
  // `thread` points to the stored thread handle, so it must be dereferenced:
  // passing the pointer itself would hand CloseHandle the address of the
  // handle variable and leave the real thread handle open.
  //
  // If the function succeeds, the return value is nonzero.
  // If the function fails, the return value is zero. To get extended error information, call GetLastError.
  BOOL res = CloseHandle(*thread);

  if (res == 0) {
    DWORD result = GetLastError();
    LOG_TOPIC(ERR, arangodb::Logger::THREADS) << "cannot detach thread: " << result;
    return TRI_ERROR_INTERNAL;
  }

  return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks if this thread is the thread passed as a parameter
////////////////////////////////////////////////////////////////////////////////

View File

@ -70,5 +70,6 @@ void TRI_SetProcessorAffinity(TRI_thread_t*, size_t core);
// SHOULD BE REMOVED
void TRI_InitThread(TRI_thread_t* thread);
int TRI_JoinThread(TRI_thread_t* thread);
int TRI_DetachThread(TRI_thread_t* thread);
#endif

View File

@ -45,8 +45,11 @@ LogThread::~LogThread() {
}
void LogThread::log(std::unique_ptr<LogMessage>& message) {
MESSAGES->push(message.get());
message.release();
if (MESSAGES->push(message.get())) {
// only release message if adding to the queue succeeded
// otherwise we would leak here
message.release();
}
}
void LogThread::flush() {
@ -85,7 +88,7 @@ void LogThread::run() {
}
CONDITION_LOCKER(guard, *CONDITION);
guard.wait(10 * 1000);
guard.wait(25 * 1000);
}
while (_messages.pop(msg)) {