////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2017 EMC Corporation /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is EMC Corporation /// /// @author Andrey Abramov /// @author Vasily Nabatchikov //////////////////////////////////////////////////////////////////////////////// // otherwise define conflict between 3rdParty\date\include\date\date.h and 3rdParty\iresearch\core\shared.hpp #if defined(_MSC_VER) #include "date/date.h" #undef NOEXCEPT #endif #include "search/scorers.hpp" #include "utils/log.hpp" #include "ApplicationServerHelper.h" #include "Aql/AqlFunctionFeature.h" #include "Aql/AqlValue.h" #include "Aql/Function.h" #include "Basics/ConditionLocker.h" #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ServerState.h" #include "ClusterEngine/ClusterEngine.h" #include "Containers/SmallVector.h" #include "FeaturePhases/V8FeaturePhase.h" #include "IResearch/Containers.h" #include "IResearch/IResearchCommon.h" #include "IResearch/IResearchFeature.h" #include "IResearch/IResearchLinkCoordinator.h" #include "IResearch/IResearchLinkHelper.h" #include "IResearch/IResearchMMFilesLink.h" #include "IResearch/IResearchRocksDBLink.h" #include "IResearch/IResearchRocksDBRecoveryHelper.h" #include "IResearch/IResearchView.h" #include "IResearch/IResearchViewCoordinator.h" #include "Logger/LogMacros.h" #include "MMFiles/MMFilesEngine.h" 
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/FlushFeature.h"
#include "RestServer/UpgradeFeature.h"
#include "RestServer/ViewTypesFeature.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/LogicalDataSource.h"
#include "VocBase/LogicalView.h"

namespace arangodb {

namespace basics {
class VPackStringBufferAdapter;
}  // namespace basics

namespace aql {
class Query;
}  // namespace aql

}  // namespace arangodb

namespace {

typedef irs::async_utils::read_write_mutex::read_mutex ReadMutex;
typedef irs::async_utils::read_write_mutex::write_mutex WriteMutex;

static const std::string FLUSH_COLLECTION_FIELD("cid");
static const std::string FLUSH_INDEX_FIELD("iid");
static const std::string FLUSH_VALUE_FIELD("value");

////////////////////////////////////////////////////////////////////////////////
/// @brief log topic which forwards every log-level change to the IResearch
///        library logger in addition to the regular ArangoDB topic handling
////////////////////////////////////////////////////////////////////////////////
class IResearchLogTopic final : public arangodb::LogTopic {
 public:
  /// @brief create the topic at DEFAULT_LEVEL and apply that level to the
  ///        IResearch logger as well
  explicit IResearchLogTopic(std::string const& name)
      : arangodb::LogTopic(name, DEFAULT_LEVEL) {
    setIResearchLogLevel(DEFAULT_LEVEL);
  }

  /// @brief set the level on the ArangoDB topic and mirror it to IResearch
  virtual void setLogLevel(arangodb::LogLevel level) override {
    arangodb::LogTopic::setLogLevel(level);
    setIResearchLogLevel(level);
  }

 private:
  static arangodb::LogLevel const DEFAULT_LEVEL = arangodb::LogLevel::INFO;

  // NOTE(review): the template arguments below were missing in the reviewed
  // text and are reconstructed from how the aliases are used - confirm
  typedef std::underlying_type<irs::logger::level_t>::type irsLogLevelType;
  typedef std::underlying_type<arangodb::LogLevel>::type arangoLogLevelType;

  // the conversion in setIResearchLogLevel(...) relies on every IResearch
  // level being exactly one less than the matching ArangoDB level
  static_assert(
      static_cast<irsLogLevelType>(irs::logger::IRL_FATAL) ==
              static_cast<arangoLogLevelType>(arangodb::LogLevel::FATAL) - 1 &&
          static_cast<irsLogLevelType>(irs::logger::IRL_ERROR) ==
              static_cast<arangoLogLevelType>(arangodb::LogLevel::ERR) - 1 &&
          static_cast<irsLogLevelType>(irs::logger::IRL_WARN) ==
              static_cast<arangoLogLevelType>(arangodb::LogLevel::WARN) - 1 &&
          static_cast<irsLogLevelType>(irs::logger::IRL_INFO) ==
              static_cast<arangoLogLevelType>(arangodb::LogLevel::INFO) - 1 &&
          static_cast<irsLogLevelType>(irs::logger::IRL_DEBUG) ==
              static_cast<arangoLogLevelType>(arangodb::LogLevel::DEBUG) - 1 &&
          static_cast<irsLogLevelType>(irs::logger::IRL_TRACE) ==
              static_cast<arangoLogLevelType>(arangodb::LogLevel::TRACE) - 1,
      "inconsistent log level mapping");

  /// @brief translate an ArangoDB log level into the IResearch one (clamped to
  ///        the [IRL_FATAL, IRL_TRACE] range) and direct IResearch output at
  ///        or below that level to stderr
  static void setIResearchLogLevel(arangodb::LogLevel level) {
    if (level == arangodb::LogLevel::DEFAULT) {
      level = DEFAULT_LEVEL;
    }

    auto irsLevel = static_cast<irs::logger::level_t>(
        static_cast<arangoLogLevelType>(level) - 1);  // -1 for DEFAULT

    irsLevel = std::max(irsLevel, irs::logger::IRL_FATAL);
    irsLevel = std::min(irsLevel, irs::logger::IRL_TRACE);
    irs::logger::output_le(irsLevel, stderr);
  }
};  // IResearchLogTopic

/// @brief function body for ArangoSearch filter functions, always throws
///        TRI_ERROR_NOT_IMPLEMENTED since such functions can only be
///        evaluated inside a SEARCH statement
arangodb::aql::AqlValue dummyFilterFunc(
    arangodb::aql::ExpressionContext*, arangodb::transaction::Methods*,
    ::arangodb::containers::SmallVector<arangodb::aql::AqlValue> const&) {
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_NOT_IMPLEMENTED,
      "ArangoSearch filter functions EXISTS, STARTS_WITH, IN_RANGE, PHRASE, MIN_MATCH, "
      "BOOST and ANALYZER "
      " are designed to be used only within a corresponding SEARCH statement "
      "of ArangoSearch view."
      " Please ensure function signature is correct.");
}

/// function body for ArangoSearchContext functions ANALYZER/BOOST.
/// Just returns its first argument as outside ArangoSearch context
/// there is nothing to do with search stuff, but optimization could roll.
arangodb::aql::AqlValue dummyContextFunc( arangodb::aql::ExpressionContext*, arangodb::transaction::Methods*, ::arangodb::containers::SmallVector const& args) { TRI_ASSERT(!args.empty()); //ensured by function signature return args[0]; } /// Executes MIN_MATCH function with const parameters locally the same way it will be done in ArangoSearch on runtime /// This will allow optimize out MIN_MATCH call if all arguments are const arangodb::aql::AqlValue dummyMinMatchContextFunc( arangodb::aql::ExpressionContext*, arangodb::transaction::Methods*, ::arangodb::containers::SmallVector const& args) { TRI_ASSERT(args.size() > 1); // ensured by function signature auto& minMatchValue = args.back(); if (ADB_LIKELY(minMatchValue.isNumber())) { auto matchesLeft = minMatchValue.toInt64(); const auto argsCount = args.size() - 1; for (size_t i = 0; i < argsCount && matchesLeft > 0; ++i) { auto& currValue = args[i]; if (currValue.toBoolean()) { matchesLeft--; } } return arangodb::aql::AqlValue(arangodb::aql::AqlValueHintBool(matchesLeft == 0)); } else { auto message = std::string("'MIN_MATCH' AQL function: ") .append(" last argument has invalid type '") .append(minMatchValue.getTypeString()) .append("' (numeric expected)"); THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_BAD_PARAMETER, message); } } arangodb::aql::AqlValue dummyScorerFunc( arangodb::aql::ExpressionContext*, arangodb::transaction::Methods*, ::arangodb::containers::SmallVector const&) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_NOT_IMPLEMENTED, "ArangoSearch scorer functions BM25() and TFIDF() are designed to " "be used only outside SEARCH statement within a context of ArangoSearch " "view." " Please ensure function signature is correct."); } size_t computeThreadPoolSize(size_t threads, size_t threadsLimit) { static const size_t MAX_THREADS = 8; // arbitrary limit on the upper bound of threads in pool static const size_t MIN_THREADS = 1; // at least one thread is required auto maxThreads = threadsLimit ? 
threadsLimit : MAX_THREADS; return threads ? threads : std::max(MIN_THREADS, std::min(maxThreads, size_t(std::thread::hardware_concurrency()) / 4)); } bool upgradeSingleServerArangoSearchView0_1( TRI_vocbase_t& vocbase, arangodb::velocypack::Slice const& /*upgradeParams*/) { using arangodb::application_features::ApplicationServer; if (!arangodb::ServerState::instance()->isSingleServer() && !arangodb::ServerState::instance()->isDBServer()) { return true; // not applicable for other ServerState roles } for (auto& view : vocbase.views()) { if (!arangodb::LogicalView::cast(view.get())) { continue; // not an IResearchView } arangodb::velocypack::Builder builder; arangodb::Result res; builder.openObject(); res = view->properties(builder, arangodb::LogicalDataSource::Serialization::Persistence); // get JSON with meta + 'version' builder.close(); if (!res.ok()) { LOG_TOPIC("c5dc4", WARN, arangodb::iresearch::TOPIC) << "failure to generate persisted definition while upgrading " "IResearchView from version 0 to version 1"; return false; // definition generation failure } auto versionSlice = builder.slice().get(arangodb::iresearch::StaticStrings::VersionField); if (!versionSlice.isNumber()) { LOG_TOPIC("eae1c", WARN, arangodb::iresearch::TOPIC) << "failure to find 'version' field while upgrading IResearchView " "from version 0 to version 1"; return false; // required field is missing } auto const version = versionSlice.getNumber(); if (0 != version) { continue; // no upgrade required } builder.clear(); builder.openObject(); res = view->properties(builder, arangodb::LogicalDataSource::Serialization::Properties); // get JSON with end-user definition builder.close(); if (!res.ok()) { LOG_TOPIC("d6e30", WARN, arangodb::iresearch::TOPIC) << "failure to generate persisted definition while upgrading " "IResearchView from version 0 to version 1"; return false; // definition generation failure } irs::utf8_path dataPath; auto& server = ApplicationServer::server(); if 
(!server.hasFeature()) { LOG_TOPIC("67c7e", WARN, arangodb::iresearch::TOPIC) << "failure to find feature 'DatabasePath' while upgrading " "IResearchView from version 0 to version 1"; return false; // required feature is missing } auto& dbPathFeature = server.getFeature(); // original algorithm for computing data-store path static const std::string subPath("databases"); static const std::string dbPath("database-"); dataPath = irs::utf8_path(dbPathFeature.directory()); dataPath /= subPath; dataPath /= dbPath; dataPath += std::to_string(vocbase.id()); dataPath /= arangodb::iresearch::DATA_SOURCE_TYPE.name(); dataPath += "-"; dataPath += std::to_string(view->id()); res = view->drop(); // drop view (including all links) if (!res.ok()) { LOG_TOPIC("cb9d1", WARN, arangodb::iresearch::TOPIC) << "failure to drop view while upgrading IResearchView from version " "0 to version 1"; return false; // view drom failure } // ......................................................................... // non-recoverable state below here // ......................................................................... 
// non-version 0 IResearchView implementations no longer drop from vocbase // on db-server, do it explicitly if (arangodb::ServerState::instance()->isDBServer()) { res = arangodb::LogicalViewHelperStorageEngine::drop(*view); if (!res.ok()) { LOG_TOPIC("bfb3d", WARN, arangodb::iresearch::TOPIC) << "failure to drop view from vocbase while upgrading " "IResearchView from version 0 to version 1"; return false; // view drom failure } } if (arangodb::ServerState::instance()->isSingleServer() || arangodb::ServerState::instance()->isDBServer()) { bool exists; // remove any stale data-store if (!dataPath.exists(exists) || (exists && !dataPath.remove())) { LOG_TOPIC("9ab42", WARN, arangodb::iresearch::TOPIC) << "failure to remove old data-store path while upgrading " "IResearchView from version 0 to version 1, view definition: " << builder.slice().toString(); return false; // data-store removal failure } } if (arangodb::ServerState::instance()->isDBServer()) { continue; // no need to recreate per-cid view } // recreate view res = arangodb::iresearch::IResearchView::factory().create(view, vocbase, builder.slice()); if (!res.ok()) { LOG_TOPIC("f8d20", WARN, arangodb::iresearch::TOPIC) << "failure to recreate view while upgrading IResearchView from " "version 0 to version 1, error: " << res.errorNumber() << " " << res.errorMessage() << ", view definition: " << builder.slice().toString(); return false; // data-store removal failure } } return true; } void registerFilters(arangodb::aql::AqlFunctionFeature& functions) { using arangodb::iresearch::addFunction; auto flags = arangodb::aql::Function::makeFlags(arangodb::aql::Function::Flags::Deterministic, arangodb::aql::Function::Flags::Cacheable, arangodb::aql::Function::Flags::CanRunOnDBServer); addFunction(functions, { "EXISTS", ".|.,.", flags, &dummyFilterFunc }); // (attribute, [ // "analyzer"|"type"|"string"|"numeric"|"bool"|"null" // ]) addFunction(functions, { "STARTS_WITH", ".,.|.", flags, &dummyFilterFunc }); // (attribute, 
prefix, scoring-limit) addFunction(functions, { "PHRASE", ".,.|.+", flags, &dummyFilterFunc }); // (attribute, input [, offset, input... ] [, analyzer]) addFunction(functions, { "IN_RANGE", ".,.,.,.,.", flags, &dummyFilterFunc }); // (attribute, lower, upper, include lower, include upper) addFunction(functions, { "MIN_MATCH", ".,.|.+", flags, &dummyMinMatchContextFunc }); // (filter expression [, filter expression, ... ], min match count) addFunction(functions, { "BOOST", ".,.", flags, &dummyContextFunc }); // (filter expression, boost) addFunction(functions, { "ANALYZER", ".,.", flags, &dummyContextFunc }); // (filter expression, analyzer) } namespace { template void registerSingleFactory(arangodb::application_features::ApplicationServer& server, arangodb::IndexTypeFactory const& factory) { auto const& indexType = arangodb::iresearch::DATA_SOURCE_TYPE.name(); if (server.hasFeature()) { auto& engine = server.getFeature(); auto& engineFactory = const_cast(engine.indexFactory()); arangodb::Result res = engineFactory.emplace(indexType, factory); if (!res.ok()) { THROW_ARANGO_EXCEPTION_MESSAGE( res.errorNumber(), std::string("failure registering IResearch link factory with index " "factory from feature '") + engine.name() + "': " + res.errorMessage()); } } } } // namespace void registerIndexFactory(arangodb::application_features::ApplicationServer& server) { registerSingleFactory( server, arangodb::iresearch::IResearchLinkCoordinator::factory()); registerSingleFactory( server, arangodb::iresearch::IResearchMMFilesLink::factory()); registerSingleFactory( server, arangodb::iresearch::IResearchRocksDBLink::factory()); } void registerScorers(arangodb::aql::AqlFunctionFeature& functions) { irs::string_ref const args(".|+"); // positional arguments (attribute [, // ...]); irs::scorers::visit([&functions, &args](irs::string_ref const& name, irs::text_format::type_id const& args_format) -> bool { // ArangoDB, for API consistency, only supports scorers configurable via // jSON 
if (irs::text_format::json != args_format) { return true; } std::string upperName = name; // AQL function external names are always in upper case std::transform(upperName.begin(), upperName.end(), upperName.begin(), ::toupper); arangodb::iresearch::addFunction( functions, { std::move(upperName), args.c_str(), arangodb::aql::Function::makeFlags(arangodb::aql::Function::Flags::Deterministic, arangodb::aql::Function::Flags::Cacheable, arangodb::aql::Function::Flags::CanRunOnDBServer), &dummyScorerFunc // function implementation }); return true; }); } void registerRecoveryHelper() { auto helper = std::make_shared(); auto res = arangodb::RocksDBEngine::registerRecoveryHelper(helper); if (res.fail()) { THROW_ARANGO_EXCEPTION_MESSAGE( res.errorNumber(), "failed to register RocksDB recovery helper"); } } void registerUpgradeTasks() { auto& server = arangodb::application_features::ApplicationServer::server(); if (!server.hasFeature()) { return; // nothing to register with (OK if no tasks actually need to be applied) } auto& upgrade = server.getFeature(); // move IResearch data-store from IResearchView to IResearchLink { arangodb::methods::Upgrade::Task task; task.name = "upgradeArangoSearch0_1"; task.description = "store ArangoSearch index on per linked collection basis"; task.systemFlag = arangodb::methods::Upgrade::Flags::DATABASE_ALL; task.clusterFlags = arangodb::methods::Upgrade::Flags::CLUSTER_DB_SERVER_LOCAL // db-server | arangodb::methods::Upgrade::Flags::CLUSTER_NONE // local server | arangodb::methods::Upgrade::Flags::CLUSTER_LOCAL; task.databaseFlags = arangodb::methods::Upgrade::Flags::DATABASE_UPGRADE; task.action = &upgradeSingleServerArangoSearchView0_1; upgrade.addTask(std::move(task)); } } void registerViewFactory() { auto& viewType = arangodb::iresearch::DATA_SOURCE_TYPE; auto& server = arangodb::application_features::ApplicationServer::server(); auto& viewTypes = server.getFeature(); arangodb::Result res; // DB server in custer or single-server if 
(arangodb::ServerState::instance()->isCoordinator()) { res = viewTypes.emplace(viewType, arangodb::iresearch::IResearchViewCoordinator::factory()); } else if (arangodb::ServerState::instance()->isDBServer()) { res = viewTypes.emplace(viewType, arangodb::iresearch::IResearchView::factory()); } else if (arangodb::ServerState::instance()->isSingleServer()) { res = viewTypes.emplace(viewType, arangodb::iresearch::IResearchView::factory()); } else { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_FAILED, std::string("Invalid role for arangosearch view creation.")); } if (!res.ok()) { THROW_ARANGO_EXCEPTION_MESSAGE( res.errorNumber(), std::string("failure registering arangosearch view factory: ") + res.errorMessage()); } } arangodb::Result transactionDataSourceRegistrationCallback( arangodb::LogicalDataSource& dataSource, arangodb::transaction::Methods& trx) { if (arangodb::iresearch::DATA_SOURCE_TYPE != dataSource.type()) { return {}; // not an IResearchView (noop) } // TODO FIXME find a better way to look up a LogicalView #ifdef ARANGODB_ENABLE_MAINTAINER_MODE auto* view = dynamic_cast(&dataSource); #else auto* view = static_cast(&dataSource); #endif if (!view) { LOG_TOPIC("f42f8", WARN, arangodb::iresearch::TOPIC) << "failure to get LogicalView while processing a TransactionState by " "IResearchFeature for name '" << dataSource.name() << "'"; return {TRI_ERROR_INTERNAL}; } // TODO FIXME find a better way to look up an IResearch View auto& impl = arangodb::LogicalView::cast(*view); return arangodb::Result(impl.apply(trx) ? 
TRI_ERROR_NO_ERROR : TRI_ERROR_INTERNAL); } void registerTransactionDataSourceRegistrationCallback() { if (arangodb::ServerState::instance()->isSingleServer()) { arangodb::transaction::Methods::addDataSourceRegistrationCallback( &transactionDataSourceRegistrationCallback); } } std::string const FEATURE_NAME("ArangoSearch"); IResearchLogTopic LIBIRESEARCH("libiresearch"); } // namespace namespace arangodb { namespace iresearch { bool isFilter(arangodb::aql::Function const& func) noexcept { return func.implementation == &dummyFilterFunc || func.implementation == &dummyContextFunc || func.implementation == &dummyMinMatchContextFunc; } bool isScorer(arangodb::aql::Function const& func) noexcept { return func.implementation == &dummyScorerFunc; } class IResearchFeature::Async { public: typedef std::function Fn; explicit Async(IResearchFeature& feature, size_t poolSize = 0); Async(IResearchFeature& feature, size_t poolSize, Async&& other); ~Async(); void emplace(std::shared_ptr const& mutex, Fn&& fn); // add an asynchronous task void notify() const; // notify all tasks size_t poolSize() { return _pool.size(); } void start(); private: struct Pending { Fn _fn; // the function to execute std::shared_ptr _mutex; // mutex for the task resources std::chrono::system_clock::time_point _timeout; // when the task should be notified // (std::chrono::milliseconds::max() == disabled) Pending(std::shared_ptr const& mutex, Fn&& fn) : _fn(std::move(fn)), _mutex(mutex), _timeout(std::chrono::system_clock::time_point::max()) {} }; struct Task : public Pending { std::unique_lock _lock; // prevent resource deallocation explicit Task(Pending&& pending) : Pending(std::move(pending)) {} }; struct Thread : public arangodb::Thread { mutable std::condition_variable _cond; // trigger task run mutable std::mutex _mutex; // mutex used with '_cond' and '_pending' Thread* _next; // next thread in circular-list (never null!!!) 
(need to // store pointer for move-assignment) std::vector _pending; // pending tasks std::atomic _size; // approximate size of the active+pending task list std::vector _tasks; // the tasks to perform std::atomic* _terminate; // trigger termination of this thread (need // to store pointer for move-assignment) mutable bool _wasNotified; // a notification was raised from another thread explicit Thread(arangodb::application_features::ApplicationServer& server, std::string const& name) : arangodb::Thread(server, name), _next(nullptr), _terminate(nullptr), _wasNotified(false) {} Thread(Thread&& other) // used in constructor before tasks are started : arangodb::Thread(other._server, other.name()), _next(nullptr), _terminate(nullptr), _wasNotified(false) {} ~Thread() { shutdown(); } virtual bool isSystem() override { return true; } // or start(...) will fail virtual void run() override; }; arangodb::basics::ConditionVariable _join; // mutex to join on std::vector _pool; // thread pool (size fixed for the entire life of object) std::atomic _terminate; // unconditionaly terminate async tasks void stop(Thread* redelegate = nullptr); }; void IResearchFeature::Async::Thread::run() { std::vector pendingRedelegate; std::chrono::system_clock::time_point timeout; bool timeoutSet = false; for (;;) { bool onlyPending; auto pendingStart = _tasks.size(); { SCOPED_LOCK_NAMED(_mutex, lock); // aquire before '_terminate' check so // that don't miss notify() if (_terminate->load()) { break; // termination requested } // transfer any new pending tasks into active tasks for (auto& pending : _pending) { _tasks.emplace_back(std::move(pending)); // will aquire resource lock auto& task = _tasks.back(); if (task._mutex) { task._lock = std::unique_lock(task._mutex->mutex(), std::try_to_lock); if (!task._lock.owns_lock()) { // if can't lock 'task._mutex' then reasign the task to the next // worker pendingRedelegate.emplace_back(std::move(task)); } else if (*(task._mutex)) { continue; // 
resourceMutex acquisition successful } _tasks.pop_back(); // resource no longer valid } } _pending.clear(); _size.store(_tasks.size()); // do not sleep if a notification was raised or pending tasks were added if (_wasNotified || pendingStart < _tasks.size() || !pendingRedelegate.empty()) { timeout = std::chrono::system_clock::now(); timeoutSet = true; } // sleep until timeout if (!timeoutSet) { _cond.wait(lock); // wait forever } else { _cond.wait_until(lock, timeout); // wait for timeout or notify } onlyPending = !_wasNotified && pendingStart < _tasks.size(); // process all tasks if a notification was raised _wasNotified = false; // ignore notification since woke up if (_terminate->load()) { // check again after sleep break; // termination requested } } timeoutSet = false; // transfer some tasks to '_next' if have too many if (!pendingRedelegate.empty() || (_size.load() > _next->_size.load() * 2 && _tasks.size() > 1)) { SCOPED_LOCK(_next->_mutex); // reasign to '_next' tasks that failed resourceMutex aquisition while (!pendingRedelegate.empty()) { _next->_pending.emplace_back(std::move(pendingRedelegate.back())); pendingRedelegate.pop_back(); ++_next->_size; } // transfer some tasks to '_next' if have too many while (_size.load() > _next->_size.load() * 2 && _tasks.size() > 1) { _next->_pending.emplace_back(std::move(_tasks.back())); _tasks.pop_back(); ++_next->_size; --_size; } _next->_cond.notify_all(); // notify thread about a new task (thread may // be sleeping indefinitely) } for (size_t i = onlyPending ? 
pendingStart : 0, count = _tasks.size(); // optimization to skip previously run // tasks if a notificationw as not raised i < count;) { auto& task = _tasks[i]; auto exec = std::chrono::system_clock::now() >= task._timeout; size_t timeoutMsec = 0; // by default reschedule for the same time span try { if (!task._fn(timeoutMsec, exec)) { if (i + 1 < count) { std::swap(task, _tasks[count - 1]); // swap 'i' with tail } _tasks.pop_back(); // remove stale tail --count; continue; } } catch (...) { LOG_TOPIC("d43ee", WARN, arangodb::iresearch::TOPIC) << "caught error while executing asynchronous task"; IR_LOG_EXCEPTION(); timeoutMsec = 0; // sleep until previously set timeout } // task reschedule time modification requested if (timeoutMsec) { task._timeout = std::chrono::system_clock::now() + std::chrono::milliseconds(timeoutMsec); } timeout = timeoutSet ? std::min(timeout, task._timeout) : task._timeout; timeoutSet = true; ++i; } } // ........................................................................... // move all tasks back into _pending in case the may neeed to be reasigned // ........................................................................... 
SCOPED_LOCK_NAMED(_mutex, lock); // '_pending' may be modified asynchronously for (auto& task : pendingRedelegate) { _pending.emplace_back(std::move(task)); } for (auto& task : _tasks) { _pending.emplace_back(std::move(task)); } _tasks.clear(); } IResearchFeature::Async::Async(IResearchFeature& feature, size_t poolSize) : _terminate(false) { poolSize = std::max(size_t(1), poolSize); // need at least one thread for (size_t i = 0; i < poolSize; ++i) { _pool.emplace_back(feature.server(), std::string("ArangoSearch #") + std::to_string(i)); } auto* last = &(_pool.back()); // build circular list for (auto& thread : _pool) { last->_next = &thread; last = &thread; thread._terminate = &_terminate; } } IResearchFeature::Async::Async(IResearchFeature& feature, size_t poolSize, Async&& other) : Async(feature, poolSize) { other.stop(&_pool[0]); } IResearchFeature::Async::~Async() { stop(); } void IResearchFeature::Async::emplace(std::shared_ptr const& mutex, Fn&& fn) { if (!fn) { return; // skip empty functers } auto& thread = _pool[0]; SCOPED_LOCK(thread._mutex); thread._pending.emplace_back(mutex, std::move(fn)); ++thread._size; thread._cond.notify_all(); // notify thread about a new task (thread may be // sleeping indefinitely) } void IResearchFeature::Async::notify() const { // notify all threads for (auto& thread : _pool) { SCOPED_LOCK(thread._mutex); thread._cond.notify_all(); thread._wasNotified = true; } } void IResearchFeature::Async::start() { // start threads for (auto& thread : _pool) { thread.start(&_join); } LOG_TOPIC("c1b64", DEBUG, arangodb::iresearch::TOPIC) << "started " << _pool.size() << " ArangoSearch maintenance thread(s)"; } void IResearchFeature::Async::stop(Thread* redelegate /*= nullptr*/) { _terminate.store(true); // request stop asynchronous tasks notify(); // notify all threads CONDITION_LOCKER(lock, _join); // join with all threads in pool for (auto& thread : _pool) { if (thread.hasStarted()) { while (thread.isRunning()) { _join.wait(); } } // 
redelegate all thread tasks if requested if (redelegate) { SCOPED_LOCK(redelegate->_mutex); for (auto& task : thread._pending) { redelegate->_pending.emplace_back(std::move(task)); ++redelegate->_size; } thread._pending.clear(); redelegate->_cond.notify_all(); // notify thread about a new task (thread // may be sleeping indefinitely) } } } IResearchFeature::IResearchFeature(arangodb::application_features::ApplicationServer& server) : ApplicationFeature(server, IResearchFeature::name()), _async(std::make_unique(*this)), _running(false), _threads(0), _threadsLimit(0) { setOptional(true); startsAfter(); startsAfter(); // used for retrieving IResearch // analyzers for functions startsAfter(); } void IResearchFeature::async(std::shared_ptr const& mutex, Async::Fn&& fn) { _async->emplace(mutex, std::move(fn)); } void IResearchFeature::asyncNotify() const { _async->notify(); } void IResearchFeature::beginShutdown() { _running.store(false); ApplicationFeature::beginShutdown(); } void IResearchFeature::collectOptions(std::shared_ptr options) { auto section = FEATURE_NAME; _running.store(false); std::transform(section.begin(), section.end(), section.begin(), ::tolower); ApplicationFeature::collectOptions(options); options->addSection(section, std::string("Configure the ") + FEATURE_NAME + " feature"); options->addOption(std::string("--") + section + ".threads", "the exact number of threads to use for asynchronous " "tasks (0 == autodetect)", new arangodb::options::UInt64Parameter(&_threads)); options->addOption(std::string("--") + section + ".threads-limit", "upper limit to the autodetected number of threads to use " "for asynchronous tasks (0 == use default)", new arangodb::options::UInt64Parameter(&_threadsLimit)); } /*static*/ std::string const& IResearchFeature::name() { return FEATURE_NAME; } void IResearchFeature::prepare() { TRI_ASSERT(isEnabled()); _running.store(false); ApplicationFeature::prepare(); // load all known codecs ::iresearch::formats::init(); // load all 
known scorers ::iresearch::scorers::init(); // register 'arangosearch' index registerIndexFactory(server()); // register 'arangosearch' view registerViewFactory(); // register 'arangosearch' Transaction DataSource registration callback registerTransactionDataSourceRegistrationCallback(); registerRecoveryHelper(); // start the async task thread pool if (!ServerState::instance()->isCoordinator() // not a coordinator && !ServerState::instance()->isAgent()) { auto poolSize = computeThreadPoolSize(_threads, _threadsLimit); if (_async->poolSize() != poolSize) { _async = std::make_unique(*this, poolSize, std::move(*_async)); } _async->start(); } } void IResearchFeature::start() { TRI_ASSERT(isEnabled()); ApplicationFeature::start(); // register IResearchView filters { if (server().hasFeature()) { auto& functions = server().getFeature(); registerFilters(functions); registerScorers(functions); } else { LOG_TOPIC("462d7", WARN, arangodb::iresearch::TOPIC) << "failure to find feature 'AQLFunctions' while registering " "arangosearch filters"; } } registerUpgradeTasks(); // register tasks after UpgradeFeature::prepare() has finished _running.store(true); } void IResearchFeature::stop() { TRI_ASSERT(isEnabled()); _running.store(false); ApplicationFeature::stop(); } void IResearchFeature::unprepare() { TRI_ASSERT(isEnabled()); _running.store(false); ApplicationFeature::unprepare(); } void IResearchFeature::validateOptions(std::shared_ptr options) { _running.store(false); ApplicationFeature::validateOptions(options); } } // namespace iresearch } // namespace arangodb // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE // -----------------------------------------------------------------------------