1
0
Fork 0

less locking for ClusterSelectivityEstimates (#8892)

This commit is contained in:
Jan 2019-05-06 13:22:36 +02:00 committed by GitHub
parent 207850e04c
commit b06c44472b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 89 additions and 88 deletions

View File

@ -109,8 +109,8 @@ ClusterCollection::~ClusterCollection() {}
/// @brief fetches current index selectivity estimates
/// if allowUpdate is true, will potentially make a cluster-internal roundtrip
/// to fetch current values!
IndexEstMap ClusterCollection::clusterIndexEstimates(bool allowUpdate, TRI_voc_tick_t tid) const {
return _selectivityEstimates.get(allowUpdate, tid);
IndexEstMap ClusterCollection::clusterIndexEstimates(bool allowUpdating, TRI_voc_tick_t tid) {
return _selectivityEstimates.get(allowUpdating, tid);
}
/// @brief sets the current index selectivity estimates

View File

@ -62,7 +62,7 @@ class ClusterCollection final : public PhysicalCollection {
/// @brief fetches current index selectivity estimates
/// if allowUpdate is true, will potentially make a cluster-internal roundtrip
/// to fetch current values!
IndexEstMap clusterIndexEstimates(bool allowUpdate, TRI_voc_tick_t tid) const override;
IndexEstMap clusterIndexEstimates(bool allowUpdating, TRI_voc_tick_t tid) override;
/// @brief sets the current index selectivity estimates
void setClusterIndexEstimates(IndexEstMap&& estimates) override;

View File

@ -23,90 +23,81 @@
#include "ClusterSelectivityEstimates.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Cluster/ClusterMethods.h"
#include "Indexes/Index.h"
#include "VocBase/LogicalCollection.h"
using namespace arangodb;
ClusterSelectivityEstimates::ClusterSelectivityEstimates(LogicalCollection& collection)
: _collection(collection), _expireStamp(0.0) {}
: _collection(collection),
_updating(false) {}
void ClusterSelectivityEstimates::flush() {
WRITE_LOCKER(lock, _lock);
_estimates.clear();
_expireStamp = 0.0;
}
IndexEstMap ClusterSelectivityEstimates::get(bool allowUpdate, TRI_voc_tid_t tid) const {
double now;
{
READ_LOCKER(readLock, _lock);
if (!allowUpdate) {
// return whatever is there. may be empty as well
return _estimates;
}
now = TRI_microtime();
if (!_estimates.empty() && _expireStamp > now) {
// already have an estimate, and it is not yet expired
return _estimates;
}
// wait until we ourselves are able to set the _updating flag
while (_updating.load(std::memory_order_relaxed) || _updating.exchange(true, std::memory_order_acquire)) {
std::this_thread::yield();
}
// have no estimate yet, or it is already expired
// we have given up the read lock here
// because we now need to modify the estimates
auto guard = scopeGuard([this]() {
_updating.store(false, std::memory_order_release);
});
std::atomic_store<InternalData>(&_data, std::shared_ptr<InternalData>());
}
int tries = 0;
while (true) {
decltype(_estimates) estimates;
IndexEstMap ClusterSelectivityEstimates::get(bool allowUpdating, TRI_voc_tid_t tid) {
auto data = std::atomic_load<ClusterSelectivityEstimates::InternalData>(&_data);
WRITE_LOCKER(writeLock, _lock);
if (allowUpdating) {
double const now = TRI_microtime();
if (!_estimates.empty() && _expireStamp > now) {
// some other thread has updated the estimates for us... just use them
return _estimates;
}
int res = selectivityEstimatesOnCoordinator(_collection.vocbase().name(),
_collection.name(), estimates, tid);
if (res == TRI_ERROR_NO_ERROR) {
_estimates = estimates;
// let selectivity estimates expire less seldom for system collections
_expireStamp = now + defaultTtl * (_collection.name()[0] == '_' ? 10.0 : 1.0);
// give up the lock, and then update the selectivity values for each index
writeLock.unlock();
// push new selectivity values into indexes' cache
auto indexes = _collection.getIndexes();
for (std::shared_ptr<Index>& idx : indexes) {
auto it = estimates.find(std::to_string(idx->id()));
if (it != estimates.end()) {
idx->updateClusterSelectivityEstimate(it->second);
bool useExpired = false;
int tries = 0;
do {
if (data) {
auto const& estimates = data->estimates;
if (!estimates.empty() && (data->expireStamp > now || useExpired)) {
// already have an estimate, and it is not yet expired
// or, we have an expired estimate, and another thread is currently updating it
return estimates;
}
}
return estimates;
}
// only one thread is allowed to fetch the estimates from the DB servers at any given time
if (_updating.load(std::memory_order_relaxed) || _updating.exchange(true, std::memory_order_acquire)) {
useExpired = true;
} else {
auto guard = scopeGuard([this]() {
_updating.store(false, std::memory_order_release);
});
if (++tries == 3) {
return _estimates;
}
// must fetch estimates from coordinator
IndexEstMap estimates;
int res = selectivityEstimatesOnCoordinator(_collection.vocbase().name(),
_collection.name(), estimates, tid);
if (res == TRI_ERROR_NO_ERROR) {
// store the updated estimates and return them
set(estimates);
return estimates;
}
}
data = std::atomic_load<ClusterSelectivityEstimates::InternalData>(&_data);
} while (++tries <= 3);
}
// give up!
if (data) {
// we have got some estimates before
return data->estimates;
}
// return an empty map!
return IndexEstMap();
}
void ClusterSelectivityEstimates::set(std::unordered_map<std::string, double>&& estimates) {
double const now = TRI_microtime();
void ClusterSelectivityEstimates::set(IndexEstMap const& estimates) {
// push new selectivity values into indexes' cache
auto indexes = _collection.getIndexes();
@ -117,13 +108,13 @@ void ClusterSelectivityEstimates::set(std::unordered_map<std::string, double>&&
idx->updateClusterSelectivityEstimate(it->second);
}
}
double ttl = defaultTtl;
// let selectivity estimates expire less seldomly for system collections
if (!_collection.name().empty() && _collection.name()[0] == '_') {
ttl = systemCollectionTtl;
}
// finally update the cache
{
WRITE_LOCKER(writelock, _lock);
_estimates = std::move(estimates);
// let selectivity estimates expire less seldom for system collections
_expireStamp = now + defaultTtl * (_collection.name()[0] == '_' ? 10.0 : 1.0);
}
std::atomic_store<ClusterSelectivityEstimates::InternalData>(&_data, std::make_shared<ClusterSelectivityEstimates::InternalData>(estimates, ttl));
}

View File

@ -25,7 +25,7 @@
#define ARANGOD_CLUSTER_ENGINE_CLUSTER_SELECTIVITY_ESTIMATES_H 1
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
#include "Indexes/IndexIterator.h"
#include "VocBase/voc-types.h"
namespace arangodb {
@ -40,16 +40,26 @@ class ClusterSelectivityEstimates {
/// @brief fetch estimates from cache or server
/// @param allowUpdate allow cluster communication
/// @param tid specify ongoing transaction this is a part of
std::unordered_map<std::string, double> get(bool allowUpdate, TRI_voc_tick_t tid) const;
void set(std::unordered_map<std::string, double>&& estimates);
IndexEstMap get(bool allowUpdating, TRI_voc_tick_t tid);
void set(IndexEstMap const& estimates);
private:
LogicalCollection& _collection;
mutable basics::ReadWriteLock _lock;
mutable std::unordered_map<std::string, double> _estimates;
mutable double _expireStamp;
struct InternalData {
IndexEstMap estimates;
double expireStamp;
InternalData(IndexEstMap const& estimates, double expireStamp)
: estimates(estimates), expireStamp(expireStamp) {}
};
static constexpr double defaultTtl = 60.0;
LogicalCollection& _collection;
// the current estimates, only load and stored using atomic operations
std::shared_ptr<InternalData> _data;
// whether or not a thread is currently updating the estimates
std::atomic<bool> _updating;
static constexpr double defaultTtl = 90.0;
static constexpr double systemCollectionTtl = 900.0;
};
} // namespace arangodb

View File

@ -55,8 +55,8 @@ PhysicalCollection::PhysicalCollection(LogicalCollection& collection,
/// @brief fetches current index selectivity estimates
/// if allowUpdate is true, will potentially make a cluster-internal roundtrip
/// to fetch current values!
IndexEstMap PhysicalCollection::clusterIndexEstimates(bool allowUpdate,
TRI_voc_tick_t tid) const {
IndexEstMap PhysicalCollection::clusterIndexEstimates(bool allowUpdating,
TRI_voc_tick_t tid) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL,
"cluster index estimates called for non-cluster collection");

View File

@ -87,7 +87,7 @@ class PhysicalCollection {
/// @brief fetches current index selectivity estimates
/// if allowUpdate is true, will potentially make a cluster-internal roundtrip
/// to fetch current values!
virtual IndexEstMap clusterIndexEstimates(bool allowUpdate, TRI_voc_tick_t tid) const;
virtual IndexEstMap clusterIndexEstimates(bool allowUpdating, TRI_voc_tick_t tid);
/// @brief sets the current index selectivity estimates
virtual void setClusterIndexEstimates(IndexEstMap&& estimates);

View File

@ -425,8 +425,8 @@ std::unique_ptr<FollowerInfo> const& LogicalCollection::followers() const {
return _followers;
}
IndexEstMap LogicalCollection::clusterIndexEstimates(bool allowUpdate, TRI_voc_tid_t tid) {
return getPhysical()->clusterIndexEstimates(allowUpdate, tid);
IndexEstMap LogicalCollection::clusterIndexEstimates(bool allowUpdating, TRI_voc_tid_t tid) {
return getPhysical()->clusterIndexEstimates(allowUpdating, tid);
}
void LogicalCollection::setClusterIndexEstimates(IndexEstMap&& estimates) {

View File

@ -212,7 +212,7 @@ class LogicalCollection : public LogicalDataSource {
/// if allowUpdate is true, will potentially make a cluster-internal roundtrip
/// to fetch current values!
/// @param tid the optional transaction ID to use
IndexEstMap clusterIndexEstimates(bool allowUpdate, TRI_voc_tid_t tid = 0);
IndexEstMap clusterIndexEstimates(bool allowUpdating, TRI_voc_tid_t tid = 0);
/// @brief sets the current index selectivity estimates
void setClusterIndexEstimates(IndexEstMap&& estimates);