//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2017 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann //////////////////////////////////////////////////////////////////////////////// #include "TtlFeature.h" #include "Aql/Query.h" #include "Aql/QueryRegistry.h" #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/ConditionLocker.h" #include "Basics/ConditionVariable.h" #include "Basics/Exceptions.h" #include "Basics/MutexLocker.h" #include "Basics/Thread.h" #include "Cluster/FollowerInfo.h" #include "Cluster/ServerState.h" #include "Indexes/Index.h" #include "Logger/Logger.h" #include "ProgramOptions/Parameters.h" #include "ProgramOptions/ProgramOptions.h" #include "RestServer/DatabaseFeature.h" #include "RestServer/QueryRegistryFeature.h" #include "VocBase/LogicalCollection.h" #include "VocBase/vocbase.h" #include #include #include #include #include using namespace arangodb; using namespace arangodb::options; namespace { // the AQL query to remove documents std::string const removeQuery("FOR doc IN @@collection FILTER doc.@indexAttribute >= 0 && doc.@indexAttribute <= @stamp SORT doc.@indexAttribute LIMIT @limit REMOVE doc IN @@collection OPTIONS { ignoreErrors: true }"); } namespace arangodb { TtlStatistics& TtlStatistics::operator+=(VPackSlice const& other) { if (!other.isObject()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "expecting object for statistics"); } if (other.hasKey("runs")) { runs += other.get("runs").getNumericValue(); } if (other.hasKey("documentsRemoved")) { documentsRemoved += other.get("documentsRemoved").getNumericValue(); } if (other.hasKey("limitReached")) { limitReached += other.get("limitReached").getNumericValue(); } return *this; } void TtlStatistics::toVelocyPack(VPackBuilder& builder) const { builder.openObject(); builder.add("runs", VPackValue(runs)); builder.add("documentsRemoved", VPackValue(documentsRemoved)); builder.add("limitReached", VPackValue(limitReached)); builder.close(); } void TtlProperties::toVelocyPack(VPackBuilder& builder, bool isActive) const { builder.openObject(); builder.add("frequency", VPackValue(frequency)); builder.add("maxTotalRemoves", VPackValue(maxTotalRemoves)); builder.add("maxCollectionRemoves", VPackValue(maxCollectionRemoves)); builder.add("onlyLoadedCollections", VPackValue(onlyLoadedCollections)); builder.add("active", VPackValue(isActive)); builder.close(); } Result TtlProperties::fromVelocyPack(VPackSlice const& slice) { if (!slice.isObject()) { return Result(TRI_ERROR_BAD_PARAMETER, "expecting object for properties"); } try { uint64_t frequency = this->frequency; uint64_t maxTotalRemoves = this->maxTotalRemoves; uint64_t maxCollectionRemoves = this->maxCollectionRemoves; uint64_t onlyLoadedCollections = this->onlyLoadedCollections; if (slice.hasKey("frequency")) { if (!slice.get("frequency").isNumber()) { return Result(TRI_ERROR_BAD_PARAMETER, "expecting numeric value for frequency"); } frequency = slice.get("frequency").getNumericValue(); if (frequency < TtlProperties::minFrequency) { return Result(TRI_ERROR_BAD_PARAMETER, "too low value for frequency"); } } if (slice.hasKey("maxTotalRemoves")) { if (!slice.get("maxTotalRemoves").isNumber()) { return Result(TRI_ERROR_BAD_PARAMETER, "expecting numeric value for maxTotalRemoves"); } maxTotalRemoves = slice.get("maxTotalRemoves").getNumericValue(); } if (slice.hasKey("maxCollectionRemoves")) { if (!slice.get("maxCollectionRemoves").isNumber()) { return Result(TRI_ERROR_BAD_PARAMETER, "expecting numeric value for maxCollectionRemoves"); } maxCollectionRemoves = slice.get("maxCollectionRemoves").getNumericValue(); } if (slice.hasKey("onlyLoadedCollections")) { if (!slice.get("onlyLoadedCollections").isBool()) { return Result(TRI_ERROR_BAD_PARAMETER, "expecting boolean value for onlyLoadedCollections"); } onlyLoadedCollections = slice.get("onlyLoadedCollections").getBool(); } this->frequency = frequency; this->maxTotalRemoves = maxTotalRemoves; this->maxCollectionRemoves = maxCollectionRemoves; this->onlyLoadedCollections = onlyLoadedCollections; return Result(); } catch (arangodb::basics::Exception const& ex) { return Result(ex.code(), ex.what()); } catch (std::exception const& ex) { return Result(TRI_ERROR_INTERNAL, ex.what()); } } class TtlThread final : public Thread { public: explicit TtlThread(TtlFeature* ttlFeature) : Thread("TTL"), _ttlFeature(ttlFeature), _working(false) { TRI_ASSERT(_ttlFeature != nullptr); } ~TtlThread() { shutdown(); } void beginShutdown() override { Thread::beginShutdown(); // wake up the thread that may be waiting in run() wakeup(); } void wakeup() { // wake up the thread that may be waiting in run() CONDITION_LOCKER(guard, _condition); guard.signal(); } bool isCurrentlyWorking() const { return _working.load(); } /// @brief frequency is specified in milliseconds void setNextStart(uint64_t frequency) { CONDITION_LOCKER(guard, _condition); _nextStart = std::chrono::steady_clock::now() + std::chrono::milliseconds(frequency); } protected: void run() override { TtlProperties properties = _ttlFeature->properties(); setNextStart(properties.frequency); LOG_TOPIC("c2be7", TRACE, Logger::TTL) << "starting TTL background thread with interval " << properties.frequency << " milliseconds, max removals per run: " << properties.maxTotalRemoves << ", max removals per collection per run " << properties.maxCollectionRemoves; while (true) { auto now = std::chrono::steady_clock::now(); while (now < _nextStart) { if (isStopping()) { // server shutdown return; } // wait for our start... CONDITION_LOCKER(guard, _condition); guard.wait(std::chrono::microseconds(std::chrono::duration_cast(_nextStart - now))); now = std::chrono::steady_clock::now(); } if (isStopping()) { // server shutdown return; } // properties may have changed... update them properties = _ttlFeature->properties(); setNextStart(properties.frequency); try { TtlStatistics stats; // note: work() will do nothing if isActive() is false work(stats, properties); // merge stats _ttlFeature->updateStats(stats); } catch (std::exception const& ex) { LOG_TOPIC("6d28a", WARN, Logger::TTL) << "caught exception in TTL background thread: " << ex.what(); } catch (...) { LOG_TOPIC("44aa8", WARN, Logger::TTL) << "caught unknown exception in TTL background thread"; } } } private: /// @brief whether or not the background thread shall continue working bool isActive() const { return _ttlFeature->isActive() && !isStopping() && !ServerState::readOnly(); } void work(TtlStatistics& stats, TtlProperties const& properties) { if (!isActive()) { return; } // mark ourselves as busy _working = true; auto guard = scopeGuard([this]() { _working = false; }); LOG_TOPIC("139af", TRACE, Logger::TTL) << "ttl thread work()"; stats.runs++; auto queryRegistryFeature = application_features::ApplicationServer::getFeature( "QueryRegistry"); auto queryRegistry = queryRegistryFeature->queryRegistry(); double const stamp = TRI_microtime(); uint64_t limitLeft = properties.maxTotalRemoves; // iterate over all databases auto db = DatabaseFeature::DATABASE; for (auto const& name : db->getDatabaseNames()) { if (!isActive()) { // feature deactivated (for example, due to running on current follower in // active failover setup) return; } TRI_vocbase_t* vocbase = db->useDatabase(name); if (vocbase == nullptr) { continue; } // make sure we decrease the reference counter later TRI_DEFER(vocbase->release()); LOG_TOPIC("ec905", TRACE, Logger::TTL) << "TTL thread going to process database '" << vocbase->name() << "'"; std::vector> collections = vocbase->collections(false); for (auto const& collection : collections) { if (!isActive()) { // feature deactivated (for example, due to running on current follower in // active failover setup) return; } if (properties.onlyLoadedCollections && collection->status() != TRI_VOC_COL_STATUS_LOADED) { // we only care about collections that are already loaded here. // otherwise our check may load them all into memory, and sure this is // something we want to avoid here continue; } if (ServerState::instance()->isDBServer() && !collection->followers()->getLeader().empty()) { // we are a follower for this shard. do not remove any data here, but // let the leader carry out the removal and replicate it continue; } std::vector> indexes = collection->getIndexes(); for (auto const& index : indexes) { // we are only interested in collections with TTL indexes if (index->type() != Index::TRI_IDX_TYPE_TTL_INDEX) { continue; } // serialize the index description so we can read the "expireAfter" attribute _builder.clear(); index->toVelocyPack(_builder, Index::makeFlags()); VPackSlice ea = _builder.slice().get(StaticStrings::IndexExpireAfter); if (!ea.isNumber()) { // index description somehow invalid continue; } double expireAfter = ea.getNumericValue(); LOG_TOPIC("5cca5", DEBUG, Logger::TTL) << "TTL thread going to work for collection '" << collection->name() << "', expireAfter: " << Logger::FIXED(expireAfter, 0) << ", stamp: " << (stamp - expireAfter) << ", limit: " << std::min(properties.maxCollectionRemoves, limitLeft); auto bindVars = std::make_shared(); bindVars->openObject(); bindVars->add("@collection", VPackValue(collection->name())); bindVars->add(VPackValue("indexAttribute")); bindVars->openArray(); for (auto const& it : index->fields()[0]) { bindVars->add(VPackValue(it.name)); } bindVars->close(); bindVars->add("stamp", VPackValue(stamp - expireAfter)); bindVars->add("limit", VPackValue(std::min(properties.maxCollectionRemoves, limitLeft))); bindVars->close(); aql::Query query(false, *vocbase, aql::QueryString(::removeQuery), bindVars, nullptr, arangodb::aql::PART_MAIN); aql::QueryResult queryResult = query.executeSync(queryRegistry); if (queryResult.result.fail()) { // we can probably live with an error here... // the thread will try to remove the documents again on next iteration if (!queryResult.result.is(TRI_ERROR_ARANGO_READ_ONLY) && !queryResult.result.is(TRI_ERROR_ARANGO_CONFLICT) && !queryResult.result.is(TRI_ERROR_LOCKED) && !queryResult.result.is(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) { LOG_TOPIC("08300", WARN, Logger::TTL) << "error during TTL document removal for collection '" << collection->name() << "': " << queryResult.result.errorMessage(); } } else { auto extra = queryResult.extra; if (extra != nullptr) { VPackSlice v = extra->slice().get("stats"); if (v.isObject()) { v = v.get("writesExecuted"); if (v.isNumber()) { uint64_t removed = v.getNumericValue(); stats.documentsRemoved += removed; if (removed > 0) { LOG_TOPIC("2455e", DEBUG, Logger::TTL) << "TTL thread removed " << removed << " documents for collection '" << collection->name() << "'"; if (limitLeft >= removed) { limitLeft -= removed; } else { limitLeft = 0; } } } } } } // there can only be one TTL index per collection, so we can abort the loop here break; } if (limitLeft == 0) { // removed as much as we are allowed to. now stop and remove more in next iteration ++stats.limitReached; return; } if (isStopping()) { // server has been stopped, so abort our loop(s) return; } } } } private: TtlFeature* _ttlFeature; arangodb::basics::ConditionVariable _condition; /// @brief next time the thread should run std::chrono::time_point _nextStart; /// @brief a builder object we reuse to save a few memory allocations VPackBuilder _builder; /// @brief set to true while the TTL thread is actually performing deletions, /// false otherwise std::atomic _working; }; } TtlFeature::TtlFeature( application_features::ApplicationServer& server ) : ApplicationFeature(server, "Ttl"), _allowRunning(true), _active(true) { startsAfter("DatabasePhase"); startsAfter("ServerPhase"); } TtlFeature::~TtlFeature() { shutdownThread(); } void TtlFeature::collectOptions(std::shared_ptr options) { options->addSection("ttl", "TTL options"); options->addOption("ttl.frequency", "frequency (in milliseconds) for the TTL background thread invocation. " "a value of 0 turns the TTL background thread off entirely", new UInt64Parameter(&_properties.frequency)); options->addOption("ttl.max-total-removes", "maximum number of documents to remove per invocation of the TTL thread", new UInt64Parameter(&_properties.maxTotalRemoves)); options->addOption("ttl.max-collection-removes", "maximum number of documents to remove per collection", new UInt64Parameter(&_properties.maxCollectionRemoves)); options->addOption("ttl.only-loaded-collection", "only consider already loaded collections for removal", new BooleanParameter(&_properties.onlyLoadedCollections)); } void TtlFeature::validateOptions(std::shared_ptr options) { if (_properties.maxTotalRemoves == 0) { LOG_TOPIC("1e299", FATAL, arangodb::Logger::STARTUP) << "invalid value for '--ttl.max-total-removes'."; FATAL_ERROR_EXIT(); } if (_properties.maxCollectionRemoves == 0) { LOG_TOPIC("2ab82", FATAL, arangodb::Logger::STARTUP) << "invalid value for '--ttl.max-collection-removes'."; FATAL_ERROR_EXIT(); } if (_properties.frequency < TtlProperties::minFrequency) { LOG_TOPIC("ea696", FATAL, arangodb::Logger::STARTUP) << "too low value for '--ttl.frequency'."; FATAL_ERROR_EXIT(); } } void TtlFeature::start() { // the thread will not run on a coordinator or an agency node, // just locally on DB servers or single servers if (ServerState::instance()->isCoordinator() || ServerState::instance()->isAgent()) { LOG_TOPIC("e94bb", DEBUG, Logger::TTL) << "turning off TTL feature because of coordinator / agency"; return; } DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature( "Database"); if (databaseFeature->checkVersion() || databaseFeature->upgrade()) { LOG_TOPIC("5614a", DEBUG, Logger::TTL) << "turning off TTL feature because of version checking or upgrade procedure"; return; } // a frequency of 0 means the thread is not started at all if (_properties.frequency == 0) { return; } MUTEX_LOCKER(locker, _threadMutex); if (application_features::ApplicationServer::isStopping()) { // don't create the thread if we are already shutting down return; } _thread.reset(new TtlThread(this)); if (!_thread->start()) { LOG_TOPIC("33c33", FATAL, Logger::TTL) << "could not start ttl background thread"; FATAL_ERROR_EXIT(); } } void TtlFeature::beginShutdown() { // this will make the TTL background thread stop as soon as possible deactivate(); MUTEX_LOCKER(locker, _threadMutex); if (_thread != nullptr) { // this will also wake up the thread if it should be sleeping _thread->beginShutdown(); } } void TtlFeature::stop() { shutdownThread(); } void TtlFeature::allowRunning(bool value) { { MUTEX_LOCKER(locker, _propertiesMutex); if (value) { _allowRunning = true; } else { _allowRunning = false; } } if (value) { return; } waitForThreadWork(); } void TtlFeature::waitForThreadWork() { while (true) { { MUTEX_LOCKER(locker, _threadMutex); if (_thread == nullptr) { break; } _thread->wakeup(); if (!_thread->isCurrentlyWorking()) { break; } } std::this_thread::sleep_for(std::chrono::milliseconds(5)); } } void TtlFeature::activate() { { MUTEX_LOCKER(locker, _propertiesMutex); if (_active) { // already activated return; } _active = true; } LOG_TOPIC("79862", DEBUG, Logger::TTL) << "activated TTL background thread"; } void TtlFeature::deactivate() { { MUTEX_LOCKER(locker, _propertiesMutex); if (!_active) { // already deactivated return; } _active = false; } waitForThreadWork(); LOG_TOPIC("898a7", DEBUG, Logger::TTL) << "deactivated TTL background thread"; } bool TtlFeature::isActive() const { MUTEX_LOCKER(locker, _propertiesMutex); return _allowRunning && _active; } void TtlFeature::statsToVelocyPack(VPackBuilder& builder) const { MUTEX_LOCKER(locker, _statisticsMutex); _statistics.toVelocyPack(builder); } void TtlFeature::updateStats(TtlStatistics const& stats) { MUTEX_LOCKER(locker, _statisticsMutex); _statistics += stats; } void TtlFeature::propertiesToVelocyPack(VPackBuilder& builder) const { MUTEX_LOCKER(locker, _propertiesMutex); _properties.toVelocyPack(builder, _active); } TtlProperties TtlFeature::properties() const { MUTEX_LOCKER(locker, _propertiesMutex); return _properties; } Result TtlFeature::propertiesFromVelocyPack(VPackSlice const& slice, VPackBuilder& out) { Result res; uint64_t frequency; bool active; { MUTEX_LOCKER(locker, _propertiesMutex); bool const hasActiveFlag = slice.isObject() && slice.hasKey("active"); if (hasActiveFlag && !slice.get("active").isBool()) { return Result(TRI_ERROR_BAD_PARAMETER, "active flag should be a boolean value"); } // store properties res = _properties.fromVelocyPack(slice); if (!res.fail() && hasActiveFlag) { // update active flag _active = slice.get("active").getBool(); } _properties.toVelocyPack(out, _active); frequency = _properties.frequency; active = _active; } { MUTEX_LOCKER(locker, _threadMutex); if (_thread != nullptr) { _thread->setNextStart(frequency); _thread->wakeup(); while (!active && _thread->isCurrentlyWorking()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } } return res; } void TtlFeature::shutdownThread() noexcept { MUTEX_LOCKER(locker, _threadMutex); if (_thread != nullptr) { try { _thread->beginShutdown(); while (_thread->isRunning()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } catch (...) { } _thread.reset(); } }