1
0
Fork 0

issue 435.1: allow a user to specify the number of IResearchFeature threads via command line (#6164)

This commit is contained in:
Vasiliy 2018-08-16 14:14:15 +03:00 committed by Jan
parent 63efe2a634
commit 364360ae79
6 changed files with 240 additions and 25 deletions

View File

@ -425,7 +425,8 @@ NS_BEGIN(iresearch)
Field::Field(Field&& rhs)
: _features(rhs._features),
_analyzer(std::move(rhs._analyzer)),
_name(std::move(rhs._name)) {
_name(std::move(rhs._name)),
_storeValues(std::move(rhs._storeValues)) {
rhs._features = nullptr;
}
@ -434,6 +435,7 @@ Field& Field::operator=(Field&& rhs) {
_features = rhs._features;
_analyzer = std::move(rhs._analyzer);
_name = std::move(rhs._name);
_storeValues = std::move(rhs._storeValues);
rhs._features = nullptr;
}

View File

@ -144,6 +144,20 @@ typedef arangodb::aql::AqlValue (*IResearchFunctionPtr)(
arangodb::SmallVector<arangodb::aql::AqlValue> const&
);
// Determine how many threads the ArangoSearch maintenance pool should contain.
// @param threads explicitly requested number of threads; any non-zero value is
//        returned unchanged (it is not clamped by 'threadsLimit')
// @param threadsLimit upper bound applied only to the autodetected value;
//        0 selects the built-in default bound of 8
// @return 'threads' when non-zero, otherwise a quarter of the reported hardware
//         concurrency clamped into the range [1, upper bound]
size_t computeThreadPoolSize(size_t threads, size_t threadsLimit) {
  // an explicit request always wins, no autodetection is performed
  if (threads != 0) {
    return threads;
  }

  constexpr size_t kDefaultUpperBound = 8; // arbitrary cap used when no limit was supplied
  constexpr size_t kLowerBound = 1; // the pool needs at least one thread

  const size_t upperBound = threadsLimit != 0 ? threadsLimit : kDefaultUpperBound;
  const size_t quarterOfCores = size_t(std::thread::hardware_concurrency()) / 4;

  // clamp from above first, then from below (so a zero result becomes one)
  const size_t capped = quarterOfCores < upperBound ? quarterOfCores : upperBound;

  return capped < kLowerBound ? kLowerBound : capped;
}
void registerFunctions(arangodb::aql::AqlFunctionFeature& functions) {
arangodb::iresearch::addFunction(functions, {
"__ARANGOSEARCH_SCORE_DEBUG", // name
@ -382,11 +396,13 @@ class IResearchFeature::Async {
public:
typedef std::function<bool(size_t& timeoutMsec, bool timeout)> Fn;
Async();
explicit Async(size_t poolSize = 0);
Async(size_t poolSize, Async&& other);
~Async();
void emplace(std::shared_ptr<ResourceMutex> const& mutex, Fn &&fn); // add an asynchronous task
void notify() const; // notify all tasks
size_t poolSize() { return _pool.size(); }
void start();
private:
@ -431,6 +447,8 @@ class IResearchFeature::Async {
arangodb::basics::ConditionVariable _join; // mutex to join on
std::vector<Thread> _pool; // thread pool (size fixed for the entire life of object)
std::atomic<bool> _terminate; // unconditionaly terminate async tasks
void stop(Thread* redelegate = nullptr);
};
void IResearchFeature::Async::Thread::run() {
@ -446,7 +464,7 @@ void IResearchFeature::Async::Thread::run() {
SCOPED_LOCK_NAMED(_mutex, lock); // aquire before '_terminate' check so that don't miss notify()
if (_terminate->load()) {
return; // termination requested
break; // termination requested
}
// transfer any new pending tasks into active tasks
@ -492,7 +510,7 @@ void IResearchFeature::Async::Thread::run() {
_wasNotified = false; // ignore notification since woke up
if (_terminate->load()) { // check again after sleep
return; // termination requested
break; // termination requested
}
}
@ -557,15 +575,25 @@ void IResearchFeature::Async::Thread::run() {
++i;
}
}
// ...........................................................................
// move all tasks back into _pending in case they may need to be reassigned
// ...........................................................................
SCOPED_LOCK_NAMED(_mutex, lock); // '_pending' may be modified asynchronously
for (auto& task: pendingRedelegate) {
_pending.emplace_back(std::move(task));
}
for (auto& task: _tasks) {
_pending.emplace_back(std::move(task));
}
_tasks.clear();
}
IResearchFeature::Async::Async(): _terminate(false) {
static const unsigned int MAX_THREADS = 8; // arbitrary limit on the upper bound of threads in pool
static const unsigned int MIN_THREADS = 1; // at least one thread is required
auto const poolSize = std::max(
MIN_THREADS,
std::min(MAX_THREADS, std::thread::hardware_concurrency() / 4) // arbitrary fraction of available cores
);
IResearchFeature::Async::Async(size_t poolSize): _terminate(false) {
poolSize = std::max(size_t(1), poolSize); // need at least one thread
for (size_t i = 0; i < poolSize; ++i) {
_pool.emplace_back(std::string("ArangoSearch #") + std::to_string(i));
@ -579,23 +607,15 @@ IResearchFeature::Async::Async(): _terminate(false) {
last = &thread;
thread._terminate = &_terminate;
}
}
// Resizing constructor: builds a new pool of 'poolSize' threads by delegating
// to Async(size_t), then stops 'other' and has the pending tasks of its threads
// handed over to the first thread of this pool.
// @param poolSize requested number of threads for the new pool
// @param other the pool being replaced; stop(...) is invoked on it here
IResearchFeature::Async::Async(size_t poolSize, Async&& other)
: Async(poolSize) {
// '_pool[0]' is the redelegation target that receives the tasks of 'other'
other.stop(&_pool[0]);
}
IResearchFeature::Async::~Async() {
_terminate.store(true); // request stop asynchronous tasks
notify(); // notify all threads
CONDITION_LOCKER(lock, _join);
// join with all threads in pool
for (auto& thread: _pool) {
if (thread.hasStarted()) {
while(thread.isRunning()) {
_join.wait();
}
}
}
stop();
}
void IResearchFeature::Async::emplace(
@ -632,6 +652,35 @@ void IResearchFeature::Async::start() {
<< "started " << _pool.size() << " ArangoSearch maintenance thread(s)";
}
// Request termination of all pool threads, wait for the started ones to finish
// and optionally move their queued tasks to another thread.
// @param redelegate when non-null, every task found in a pool thread's
//        '_pending' list is moved into 'redelegate->_pending' while holding
//        'redelegate->_mutex', 'redelegate->_size' is incremented per task and
//        'redelegate->_cond' is notified; when null, tasks are left untouched
void IResearchFeature::Async::stop(Thread* redelegate /*= nullptr*/) {
_terminate.store(true); // request stop asynchronous tasks
notify(); // notify all threads
CONDITION_LOCKER(lock, _join);
// join with all threads in pool
for (auto& thread: _pool) {
// only threads that were actually started are waited on
if (thread.hasStarted()) {
while(thread.isRunning()) {
_join.wait();
}
}
// redelegate all thread tasks if requested
if (redelegate) {
// the target's '_pending' list is only modified under the target's own mutex
SCOPED_LOCK(redelegate->_mutex);
for (auto& task: thread._pending) {
redelegate->_pending.emplace_back(std::move(task));
++redelegate->_size;
}
// drop the moved-from entries from the source thread
thread._pending.clear();
redelegate->_cond.notify_all(); // notify thread about a new task (thread may be sleeping indefinitely)
}
}
}
IResearchFeature::IResearchFeature(
arangodb::application_features::ApplicationServer& server
)
@ -642,7 +691,6 @@ IResearchFeature::IResearchFeature(
_threadsLimit(0) {
setOptional(true);
startsAfter("V8Phase");
startsAfter("IResearchAnalyzer"); // used for retrieving IResearch analyzers for functions
startsAfter("AQLFunctions");
}
@ -716,6 +764,12 @@ void IResearchFeature::prepare() {
// start the async task thread pool
if (!ServerState::instance()->isCoordinator() &&
!ServerState::instance()->isAgent()) {
auto poolSize = computeThreadPoolSize(_threads, _threadsLimit);
if (_async->poolSize() != poolSize) {
_async = std::make_unique<Async>(poolSize, std::move(*_async));
}
_async->start();
}
}

View File

@ -58,6 +58,7 @@ struct IResearchFeatureSetup {
// Fixture teardown: puts the log level of the iresearch topic back to DEFAULT
// and clears the static ApplicationServer and storage-engine pointers.
~IResearchFeatureSetup() {
arangodb::LogTopic::setLogLevel(arangodb::iresearch::TOPIC.name(), arangodb::LogLevel::DEFAULT);
arangodb::application_features::ApplicationServer::server = nullptr;
// NOTE(review): presumably prevents a stale engine pointer from leaking into later tests - confirm
arangodb::EngineSelectorFeature::ENGINE = nullptr;
}
};
@ -75,6 +76,15 @@ TEST_CASE("IResearchFeatureTest", "[iresearch][iresearch-feature]") {
UNUSED(s);
SECTION("test_start") {
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
auto* functions = new arangodb::aql::AqlFunctionFeature(server);
arangodb::iresearch::IResearchFeature iresearch(server);
@ -130,6 +140,15 @@ SECTION("IResearch_version") {
SECTION("test_async") {
// schedule task (null resource mutex)
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -150,6 +169,15 @@ SECTION("test_async") {
// schedule task (null resource mutex value)
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -171,6 +199,15 @@ SECTION("test_async") {
// schedule task (null functor)
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -187,6 +224,15 @@ SECTION("test_async") {
// schedule task (wait indefinite)
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -210,6 +256,15 @@ SECTION("test_async") {
// single-run task
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -231,6 +286,15 @@ SECTION("test_async") {
// multi-run task
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -265,6 +329,15 @@ SECTION("test_async") {
// trigger task by notify
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -297,6 +370,15 @@ SECTION("test_async") {
// trigger by timeout
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -332,6 +414,15 @@ SECTION("test_async") {
// deallocate empty
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
{
@ -343,6 +434,15 @@ SECTION("test_async") {
// deallocate with running tasks
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
auto resourceMutex = std::make_shared<arangodb::iresearch::ResourceMutex>(&server);
bool deallocated = false;
@ -365,6 +465,15 @@ SECTION("test_async") {
// multiple tasks with same resourceMutex + resourceMutex reset (sequential creation)
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
@ -409,6 +518,53 @@ SECTION("test_async") {
CHECK((true == deallocated1));
thread.join();
}
// schedule task (resize pool)
// Scenario: a task is registered before prepare() is called, the thread count
// option is set to 8, and the test then checks that the task still runs twice
// (about 100ms apart) and is released afterwards.
{
// create a new instance of an ApplicationServer and fill it with the required features
// cannot use the existing server since its features already have some state
// 'originalServer' holds the current static server pointer; its custom deleter
// writes that pointer back when this scope is left
std::shared_ptr<arangodb::application_features::ApplicationServer> originalServer(
arangodb::application_features::ApplicationServer::server,
[](arangodb::application_features::ApplicationServer* ptr)->void {
arangodb::application_features::ApplicationServer::server = ptr;
}
);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::iresearch::IResearchFeature feature(server);
server.addFeature(new arangodb::ViewTypesFeature(server)); // required for IResearchFeature::prepare()
// 'options' lives on the stack, hence the shared_ptr is given a no-op deleter
arangodb::options::ProgramOptions options("", "", "", nullptr);
auto optionsPtr = std::shared_ptr<arangodb::options::ProgramOptions>(&options, [](arangodb::options::ProgramOptions*)->void {});
feature.collectOptions(optionsPtr);
// explicitly request 8 threads through the command line option
options.get<arangodb::options::UInt64Parameter>("arangosearch.threads")->set("8");
auto resourceMutex = std::make_shared<arangodb::iresearch::ResourceMutex>(&server);
bool deallocated = false;
std::condition_variable cond;
std::mutex mutex;
size_t count = 0;
auto last = std::chrono::system_clock::now();
std::chrono::system_clock::duration diff;
// 'mutex' is held by the test until cond.wait_for(...) below releases it
SCOPED_LOCK_NAMED(mutex, lock);
{
// the deleter flips 'deallocated' once the last copy of 'flag' (captured by the task) is destroyed
std::shared_ptr<bool> flag(&deallocated, [](bool* ptr)->void { *ptr = true; });
feature.async(resourceMutex, [&cond, &mutex, flag, &count, &last, &diff](size_t& timeoutMsec, bool)->bool {
// record the time elapsed since the previous invocation
diff = std::chrono::system_clock::now() - last;
last = std::chrono::system_clock::now();
timeoutMsec = 100;
// first invocation returns true, second one signals the test and returns false
// NOTE(review): presumably 'true' means "run again" and 'false' means "done" - confirm against Async
if (++count <= 1) return true;
SCOPED_LOCK(mutex);
cond.notify_all();
return false;
});
}
feature.prepare(); // start thread pool after a task has been scheduled, to trigger resize with a task
// the task must signal completion within one second
CHECK((std::cv_status::timeout != cond.wait_for(lock, std::chrono::milliseconds(1000))));
// give the pool time to drop the finished task before checking 'deallocated'
std::this_thread::sleep_for(std::chrono::milliseconds(100));
CHECK((true == deallocated));
CHECK((2 == count));
// the second run must have happened more than the requested 100ms after the first
CHECK((std::chrono::milliseconds(100) < diff));
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -45,6 +45,7 @@ struct IResearchViewMetaSetup {
}
// Fixture teardown: clears the static ApplicationServer and storage-engine pointers.
~IResearchViewMetaSetup() {
arangodb::application_features::ApplicationServer::server = nullptr;
// NOTE(review): presumably prevents a stale engine pointer from leaking into later tests - confirm
arangodb::EngineSelectorFeature::ENGINE = nullptr;
}
};

View File

@ -62,6 +62,7 @@ struct LogicalDataSourceSetup {
}
~LogicalDataSourceSetup() {
arangodb::application_features::ApplicationServer::server = nullptr;
arangodb::EngineSelectorFeature::ENGINE = nullptr;
// destroy application features

View File

@ -82,6 +82,7 @@ int main(int argc, char* argv[]) {
arangodb::application_features::ApplicationServer server(nullptr, nullptr);
arangodb::ShellColorsFeature sc(server);
arangodb::application_features::ApplicationServer::server = nullptr; // avoid "ApplicationServer initialized twice"
sc.prepare();
arangodb::ArangoGlobalContext ctx(1, const_cast<char**>(&ARGV0), ".");