//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Daniel H. Larkin //////////////////////////////////////////////////////////////////////////////// #include #include #include #include #include #include #include #include #include #include "Cache/Manager.h" #include "Basics/Common.h" #include "Basics/SharedPRNG.h" #include "Basics/voc-errors.h" #include "Cache/Cache.h" #include "Cache/CachedValue.h" #include "Cache/Common.h" #include "Cache/FrequencyBuffer.h" #include "Cache/ManagerTasks.h" #include "Cache/Metadata.h" #include "Cache/PlainCache.h" #include "Cache/Table.h" #include "Cache/Transaction.h" #include "Cache/TransactionalCache.h" #include "Logger/LogMacros.h" #include "Logger/Logger.h" #include "Logger/LoggerStream.h" using namespace arangodb::cache; const uint64_t Manager::minSize = 1024 * 1024; const uint64_t Manager::minCacheAllocation = Cache::minSize + Table::allocationSize(Table::minLogSize) + std::max(PlainCache::allocationSize(true), TransactionalCache::allocationSize(true)) + Manager::cacheRecordOverhead; const std::chrono::milliseconds Manager::rebalancingGracePeriod(10); Manager::Manager(PostFn schedulerPost, uint64_t globalLimit, bool enableWindowedStats) : _lock(), _shutdown(false), _shuttingDown(false), _resizing(false), _rebalancing(false), _accessStats((globalLimit >= (1024 * 1024 * 1024)) ? ((1024 * 1024) / sizeof(uint64_t)) : (globalLimit / (1024 * sizeof(uint64_t)))), _enableWindowedStats(enableWindowedStats), _findStats(nullptr), _findHits(), _findMisses(), _caches(), _nextCacheId(1), _globalSoftLimit(globalLimit), _globalHardLimit(globalLimit), _globalHighwaterMark(static_cast( Manager::highwaterMultiplier * static_cast(_globalSoftLimit))), _fixedAllocation(sizeof(Manager) + Manager::tableListsOverhead + _accessStats.memoryUsage()), _spareTableAllocation(0), _globalAllocation(_fixedAllocation), _transactions(), _schedulerPost(schedulerPost), _resizeAttempt(0), _outstandingTasks(0), _rebalancingTasks(0), _resizingTasks(0), _rebalanceCompleted(std::chrono::steady_clock::now() - Manager::rebalancingGracePeriod) { TRI_ASSERT(_globalAllocation < _globalSoftLimit); TRI_ASSERT(_globalAllocation < _globalHardLimit); if (enableWindowedStats) { try { _findStats.reset(new Manager::FindStatBuffer(16384)); _fixedAllocation += _findStats->memoryUsage(); _globalAllocation = _fixedAllocation; } catch (std::bad_alloc const&) { _findStats.reset(nullptr); _enableWindowedStats = false; } } } Manager::~Manager() { try { shutdown(); } catch (...) { // no exceptions allowed here } } std::shared_ptr Manager::createCache(CacheType type, bool enableWindowedStats, uint64_t maxSize) { std::shared_ptr result(nullptr); _lock.writeLock(); bool allowed = isOperational(); Metadata metadata; std::shared_ptr table(nullptr); uint64_t id = _nextCacheId++; if (allowed) { uint64_t fixedSize = 0; switch (type) { case CacheType::Plain: fixedSize = PlainCache::allocationSize(enableWindowedStats); break; case CacheType::Transactional: fixedSize = TransactionalCache::allocationSize(enableWindowedStats); break; default: break; } std::tie(allowed, metadata, table) = registerCache(fixedSize, maxSize); } if (allowed) { switch (type) { case CacheType::Plain: result = PlainCache::create(this, id, std::move(metadata), table, enableWindowedStats); break; case CacheType::Transactional: result = TransactionalCache::create(this, id, std::move(metadata), table, enableWindowedStats); break; default: break; } } if (result.get() != nullptr) { _caches.try_emplace(id, result); } _lock.writeUnlock(); return result; } void Manager::destroyCache(std::shared_ptr cache) { Cache::destroy(cache); } void Manager::beginShutdown() { _lock.writeLock(); if (!_shutdown) { _shuttingDown = true; } _lock.writeUnlock(); } void Manager::shutdown() { _lock.writeLock(); if (!_shutdown) { if (!_shuttingDown) { _shuttingDown = true; } while (!_caches.empty()) { std::shared_ptr cache = _caches.begin()->second; _lock.writeUnlock(); cache->shutdown(); _lock.writeLock(); } freeUnusedTables(); _shutdown = true; } _lock.writeUnlock(); } // change global cache limit bool Manager::resize(uint64_t newGlobalLimit) { _lock.writeLock(); if ((newGlobalLimit < Manager::minSize) || (static_cast(0.5 * (1.0 - Manager::highwaterMultiplier) * static_cast(newGlobalLimit)) < _fixedAllocation) || (static_cast(Manager::highwaterMultiplier * static_cast(newGlobalLimit)) < (_caches.size() * Manager::minCacheAllocation))) { _lock.writeUnlock(); return false; } bool success = true; if (!isOperational() || globalProcessRunning()) { // shut(ting) down or still have another global process running already success = false; } else { bool done = adjustGlobalLimitsIfAllowed(newGlobalLimit); if (!done) { // otherwise we need to actually resize _resizing = true; _globalSoftLimit = newGlobalLimit; _globalHighwaterMark = static_cast( Manager::highwaterMultiplier * static_cast(_globalSoftLimit)); freeUnusedTables(); done = adjustGlobalLimitsIfAllowed(newGlobalLimit); if (!done) { rebalance(true); shrinkOvergrownCaches(TaskEnvironment::resizing); } } } _lock.writeUnlock(); return success; } uint64_t Manager::globalLimit() { _lock.readLock(); uint64_t limit = _resizing ? _globalSoftLimit : _globalHardLimit; _lock.readUnlock(); return limit; } uint64_t Manager::globalAllocation() { _lock.readLock(); uint64_t allocation = _globalAllocation; _lock.readUnlock(); return allocation; } std::pair Manager::globalHitRates() { double lifetimeRate = std::nan(""); double windowedRate = std::nan(""); uint64_t currentHits = _findHits.value(std::memory_order_relaxed); uint64_t currentMisses = _findMisses.value(std::memory_order_relaxed); if (currentHits + currentMisses > 0) { lifetimeRate = 100.0 * (static_cast(currentHits) / static_cast(currentHits + currentMisses)); } if (_enableWindowedStats && _findStats.get() != nullptr) { auto stats = _findStats->getFrequencies(); if (stats.size() == 1) { if (stats[0].first == static_cast(Stat::findHit)) { windowedRate = 100.0; } else { windowedRate = 0.0; } } else if (stats.size() == 2) { if (stats[0].first == static_cast(Stat::findHit)) { currentHits = stats[0].second; currentMisses = stats[1].second; } else { currentHits = stats[1].second; currentMisses = stats[0].second; } if (currentHits + currentMisses > 0) { windowedRate = 100.0 * (static_cast(currentHits) / static_cast(currentHits + currentMisses)); } } } return std::make_pair(lifetimeRate, windowedRate); } Transaction* Manager::beginTransaction(bool readOnly) { return _transactions.begin(readOnly); } void Manager::endTransaction(Transaction* tx) noexcept { _transactions.end(tx); } bool Manager::post(std::function fn) { return _schedulerPost(fn); } std::tuple> Manager::registerCache(uint64_t fixedSize, uint64_t maxSize) { TRI_ASSERT(_lock.isWriteLocked()); Metadata metadata; std::shared_ptr
table; bool ok = true; if ((_globalHighwaterMark / (_caches.size() + 1)) < Manager::minCacheAllocation) { ok = false; } if (ok) { table = leaseTable(Table::minLogSize); ok = (table != nullptr); } if (ok) { metadata = Metadata(Cache::minSize, fixedSize, table->memoryUsage(), maxSize); ok = increaseAllowed(metadata.allocatedSize - table->memoryUsage(), true); if (ok) { _globalAllocation += (metadata.allocatedSize - table->memoryUsage()); TRI_ASSERT(_globalAllocation >= _fixedAllocation); } } if (!ok && (table != nullptr)) { reclaimTable(table, true); table.reset(); } return std::make_tuple(ok, std::move(metadata), std::move(table)); } void Manager::unregisterCache(uint64_t id) { _lock.writeLock(); _accessStats.purgeRecord(id); auto it = _caches.find(id); if (it == _caches.end()) { _lock.writeUnlock(); return; } std::shared_ptr& cache = it->second; Metadata* metadata = cache->metadata(); metadata->readLock(); _globalAllocation -= metadata->allocatedSize; TRI_ASSERT(_globalAllocation >= _fixedAllocation); metadata->readUnlock(); _caches.erase(id); _lock.writeUnlock(); } std::pair Manager::requestGrow(Cache* cache) { Manager::time_point nextRequest = futureTime(100); bool allowed = false; bool ok = _lock.writeLock(Manager::triesSlow); if (ok) { if (isOperational() && !globalProcessRunning()) { Metadata* metadata = cache->metadata(); metadata->writeLock(); allowed = !metadata->isResizing() && !metadata->isMigrating(); if (allowed) { if (metadata->allocatedSize >= metadata->deservedSize && pastRebalancingGracePeriod()) { uint64_t increase = std::min(metadata->hardUsageLimit / 2, metadata->maxSize - metadata->allocatedSize); if (increase > 0 && increaseAllowed(increase)) { uint64_t newLimit = metadata->allocatedSize + increase; metadata->adjustDeserved(newLimit); } else { allowed = false; } } if (allowed) { nextRequest = std::chrono::steady_clock::now(); resizeCache(TaskEnvironment::none, cache, metadata->newLimit()); // unlocks metadata } } if (!allowed) { metadata->writeUnlock(); } } _lock.writeUnlock(); } return std::make_pair(allowed, nextRequest); } std::pair Manager::requestMigrate(Cache* cache, uint32_t requestedLogSize) { Manager::time_point nextRequest = futureTime(100); bool allowed = false; bool ok = _lock.writeLock(Manager::triesSlow); if (ok) { if (isOperational() && !globalProcessRunning()) { Metadata* metadata = cache->metadata(); metadata->writeLock(); allowed = !metadata->isMigrating(); if (allowed) { if (metadata->tableSize < Table::allocationSize(requestedLogSize)) { uint64_t increase = Table::allocationSize(requestedLogSize) - metadata->tableSize; if ((metadata->allocatedSize + increase >= metadata->deservedSize) && pastRebalancingGracePeriod()) { if (increaseAllowed(increase)) { uint64_t newLimit = metadata->allocatedSize + increase; uint64_t granted = metadata->adjustDeserved(newLimit); if (granted < newLimit) { allowed = false; } } else { allowed = false; } } } } if (allowed) { // first find out if cache is allowed to migrate allowed = metadata->migrationAllowed(Table::allocationSize(requestedLogSize)); } if (allowed) { // now find out if we can lease the table std::shared_ptr
table = leaseTable(requestedLogSize); allowed = (table.get() != nullptr); if (allowed) { nextRequest = std::chrono::steady_clock::now(); migrateCache(TaskEnvironment::none, cache, table); // unlocks metadata } } if (!allowed) { metadata->writeUnlock(); } } _lock.writeUnlock(); } return std::make_pair(allowed, nextRequest); } void Manager::reportAccess(uint64_t id) { if ((basics::SharedPRNG::rand() & static_cast(7)) == 0) { _accessStats.insertRecord(id); } } void Manager::reportHitStat(Stat stat) { switch (stat) { case Stat::findHit: { _findHits.add(1, std::memory_order_relaxed); if (_enableWindowedStats && _findStats.get() != nullptr) { _findStats->insertRecord(static_cast(Stat::findHit)); } break; } case Stat::findMiss: { _findMisses.add(1, std::memory_order_relaxed); if (_enableWindowedStats && _findStats.get() != nullptr) { _findStats->insertRecord(static_cast(Stat::findMiss)); } break; } default: { break; } } } bool Manager::isOperational() const { TRI_ASSERT(_lock.isLocked()); return (!_shutdown && !_shuttingDown); } bool Manager::globalProcessRunning() const { TRI_ASSERT(_lock.isLocked()); return (_rebalancing || _resizing); } void Manager::prepareTask(Manager::TaskEnvironment environment) { _outstandingTasks++; switch (environment) { case TaskEnvironment::rebalancing: { _rebalancingTasks++; break; } case TaskEnvironment::resizing: { _resizingTasks++; break; } case TaskEnvironment::none: default: { break; } } } void Manager::unprepareTask(Manager::TaskEnvironment environment) { switch (environment) { case TaskEnvironment::rebalancing: { if ((--_rebalancingTasks) == 0) { _lock.writeLock(); _rebalancing = false; _rebalanceCompleted = std::chrono::steady_clock::now(); _lock.writeUnlock(); }; break; } case TaskEnvironment::resizing: { if ((--_resizingTasks) == 0) { _lock.writeLock(); _resizing = false; _lock.writeUnlock(); }; break; } case TaskEnvironment::none: default: { break; } } _outstandingTasks--; } int Manager::rebalance(bool onlyCalculate) { if (!onlyCalculate) { _lock.writeLock(); if (_caches.size() == 0) { _lock.writeUnlock(); return TRI_ERROR_NO_ERROR; } if (!isOperational()) { _lock.writeUnlock(); return TRI_ERROR_SHUTTING_DOWN; } if (globalProcessRunning()) { _lock.writeUnlock(); return TRI_ERROR_ARANGO_BUSY; } // start rebalancing _rebalancing = true; } // adjust deservedSize for each cache std::shared_ptr cacheList = priorityList(); for (auto pair : (*cacheList)) { std::shared_ptr& cache = pair.first; double weight = pair.second; uint64_t newDeserved = static_cast( std::ceil(weight * static_cast(_globalHighwaterMark))); #ifdef ARANGODB_ENABLE_MAINTAINER_MODE if (newDeserved < Manager::minCacheAllocation) { LOG_TOPIC("eabec", DEBUG, Logger::CACHE) << "Deserved limit of " << newDeserved << " from weight " << weight << " and highwater " << _globalHighwaterMark << ". Should be at least " << Manager::minCacheAllocation; TRI_ASSERT(newDeserved >= Manager::minCacheAllocation); } #endif Metadata* metadata = cache->metadata(); metadata->writeLock(); #ifdef ARANGODB_ENABLE_MAINTAINER_MODE uint64_t fixed = metadata->fixedSize + metadata->tableSize + Manager::cacheRecordOverhead; if (newDeserved < fixed) { LOG_TOPIC("e63e4", DEBUG, Logger::CACHE) << "Setting deserved cache size " << newDeserved << " below usage: " << fixed << " ; Using weight " << weight; } #endif metadata->adjustDeserved(newDeserved); metadata->writeUnlock(); } if (!onlyCalculate) { if (_globalAllocation >= _globalHighwaterMark * 0.7) { shrinkOvergrownCaches(TaskEnvironment::rebalancing); } if (_rebalancingTasks.load() == 0) { _rebalanceCompleted = std::chrono::steady_clock::now(); _rebalancing = false; } _lock.writeUnlock(); } return TRI_ERROR_NO_ERROR; } void Manager::shrinkOvergrownCaches(Manager::TaskEnvironment environment) { TRI_ASSERT(_lock.isWriteLocked()); for (auto it : _caches) { std::shared_ptr& cache = it.second; // skip this cache if it is already resizing or shutdown! if (!cache->canResize()) { continue; } Metadata* metadata = cache->metadata(); metadata->writeLock(); if (metadata->allocatedSize > metadata->deservedSize) { resizeCache(environment, cache.get(), metadata->newLimit()); // unlocks metadata } else { metadata->writeUnlock(); } } } void Manager::freeUnusedTables() { TRI_ASSERT(_lock.isWriteLocked()); for (size_t i = 0; i < 32; i++) { while (!_tables[i].empty()) { auto table = _tables[i].top(); _globalAllocation -= table->memoryUsage(); TRI_ASSERT(_globalAllocation >= _fixedAllocation); _tables[i].pop(); } } } bool Manager::adjustGlobalLimitsIfAllowed(uint64_t newGlobalLimit) { TRI_ASSERT(_lock.isWriteLocked()); if (newGlobalLimit < _globalAllocation) { return false; } _globalHighwaterMark = static_cast( Manager::highwaterMultiplier * static_cast(newGlobalLimit)); _globalSoftLimit = newGlobalLimit; _globalHardLimit = newGlobalLimit; return true; } void Manager::resizeCache(Manager::TaskEnvironment environment, Cache* cache, uint64_t newLimit) { TRI_ASSERT(_lock.isWriteLocked()); Metadata* metadata = cache->metadata(); TRI_ASSERT(metadata->isWriteLocked()); if (metadata->usage <= newLimit) { uint64_t oldLimit = metadata->hardUsageLimit; bool success = metadata->adjustLimits(newLimit, newLimit); TRI_ASSERT(success); metadata->writeUnlock(); _globalAllocation -= oldLimit; _globalAllocation += newLimit; TRI_ASSERT(_globalAllocation >= _fixedAllocation); return; } bool success = metadata->adjustLimits(newLimit, metadata->hardUsageLimit); TRI_ASSERT(success); TRI_ASSERT(!metadata->isResizing()); metadata->toggleResizing(); metadata->writeUnlock(); auto task = std::make_shared(environment, this, cache->shared_from_this()); bool dispatched = task->dispatch(); if (!dispatched) { // TODO: decide what to do if we don't have an io_service metadata->writeLock(); metadata->toggleResizing(); metadata->writeUnlock(); } } void Manager::migrateCache(Manager::TaskEnvironment environment, Cache* cache, std::shared_ptr
& table) { TRI_ASSERT(_lock.isWriteLocked()); Metadata* metadata = cache->metadata(); TRI_ASSERT(metadata->isWriteLocked()); TRI_ASSERT(!metadata->isMigrating()); metadata->toggleMigrating(); metadata->writeUnlock(); auto task = std::make_shared(environment, this, cache->shared_from_this(), table); bool dispatched = task->dispatch(); if (!dispatched) { // TODO: decide what to do if we don't have an io_service metadata->writeLock(); reclaimTable(table, true); metadata->toggleMigrating(); metadata->writeUnlock(); } } std::shared_ptr
Manager::leaseTable(uint32_t logSize) { TRI_ASSERT(_lock.isWriteLocked()); std::shared_ptr
table; if (_tables[logSize].empty()) { if (increaseAllowed(Table::allocationSize(logSize), true)) { try { table = std::make_shared
(logSize); _globalAllocation += table->memoryUsage(); TRI_ASSERT(_globalAllocation >= _fixedAllocation); } catch (std::bad_alloc const&) { table.reset(); } } } else { table = _tables[logSize].top(); _spareTableAllocation -= table->memoryUsage(); _tables[logSize].pop(); } return table; } void Manager::reclaimTable(std::shared_ptr
table, bool internal) { TRI_ASSERT(table.get() != nullptr); if (!internal) { _lock.writeLock(); } uint32_t logSize = table->logSize(); size_t maxTables = (logSize < 18) ? (1 << (18 - logSize)) : 1; if ((_tables[logSize].size() < maxTables) && ((table->memoryUsage() + _spareTableAllocation) < ((_globalSoftLimit - _globalHighwaterMark) / 2))) { _tables[logSize].emplace(table); _spareTableAllocation += table->memoryUsage(); } else { _globalAllocation -= table->memoryUsage(); TRI_ASSERT(_globalAllocation >= _fixedAllocation); table.reset(); } if (!internal) { _lock.writeUnlock(); } } bool Manager::increaseAllowed(uint64_t increase, bool privileged) const { TRI_ASSERT(_lock.isLocked()); if (privileged) { if (_resizing && (_globalAllocation <= _globalSoftLimit)) { return (increase <= (_globalSoftLimit - _globalAllocation)); } return (increase <= (_globalHardLimit - _globalAllocation)); } return (increase <= (_globalHighwaterMark - _globalAllocation)); } std::shared_ptr Manager::priorityList() { TRI_ASSERT(_lock.isWriteLocked()); double minimumWeight = static_cast(Manager::minCacheAllocation) / static_cast(_globalHighwaterMark); while (static_cast(std::ceil(minimumWeight * static_cast(_globalHighwaterMark))) < Manager::minCacheAllocation) { minimumWeight *= 1.001; // bump by 0.1% until we fix precision issues } double uniformMarginalWeight = 0.2 / static_cast(_caches.size()); double baseWeight = std::max(minimumWeight, uniformMarginalWeight); #ifdef ARANGODB_ENABLE_MAINTAINER_MODE LOG_TOPIC("7eac8", DEBUG, Logger::CACHE) << "uniformMarginalWeight " << uniformMarginalWeight; LOG_TOPIC("108e6", DEBUG, Logger::CACHE) << "baseWeight " << baseWeight; if (1.0 < (baseWeight * static_cast(_caches.size()))) { LOG_TOPIC("b2f55", FATAL, Logger::CACHE) << "weight: " << baseWeight << ", count: " << _caches.size(); TRI_ASSERT(1.0 >= (baseWeight * static_cast(_caches.size()))); } #endif double remainingWeight = 1.0 - (baseWeight * static_cast(_caches.size())); std::shared_ptr list(new PriorityList()); list->reserve(_caches.size()); // catalog accessed caches and count total accesses // to get basis for comparison typename AccessStatBuffer::stats_t stats = _accessStats.getFrequencies(); std::set accessed; uint64_t totalAccesses = 0; uint64_t globalUsage = 0; for (auto const& s : stats) { auto c = _caches.find(s.first); if (c != _caches.end()) { totalAccesses += s.second; accessed.emplace(c->second->id()); } } totalAccesses = std::max(static_cast(1), totalAccesses); double allocFrac = 0.8 * std::min(1.0, static_cast(_globalAllocation) / static_cast(_globalHighwaterMark)); // calculate global data usage for (auto it = _caches.begin(); it != _caches.end(); it++) { globalUsage += it->second->usage(); } globalUsage = std::max(globalUsage, static_cast(1)); // avoid div-by-zero // gather all unaccessed caches at beginning of list for (auto it = _caches.begin(); it != _caches.end(); it++) { std::shared_ptr& cache = it->second; auto found = accessed.find(cache->id()); if (found == accessed.end()) { double weight = baseWeight + (cache->usage() / globalUsage) * allocFrac; list->emplace_back(cache, weight); } } double accessNormalizer = ((1.0 - allocFrac) * remainingWeight) / static_cast(totalAccesses); double usageNormalizer = (allocFrac * remainingWeight) / static_cast(globalUsage); // gather all accessed caches in order for (auto s : stats) { auto it = accessed.find(s.first); if (it != accessed.end()) { std::shared_ptr& cache = _caches.find(s.first)->second; double accessWeight = static_cast(s.second) * accessNormalizer; double usageWeight = static_cast(cache->usage()) * usageNormalizer; TRI_ASSERT(accessWeight >= 0.0); TRI_ASSERT(usageWeight >= 0.0); list->emplace_back(cache, (baseWeight + accessWeight + usageWeight)); } } return list; } Manager::time_point Manager::futureTime(uint64_t millisecondsFromNow) { return (std::chrono::steady_clock::now() + std::chrono::milliseconds(millisecondsFromNow)); } bool Manager::pastRebalancingGracePeriod() const { TRI_ASSERT(_lock.isLocked()); bool ok = !_rebalancing; if (ok) { ok = (std::chrono::steady_clock::now() - _rebalanceCompleted) >= Manager::rebalancingGracePeriod; } return ok; }