diff --git a/arangod/Cache/ManagerTasks.cpp b/arangod/Cache/ManagerTasks.cpp index d5771f1456..d07fa3ecc9 100644 --- a/arangod/Cache/ManagerTasks.cpp +++ b/arangod/Cache/ManagerTasks.cpp @@ -27,6 +27,8 @@ #include "Cache/Cache.h" #include "Cache/Manager.h" #include "Cache/Metadata.h" +#include "Scheduler/Scheduler.h" +#include "Scheduler/SchedulerFeature.h" using namespace arangodb::cache; @@ -37,14 +39,14 @@ FreeMemoryTask::FreeMemoryTask(Manager::TaskEnvironment environment, FreeMemoryTask::~FreeMemoryTask() {} bool FreeMemoryTask::dispatch() { - auto ioService = _manager->ioService(); - if (ioService == nullptr) { + auto scheduler = SchedulerFeature::SCHEDULER; + if (scheduler == nullptr) { return false; } _manager->prepareTask(_environment); auto self = shared_from_this(); - ioService->post([self, this]() -> void { run(); }); + scheduler->post([self, this]() -> void { run(); }); return true; } @@ -78,14 +80,14 @@ MigrateTask::MigrateTask(Manager::TaskEnvironment environment, Manager* manager, MigrateTask::~MigrateTask() {} bool MigrateTask::dispatch() { - auto ioService = _manager->ioService(); - if (ioService == nullptr) { + auto scheduler = SchedulerFeature::SCHEDULER; + if (scheduler == nullptr) { return false; } _manager->prepareTask(_environment); auto self = shared_from_this(); - ioService->post([self, this]() -> void { run(); }); + scheduler->post([self, this]() -> void { run(); }); return true; } diff --git a/arangod/MMFiles/MMFilesCollection.cpp b/arangod/MMFiles/MMFilesCollection.cpp index a8acce201d..60985386c4 100644 --- a/arangod/MMFiles/MMFilesCollection.cpp +++ b/arangod/MMFiles/MMFilesCollection.cpp @@ -593,7 +593,7 @@ int MMFilesCollection::close() { } // wait until ditches have been processed fully - while (_ditches.contains(MMFilesDitch::TRI_DITCH_DATAFILE_DROP) || + while (_ditches.contains(MMFilesDitch::TRI_DITCH_DATAFILE_DROP) || _ditches.contains(MMFilesDitch::TRI_DITCH_DATAFILE_RENAME) || _ditches.contains(MMFilesDitch::TRI_DITCH_COMPACTION)) { usleep(20000); @@ -696,7 +696,7 @@ int MMFilesCollection::rotateActiveJournal() { MMFilesDatafile* datafile = _journals.back(); TRI_ASSERT(datafile != nullptr); - + TRI_IF_FAILURE("CreateMultipleJournals") { // create an additional journal now, without sealing and renaming the old one! _datafiles.emplace_back(datafile); @@ -794,7 +794,7 @@ int MMFilesCollection::reserveJournalSpace(TRI_voc_tick_t tick, while (targetSize - 256 < size) { targetSize *= 2; } - + WRITE_LOCKER(writeLocker, _filesLock); TRI_ASSERT(_journals.size() <= 1); @@ -1603,16 +1603,12 @@ int MMFilesCollection::fillIndexes( TRI_ASSERT(n > 0); - TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); - auto ioService = SchedulerFeature::SCHEDULER->ioService(); - TRI_ASSERT(ioService != nullptr); - PerformanceLogScope logScope( std::string("fill-indexes-document-collection { collection: ") + _logicalCollection->vocbase()->name() + "/" + _logicalCollection->name() + " }, indexes: " + std::to_string(n - 1)); - auto queue = std::make_shared(ioService); + auto queue = std::make_shared(); try { TRI_ASSERT(!ServerState::instance()->isCoordinator()); @@ -2003,7 +1999,7 @@ void MMFilesCollection::prepareIndexes(VPackSlice indexesSlice) { bool foundPrimary = false; bool foundEdge = false; - + for (auto const& it : VPackArrayIterator(indexesSlice)) { auto const& s = it.get("type"); if (s.isString()) { @@ -2018,13 +2014,13 @@ void MMFilesCollection::prepareIndexes(VPackSlice indexesSlice) { for (auto const& idx : _indexes) { if (idx->type() == Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX) { foundPrimary = true; - } else if (_logicalCollection->type() == TRI_COL_TYPE_EDGE && + } else if (_logicalCollection->type() == TRI_COL_TYPE_EDGE && idx->type() == Index::IndexType::TRI_IDX_TYPE_EDGE_INDEX) { foundEdge = true; } } - if (!foundPrimary || + if (!foundPrimary || (!foundEdge && _logicalCollection->type() == TRI_COL_TYPE_EDGE)) { // we still do not have any of the default indexes, so create them now createInitialIndexes(); @@ -2045,7 +2041,7 @@ void MMFilesCollection::prepareIndexes(VPackSlice indexesSlice) { auto idx = idxFactory->prepareIndexFromSlice(v, false, _logicalCollection, true); - + if (ServerState::instance()->isRunningInCluster()) { addIndexCoordinator(idx); } else { @@ -2056,7 +2052,7 @@ void MMFilesCollection::prepareIndexes(VPackSlice indexesSlice) { TRI_ASSERT(!_indexes.empty()); if (_indexes[0]->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX || - (_logicalCollection->type() == TRI_COL_TYPE_EDGE && + (_logicalCollection->type() == TRI_COL_TYPE_EDGE && (_indexes.size() < 2 || _indexes[1]->type() != Index::IndexType::TRI_IDX_TYPE_EDGE_INDEX))) { #ifdef ARANGODB_ENABLE_MAINTAINER_MODE for (auto const& it : _indexes) { @@ -2079,7 +2075,7 @@ void MMFilesCollection::prepareIndexes(VPackSlice indexesSlice) { errorMsg.append(_logicalCollection->name()); errorMsg.push_back('\''); THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, errorMsg); - } + } foundPrimary = true; } } diff --git a/lib/Basics/LocalTaskQueue.cpp b/lib/Basics/LocalTaskQueue.cpp index c6b7fe85c6..caa8318e52 100644 --- a/lib/Basics/LocalTaskQueue.cpp +++ b/lib/Basics/LocalTaskQueue.cpp @@ -27,6 +27,8 @@ #include "Basics/MutexLocker.h" #include "Basics/asio-helper.h" #include "Logger/Logger.h" +#include "Scheduler/Scheduler.h" +#include "Scheduler/SchedulerFeature.h" using namespace arangodb::basics; @@ -37,15 +39,15 @@ using namespace arangodb::basics; LocalTask::LocalTask(std::shared_ptr const& queue) : _queue(queue) {} //////////////////////////////////////////////////////////////////////////////// -/// @brief dispatch this task to the underlying io_service +/// @brief dispatch this task to the scheduler //////////////////////////////////////////////////////////////////////////////// void LocalTask::dispatch() { auto self = shared_from_this(); - _queue->ioService()->post([self, this]() { + SchedulerFeature::SCHEDULER->post([self, this]() { _queue->startTask(); - try { - run(); + try { + run(); _queue->stopTask(); } catch (...) { _queue->stopTask(); @@ -75,50 +77,41 @@ void LocalCallbackTask::run() { } //////////////////////////////////////////////////////////////////////////////// -/// @brief dispatch this task to the underlying io_service +/// @brief dispatch the callback task to the scheduler //////////////////////////////////////////////////////////////////////////////// void LocalCallbackTask::dispatch() { auto self = shared_from_this(); - _queue->ioService()->post([self, this]() { run(); }); + SchedulerFeature::SCHEDULER->post([self, this]() { run(); }); } //////////////////////////////////////////////////////////////////////////////// -/// @brief create a queue using the specified io_service +/// @brief create a queue //////////////////////////////////////////////////////////////////////////////// -LocalTaskQueue::LocalTaskQueue(boost::asio::io_service* ioService) - : _ioService(ioService), - _queue(), +LocalTaskQueue::LocalTaskQueue() + : _queue(), _callbackQueue(), _condition(), _mutex(), _missing(0), _started(0), - _status(TRI_ERROR_NO_ERROR) { - TRI_ASSERT(_ioService != nullptr); -} - -////////////////////////////////////////////////////////////////////////////// -/// @brief exposes underlying io_service -////////////////////////////////////////////////////////////////////////////// - -boost::asio::io_service* LocalTaskQueue::ioService() { return _ioService; } + _status(TRI_ERROR_NO_ERROR) {} //////////////////////////////////////////////////////////////////////////////// /// @brief destroy the queue. //////////////////////////////////////////////////////////////////////////////// LocalTaskQueue::~LocalTaskQueue() {} - -void LocalTaskQueue::startTask() { + +void LocalTaskQueue::startTask() { CONDITION_LOCKER(guard, _condition); - ++_started; + ++_started; } -void LocalTaskQueue::stopTask() { +void LocalTaskQueue::stopTask() { CONDITION_LOCKER(guard, _condition); - --_started; + --_started; } ////////////////////////////////////////////////////////////////////////////// @@ -183,7 +176,7 @@ void LocalTaskQueue::dispatchAndWait() { if (_missing > 0 && _started == 0 && - _ioService->stopped()) { + SchedulerFeature::SCHEDULER->isStopping()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); } @@ -210,10 +203,10 @@ void LocalTaskQueue::dispatchAndWait() { if (_missing == 0) { break; } - + if (_missing > 0 && _started == 0 && - _ioService->stopped()) { + SchedulerFeature::SCHEDULER->isStopping()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); } diff --git a/lib/Basics/LocalTaskQueue.h b/lib/Basics/LocalTaskQueue.h index 4b7d12ad5b..f197ee0474 100644 --- a/lib/Basics/LocalTaskQueue.h +++ b/lib/Basics/LocalTaskQueue.h @@ -90,19 +90,13 @@ class LocalTaskQueue { LocalTaskQueue(LocalTaskQueue const&) = delete; LocalTaskQueue& operator=(LocalTaskQueue const&) = delete; - explicit LocalTaskQueue(boost::asio::io_service*); + explicit LocalTaskQueue(); ~LocalTaskQueue(); - void startTask(); + void startTask(); void stopTask(); - ////////////////////////////////////////////////////////////////////////////// - /// @brief exposes underlying io_service - ////////////////////////////////////////////////////////////////////////////// - - boost::asio::io_service* ioService(); - ////////////////////////////////////////////////////////////////////////////// /// @brief enqueue a task to be run ////////////////////////////////////////////////////////////////////////////// @@ -144,12 +138,6 @@ class LocalTaskQueue { int status(); private: - ////////////////////////////////////////////////////////////////////////////// - /// @brief io_service to dispatch tasks to - ////////////////////////////////////////////////////////////////////////////// - - boost::asio::io_service* _ioService; - ////////////////////////////////////////////////////////////////////////////// /// @brief internal task queue ////////////////////////////////////////////////////////////////////////////// @@ -179,11 +167,11 @@ class LocalTaskQueue { ////////////////////////////////////////////////////////////////////////////// size_t _missing; - + ////////////////////////////////////////////////////////////////////////////// /// @brief number of dispatched and started tasks ////////////////////////////////////////////////////////////////////////////// - + size_t _started; //////////////////////////////////////////////////////////////////////////////