//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann //////////////////////////////////////////////////////////////////////////////// #include "ClusterSelectivityEstimates.h" #include "Cluster/ClusterMethods.h" #include "Indexes/Index.h" #include "VocBase/LogicalCollection.h" using namespace arangodb; ClusterSelectivityEstimates::ClusterSelectivityEstimates(LogicalCollection& collection) : _collection(collection), _updating(false) {} void ClusterSelectivityEstimates::flush() { // wait until we ourselves are able to set the _updating flag while (_updating.load(std::memory_order_relaxed) || _updating.exchange(true, std::memory_order_acquire)) { std::this_thread::yield(); } auto guard = scopeGuard([this]() { _updating.store(false, std::memory_order_release); }); std::atomic_store(&_data, std::shared_ptr()); } IndexEstMap ClusterSelectivityEstimates::get(bool allowUpdating, TRI_voc_tid_t tid) { auto data = std::atomic_load(&_data); if (allowUpdating) { double const now = TRI_microtime(); bool useExpired = false; int tries = 0; do { if (data) { auto const& estimates = data->estimates; if (!estimates.empty() && (data->expireStamp > now || useExpired)) { // already have an estimate, and it is not yet expired // or, we have an expired estimate, and another thread is currently updating it return estimates; } } // only one thread is allowed to fetch the estimates from the DB servers at any given time if (_updating.load(std::memory_order_relaxed) || _updating.exchange(true, std::memory_order_acquire)) { useExpired = true; } else { auto guard = scopeGuard([this]() { _updating.store(false, std::memory_order_release); }); // must fetch estimates from coordinator IndexEstMap estimates; int res = selectivityEstimatesOnCoordinator(_collection.vocbase().name(), _collection.name(), estimates, tid); if (res == TRI_ERROR_NO_ERROR) { // store the updated estimates and return them set(estimates); return estimates; } } data = std::atomic_load(&_data); } while (++tries <= 3); } // give up! if (data) { // we have got some estimates before return data->estimates; } // return an empty map! return IndexEstMap(); } void ClusterSelectivityEstimates::set(IndexEstMap const& estimates) { // push new selectivity values into indexes' cache auto indexes = _collection.getIndexes(); for (std::shared_ptr& idx : indexes) { auto it = estimates.find(std::to_string(idx->id())); if (it != estimates.end()) { idx->updateClusterSelectivityEstimate(it->second); } } double ttl = defaultTtl; // let selectivity estimates expire less seldomly for system collections if (!_collection.name().empty() && _collection.name()[0] == '_') { ttl = systemCollectionTtl; } // finally update the cache std::atomic_store(&_data, std::make_shared(estimates, ttl)); }