mirror of https://gitee.com/bigwinds/arangodb
fix undefined behavior/race in query cache result moving (#6538)
This commit is contained in:
parent
5595317ecb
commit
02a08dfe56
|
@ -538,7 +538,6 @@ QueryResult Query::execute(QueryRegistry* registry) {
|
|||
// check the query cache for an existing result
|
||||
auto cacheEntry = arangodb::aql::QueryCache::instance()->lookup(
|
||||
_vocbase, queryHash, _queryString);
|
||||
arangodb::aql::QueryCacheResultEntryGuard guard(cacheEntry);
|
||||
|
||||
if (cacheEntry != nullptr) {
|
||||
ExecContext const* exe = ExecContext::CURRENT;
|
||||
|
@ -728,10 +727,8 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
|
|||
// check the query cache for an existing result
|
||||
auto cacheEntry = arangodb::aql::QueryCache::instance()->lookup(
|
||||
_vocbase, queryHash, _queryString);
|
||||
arangodb::aql::QueryCacheResultEntryGuard guard(cacheEntry);
|
||||
|
||||
if (cacheEntry != nullptr) {
|
||||
|
||||
auto ctx = transaction::StandaloneContext::Create(_vocbase);
|
||||
ExecContext const* exe = ExecContext::CURRENT;
|
||||
// got a result from the query cache
|
||||
|
|
|
@ -39,7 +39,7 @@ using namespace arangodb::aql;
|
|||
static arangodb::aql::QueryCache Instance;
|
||||
|
||||
/// @brief maximum number of results in each per-database cache
|
||||
static size_t MaxResults = 128; // default value. can be changed later
|
||||
static std::atomic<size_t> MaxResults(128); // default value. can be changed later
|
||||
|
||||
/// @brief whether or not the cache is enabled
|
||||
static std::atomic<arangodb::aql::QueryCacheMode> Mode(CACHE_ON_DEMAND);
|
||||
|
@ -58,30 +58,6 @@ QueryCacheResultEntry::QueryCacheResultEntry(
|
|||
_deletionRequested(0) {
|
||||
}
|
||||
|
||||
/// @brief check whether the element can be destroyed, and delete it if yes
|
||||
void QueryCacheResultEntry::tryDelete() {
|
||||
_deletionRequested = 1;
|
||||
|
||||
if (_refCount == 0) {
|
||||
delete this;
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief use the element, so it cannot be deleted meanwhile
|
||||
void QueryCacheResultEntry::use() { ++_refCount; }
|
||||
|
||||
/// @brief unuse the element, so it can be deleted if required
|
||||
void QueryCacheResultEntry::unuse() {
|
||||
TRI_ASSERT(_refCount > 0);
|
||||
|
||||
if (--_refCount == 0) {
|
||||
if (_deletionRequested == 1) {
|
||||
// trigger the deletion
|
||||
delete this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief create a database-specific cache
|
||||
QueryCacheDatabaseEntry::QueryCacheDatabaseEntry()
|
||||
: _entriesByHash(),
|
||||
|
@ -95,16 +71,12 @@ QueryCacheDatabaseEntry::QueryCacheDatabaseEntry()
|
|||
|
||||
/// @brief destroy a database-specific cache
|
||||
QueryCacheDatabaseEntry::~QueryCacheDatabaseEntry() {
|
||||
for (auto& it : _entriesByHash) {
|
||||
tryDelete(it.second);
|
||||
}
|
||||
|
||||
_entriesByHash.clear();
|
||||
_entriesByCollection.clear();
|
||||
}
|
||||
|
||||
/// @brief lookup a query result in the database-specific cache
|
||||
QueryCacheResultEntry* QueryCacheDatabaseEntry::lookup(
|
||||
std::shared_ptr<QueryCacheResultEntry> QueryCacheDatabaseEntry::lookup(
|
||||
uint64_t hash, QueryString const& queryString) {
|
||||
auto it = _entriesByHash.find(hash);
|
||||
|
||||
|
@ -123,26 +95,20 @@ QueryCacheResultEntry* QueryCacheDatabaseEntry::lookup(
|
|||
}
|
||||
|
||||
// found an entry
|
||||
auto entry = (*it).second;
|
||||
|
||||
// mark the entry as being used so noone else can delete it while it is in use
|
||||
entry->use();
|
||||
|
||||
return entry;
|
||||
return (*it).second;
|
||||
}
|
||||
|
||||
/// @brief store a query result in the database-specific cache
|
||||
void QueryCacheDatabaseEntry::store(uint64_t hash,
|
||||
QueryCacheResultEntry* entry) {
|
||||
std::shared_ptr<QueryCacheResultEntry> entry) {
|
||||
// insert entry into the cache
|
||||
if (!_entriesByHash.emplace(hash, entry).second) {
|
||||
// remove previous entry
|
||||
auto it = _entriesByHash.find(hash);
|
||||
TRI_ASSERT(it != _entriesByHash.end());
|
||||
auto previous = (*it).second;
|
||||
unlink(previous);
|
||||
unlink(previous.get());
|
||||
_entriesByHash.erase(it);
|
||||
tryDelete(previous);
|
||||
|
||||
// and insert again
|
||||
_entriesByHash.emplace(hash, entry);
|
||||
|
@ -177,19 +143,19 @@ void QueryCacheDatabaseEntry::store(uint64_t hash,
|
|||
TRI_ASSERT(it != _entriesByHash.end());
|
||||
auto previous = (*it).second;
|
||||
_entriesByHash.erase(it);
|
||||
unlink(previous);
|
||||
tryDelete(previous);
|
||||
unlink(previous.get());
|
||||
throw;
|
||||
}
|
||||
|
||||
link(entry);
|
||||
link(entry.get());
|
||||
|
||||
enforceMaxResults(MaxResults);
|
||||
size_t maxResults = MaxResults.load();
|
||||
enforceMaxResults(maxResults);
|
||||
|
||||
TRI_ASSERT(_numElements <= MaxResults);
|
||||
TRI_ASSERT(_numElements <= maxResults);
|
||||
TRI_ASSERT(_head != nullptr);
|
||||
TRI_ASSERT(_tail != nullptr);
|
||||
TRI_ASSERT(_tail == entry);
|
||||
TRI_ASSERT(_tail == entry.get());
|
||||
TRI_ASSERT(entry->_next == nullptr);
|
||||
}
|
||||
|
||||
|
@ -217,13 +183,10 @@ void QueryCacheDatabaseEntry::invalidate(std::string const& collection) {
|
|||
if (it3 != _entriesByHash.end()) {
|
||||
// remove entry from the linked list
|
||||
auto entry = (*it3).second;
|
||||
unlink(entry);
|
||||
unlink(entry.get());
|
||||
|
||||
// erase it from hash table
|
||||
_entriesByHash.erase(it3);
|
||||
|
||||
// delete the object itself
|
||||
tryDelete(entry);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -241,15 +204,9 @@ void QueryCacheDatabaseEntry::enforceMaxResults(size_t value) {
|
|||
auto it = _entriesByHash.find(head->_hash);
|
||||
TRI_ASSERT(it != _entriesByHash.end());
|
||||
_entriesByHash.erase(it);
|
||||
tryDelete(head);
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief check whether the element can be destroyed, and delete it if yes
|
||||
void QueryCacheDatabaseEntry::tryDelete(QueryCacheResultEntry* e) {
|
||||
e->tryDelete();
|
||||
}
|
||||
|
||||
/// @brief unlink the result entry from the list
|
||||
void QueryCacheDatabaseEntry::unlink(QueryCacheResultEntry* e) {
|
||||
if (e->_prev != nullptr) {
|
||||
|
@ -312,7 +269,7 @@ VPackBuilder QueryCache::properties() {
|
|||
VPackBuilder builder;
|
||||
builder.openObject();
|
||||
builder.add("mode", VPackValue(modeString(mode())));
|
||||
builder.add("maxResults", VPackValue(MaxResults));
|
||||
builder.add("maxResults", VPackValue(MaxResults.load()));
|
||||
builder.close();
|
||||
|
||||
return builder;
|
||||
|
@ -323,7 +280,7 @@ void QueryCache::properties(std::pair<std::string, size_t>& result) {
|
|||
MUTEX_LOCKER(mutexLocker, _propertiesLock);
|
||||
|
||||
result.first = modeString(mode());
|
||||
result.second = MaxResults;
|
||||
result.second = MaxResults.load();
|
||||
}
|
||||
|
||||
/// @brief set the cache properties
|
||||
|
@ -361,8 +318,8 @@ std::string QueryCache::modeString(QueryCacheMode mode) {
|
|||
}
|
||||
|
||||
/// @brief lookup a query result in the cache
|
||||
QueryCacheResultEntry* QueryCache::lookup(TRI_vocbase_t* vocbase, uint64_t hash,
|
||||
QueryString const& queryString) {
|
||||
std::shared_ptr<QueryCacheResultEntry> QueryCache::lookup(TRI_vocbase_t* vocbase, uint64_t hash,
|
||||
QueryString const& queryString) {
|
||||
auto const part = getPart(vocbase);
|
||||
READ_LOCKER(readLocker, _entriesLock[part]);
|
||||
|
||||
|
@ -379,7 +336,7 @@ QueryCacheResultEntry* QueryCache::lookup(TRI_vocbase_t* vocbase, uint64_t hash,
|
|||
/// @brief store a query in the cache
|
||||
/// if the call is successful, the cache has taken over ownership for the
|
||||
/// query result!
|
||||
QueryCacheResultEntry* QueryCache::store(
|
||||
std::shared_ptr<QueryCacheResultEntry> QueryCache::store(
|
||||
TRI_vocbase_t* vocbase, uint64_t hash, QueryString const& queryString,
|
||||
std::shared_ptr<VPackBuilder> result,
|
||||
std::vector<std::string> const& collections) {
|
||||
|
@ -392,7 +349,7 @@ QueryCacheResultEntry* QueryCache::store(
|
|||
auto const part = getPart(vocbase);
|
||||
|
||||
// create the cache entry outside the lock
|
||||
auto entry = std::make_unique<QueryCacheResultEntry>(
|
||||
auto entry = std::make_shared<QueryCacheResultEntry>(
|
||||
hash, queryString, result, collections);
|
||||
|
||||
WRITE_LOCKER(writeLocker, _entriesLock[part]);
|
||||
|
@ -407,8 +364,8 @@ QueryCacheResultEntry* QueryCache::store(
|
|||
}
|
||||
|
||||
// store cache entry
|
||||
(*it).second->store(hash, entry.get());
|
||||
return entry.release();
|
||||
(*it).second->store(hash, entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/// @brief invalidate all queries for the given collections
|
||||
|
@ -512,16 +469,19 @@ void QueryCache::invalidate(unsigned int part) {
|
|||
}
|
||||
|
||||
/// @brief sets the maximum number of results in each per-database cache
|
||||
/// is called under the mutex
|
||||
void QueryCache::setMaxResults(size_t value) {
|
||||
if (value == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (value > MaxResults) {
|
||||
size_t maxResults = MaxResults.load();
|
||||
|
||||
if (value > maxResults) {
|
||||
enforceMaxResults(value);
|
||||
}
|
||||
|
||||
MaxResults = value;
|
||||
maxResults = value;
|
||||
}
|
||||
|
||||
/// @brief sets the caching mode
|
||||
|
|
|
@ -48,15 +48,6 @@ struct QueryCacheResultEntry {
|
|||
|
||||
~QueryCacheResultEntry() = default;
|
||||
|
||||
/// @brief check whether the element can be destroyed, and delete it if yes
|
||||
void tryDelete();
|
||||
|
||||
/// @brief use the element, so it cannot be deleted meanwhile
|
||||
void use();
|
||||
|
||||
/// @brief unuse the element, so it can be deleted if required
|
||||
void unuse();
|
||||
|
||||
uint64_t const _hash;
|
||||
std::string const _queryString;
|
||||
std::shared_ptr<arangodb::velocypack::Builder> _queryResult;
|
||||
|
@ -67,28 +58,6 @@ struct QueryCacheResultEntry {
|
|||
std::atomic<uint32_t> _deletionRequested;
|
||||
};
|
||||
|
||||
class QueryCacheResultEntryGuard {
|
||||
QueryCacheResultEntryGuard(QueryCacheResultEntryGuard const&) = delete;
|
||||
QueryCacheResultEntryGuard& operator=(QueryCacheResultEntryGuard const&) =
|
||||
delete;
|
||||
QueryCacheResultEntryGuard() = delete;
|
||||
|
||||
public:
|
||||
explicit QueryCacheResultEntryGuard(QueryCacheResultEntry* entry)
|
||||
: _entry(entry) {}
|
||||
|
||||
~QueryCacheResultEntryGuard() {
|
||||
if (_entry != nullptr) {
|
||||
_entry->unuse();
|
||||
}
|
||||
}
|
||||
|
||||
QueryCacheResultEntry* get() { return _entry; }
|
||||
|
||||
private:
|
||||
QueryCacheResultEntry* _entry;
|
||||
};
|
||||
|
||||
struct QueryCacheDatabaseEntry {
|
||||
QueryCacheDatabaseEntry(QueryCacheDatabaseEntry const&) = delete;
|
||||
QueryCacheDatabaseEntry& operator=(QueryCacheDatabaseEntry const&) = delete;
|
||||
|
@ -100,10 +69,10 @@ struct QueryCacheDatabaseEntry {
|
|||
~QueryCacheDatabaseEntry();
|
||||
|
||||
/// @brief lookup a query result in the database-specific cache
|
||||
QueryCacheResultEntry* lookup(uint64_t, QueryString const&);
|
||||
std::shared_ptr<QueryCacheResultEntry> lookup(uint64_t, QueryString const&);
|
||||
|
||||
/// @brief store a query result in the database-specific cache
|
||||
void store(uint64_t, QueryCacheResultEntry*);
|
||||
void store(uint64_t, std::shared_ptr<QueryCacheResultEntry>);
|
||||
|
||||
/// @brief invalidate all entries for the given collections in the
|
||||
/// database-specific cache
|
||||
|
@ -116,9 +85,6 @@ struct QueryCacheDatabaseEntry {
|
|||
/// @brief enforce maximum number of results
|
||||
void enforceMaxResults(size_t);
|
||||
|
||||
/// @brief check whether the element can be destroyed, and delete it if yes
|
||||
void tryDelete(QueryCacheResultEntry*);
|
||||
|
||||
/// @brief unlink the result entry from the list
|
||||
void unlink(QueryCacheResultEntry*);
|
||||
|
||||
|
@ -126,7 +92,7 @@ struct QueryCacheDatabaseEntry {
|
|||
void link(QueryCacheResultEntry*);
|
||||
|
||||
/// @brief hash table that maps query hashes to query results
|
||||
std::unordered_map<uint64_t, QueryCacheResultEntry*> _entriesByHash;
|
||||
std::unordered_map<uint64_t, std::shared_ptr<QueryCacheResultEntry>> _entriesByHash;
|
||||
|
||||
/// @brief hash table that contains all collection-specific query results
|
||||
/// maps from collection names to a set of query results as defined in
|
||||
|
@ -177,12 +143,12 @@ class QueryCache {
|
|||
static std::string modeString(QueryCacheMode);
|
||||
|
||||
/// @brief lookup a query result in the cache
|
||||
QueryCacheResultEntry* lookup(TRI_vocbase_t*, uint64_t, QueryString const&);
|
||||
std::shared_ptr<QueryCacheResultEntry> lookup(TRI_vocbase_t*, uint64_t, QueryString const&);
|
||||
|
||||
/// @brief store a query in the cache
|
||||
/// if the call is successful, the cache has taken over ownership for the
|
||||
/// query result!
|
||||
QueryCacheResultEntry* store(TRI_vocbase_t*, uint64_t, QueryString const&,
|
||||
std::shared_ptr<QueryCacheResultEntry> store(TRI_vocbase_t*, uint64_t, QueryString const&,
|
||||
std::shared_ptr<arangodb::velocypack::Builder>,
|
||||
std::vector<std::string> const&);
|
||||
|
||||
|
@ -211,9 +177,6 @@ class QueryCache {
|
|||
/// note that the caller of this method must hold the write lock
|
||||
void invalidate(unsigned int);
|
||||
|
||||
/// @brief sets the maximum number of elements in the cache
|
||||
void setMaxResults(size_t);
|
||||
|
||||
/// @brief enable or disable the query cache
|
||||
void setMode(QueryCacheMode);
|
||||
|
||||
|
@ -221,6 +184,9 @@ class QueryCache {
|
|||
void setMode(std::string const&);
|
||||
|
||||
private:
|
||||
/// @brief sets the maximum number of elements in the cache
|
||||
void setMaxResults(size_t);
|
||||
|
||||
/// @brief number of R/W locks for the query cache
|
||||
static uint64_t const NumberOfParts = 8;
|
||||
|
||||
|
|
Loading…
Reference in New Issue