1
0
Fork 0

fix undefined behavior in query cache result handling (#6544)

This commit is contained in:
Jan 2018-09-20 16:58:29 +02:00 committed by Michael Hackstein
parent a6cde70511
commit a93db9a6b9
4 changed files with 65 additions and 154 deletions

View File

@ -578,8 +578,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
auto cacheEntry = arangodb::aql::QueryCache::instance()->lookup(
&_vocbase, queryHash, _queryString
);
arangodb::aql::QueryCacheResultEntryGuard guard(cacheEntry);
if (cacheEntry != nullptr) {
bool hasPermissions = true;
@ -599,11 +598,6 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
// we don't have yet a transaction when we're here, so let's create
// a mimimal context to build the result
queryResult.context = transaction::StandaloneContext::Create(_vocbase);
queryResult.extra = std::make_shared<VPackBuilder>();
{
VPackObjectBuilder guard(queryResult.extra.get(), true);
addWarningsToVelocyPack(*queryResult.extra);
}
TRI_ASSERT(cacheEntry->_queryResult != nullptr);
queryResult.result = cacheEntry->_queryResult;
queryResult.extra = cacheEntry->_stats;
@ -784,7 +778,6 @@ ExecutionState Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry, Q
auto cacheEntry = arangodb::aql::QueryCache::instance()->lookup(
&_vocbase, queryHash, _queryString
);
arangodb::aql::QueryCacheResultEntryGuard guard(cacheEntry);
if (cacheEntry != nullptr) {
bool hasPermissions = true;
@ -974,11 +967,6 @@ ExecutionState Query::finalize(QueryResult& result) {
return state;
}
if (_cacheEntry != nullptr) {
_cacheEntry->_stats = result.extra;
QueryCache::instance()->store(&_vocbase, std::move(_cacheEntry));
}
addWarningsToVelocyPack(*result.extra);
double now = TRI_microtime();
if (_profile != nullptr && _queryOptions.profile >= PROFILE_LEVEL_BASIC) {
@ -987,6 +975,11 @@ ExecutionState Query::finalize(QueryResult& result) {
}
result.extra->close();
if (_cacheEntry != nullptr) {
_cacheEntry->_stats = result.extra;
QueryCache::instance()->store(&_vocbase, std::move(_cacheEntry));
}
// patch executionTime stats value in place
// we do this because "executionTime" should include the whole span of the execution and we have to set it at the very end
double const rt = runTime(now);

View File

@ -36,13 +36,13 @@ using namespace arangodb::aql;
namespace {
/// @brief singleton instance of the query cache
static arangodb::aql::QueryCache Instance;
static arangodb::aql::QueryCache instance;
/// @brief maximum number of results in each per-database cache
static size_t MaxResults = 128; // default value. can be changed later
static std::atomic<size_t> maxResults(128); // default value. can be changed later
/// @brief whether or not the cache is enabled
static std::atomic<arangodb::aql::QueryCacheMode> Mode(CACHE_ON_DEMAND);
static std::atomic<arangodb::aql::QueryCacheMode> mode(CACHE_ON_DEMAND);
}
/// @brief create a cache entry
@ -55,34 +55,7 @@ QueryCacheResultEntry::QueryCacheResultEntry(
_queryResult(queryResult),
_dataSources(dataSources),
_prev(nullptr),
_next(nullptr),
_refCount(0),
_deletionRequested(0) {
}
/// @brief check whether the element can be destroyed, and delete it if yes
void QueryCacheResultEntry::tryDelete() {
_deletionRequested = 1;
if (_refCount == 0) {
delete this;
}
}
/// @brief use the element, so it cannot be deleted meanwhile
void QueryCacheResultEntry::use() { ++_refCount; }
/// @brief unuse the element, so it can be deleted if required
void QueryCacheResultEntry::unuse() {
TRI_ASSERT(_refCount > 0);
if (--_refCount == 0) {
if (_deletionRequested == 1) {
// trigger the deletion
delete this;
}
}
}
_next(nullptr) {}
/// @brief create a database-specific cache
QueryCacheDatabaseEntry::QueryCacheDatabaseEntry()
@ -96,16 +69,12 @@ QueryCacheDatabaseEntry::QueryCacheDatabaseEntry()
/// @brief destroy a database-specific cache
QueryCacheDatabaseEntry::~QueryCacheDatabaseEntry() {
for (auto& it : _entriesByHash) {
tryDelete(it.second);
}
_entriesByHash.clear();
_entriesByDataSource.clear();
}
/// @brief lookup a query result in the database-specific cache
QueryCacheResultEntry* QueryCacheDatabaseEntry::lookup(
std::shared_ptr<QueryCacheResultEntry> QueryCacheDatabaseEntry::lookup(
uint64_t hash, QueryString const& queryString) {
auto it = _entriesByHash.find(hash);
@ -124,26 +93,20 @@ QueryCacheResultEntry* QueryCacheDatabaseEntry::lookup(
}
// found an entry
auto entry = (*it).second;
// mark the entry as being used so noone else can delete it while it is in use
entry->use();
return entry;
return (*it).second;
}
/// @brief store a query result in the database-specific cache
void QueryCacheDatabaseEntry::store(uint64_t hash,
QueryCacheResultEntry* entry) {
std::shared_ptr<QueryCacheResultEntry> entry) {
// insert entry into the cache
if (!_entriesByHash.emplace(hash, entry).second) {
// remove previous entry
auto it = _entriesByHash.find(hash);
TRI_ASSERT(it != _entriesByHash.end());
auto previous = (*it).second;
unlink(previous);
unlink(previous.get());
_entriesByHash.erase(it);
tryDelete(previous);
// and insert again
_entriesByHash.emplace(hash, entry);
@ -178,19 +141,19 @@ void QueryCacheDatabaseEntry::store(uint64_t hash,
TRI_ASSERT(it != _entriesByHash.end());
auto previous = (*it).second;
_entriesByHash.erase(it);
unlink(previous);
tryDelete(previous);
unlink(previous.get());
throw;
}
link(entry);
link(entry.get());
enforceMaxResults(::MaxResults);
size_t mr = ::maxResults.load();
enforceMaxResults(mr);
TRI_ASSERT(_numElements <= ::MaxResults);
TRI_ASSERT(_numElements <= mr);
TRI_ASSERT(_head != nullptr);
TRI_ASSERT(_tail != nullptr);
TRI_ASSERT(_tail == entry);
TRI_ASSERT(_tail == entry.get());
TRI_ASSERT(entry->_next == nullptr);
}
@ -218,13 +181,10 @@ void QueryCacheDatabaseEntry::invalidate(std::string const& dataSource) {
if (it3 != _entriesByHash.end()) {
// remove entry from the linked list
auto entry = (*it3).second;
unlink(entry);
unlink(entry.get());
// erase it from hash table
_entriesByHash.erase(it3);
// delete the object itself
tryDelete(entry);
}
}
@ -232,6 +192,7 @@ void QueryCacheDatabaseEntry::invalidate(std::string const& dataSource) {
}
/// @brief enforce maximum number of results
/// must be called under the shard's lock
void QueryCacheDatabaseEntry::enforceMaxResults(size_t value) {
while (_numElements > value) {
// too many elements. now wipe the first element from the list
@ -242,15 +203,9 @@ void QueryCacheDatabaseEntry::enforceMaxResults(size_t value) {
auto it = _entriesByHash.find(head->_hash);
TRI_ASSERT(it != _entriesByHash.end());
_entriesByHash.erase(it);
tryDelete(head);
}
}
/// @brief check whether the element can be destroyed, and delete it if yes
void QueryCacheDatabaseEntry::tryDelete(QueryCacheResultEntry* e) {
e->tryDelete();
}
/// @brief unlink the result entry from the list
void QueryCacheDatabaseEntry::unlink(QueryCacheResultEntry* e) {
if (e->_prev != nullptr) {
@ -313,7 +268,7 @@ VPackBuilder QueryCache::properties() {
VPackBuilder builder;
builder.openObject();
builder.add("mode", VPackValue(modeString(mode())));
builder.add("maxResults", VPackValue(::MaxResults));
builder.add("maxResults", VPackValue(::maxResults.load()));
builder.close();
return builder;
@ -324,7 +279,7 @@ void QueryCache::properties(std::pair<std::string, size_t>& result) {
MUTEX_LOCKER(mutexLocker, _propertiesLock);
result.first = modeString(mode());
result.second = ::MaxResults;
result.second = ::maxResults.load();
}
/// @brief set the cache properties
@ -343,7 +298,7 @@ bool QueryCache::mayBeActive() const { return (mode() != CACHE_ALWAYS_OFF); }
/// @brief return whether or not the query cache is enabled
QueryCacheMode QueryCache::mode() const {
return ::Mode.load(std::memory_order_relaxed);
return ::mode.load(std::memory_order_relaxed);
}
/// @brief return a string version of the mode
@ -362,8 +317,8 @@ std::string QueryCache::modeString(QueryCacheMode mode) {
}
/// @brief lookup a query result in the cache
QueryCacheResultEntry* QueryCache::lookup(TRI_vocbase_t* vocbase, uint64_t hash,
QueryString const& queryString) {
std::shared_ptr<QueryCacheResultEntry> QueryCache::lookup(TRI_vocbase_t* vocbase, uint64_t hash,
QueryString const& queryString) {
auto const part = getPart(vocbase);
READ_LOCKER(readLocker, _entriesLock[part]);
@ -380,21 +335,21 @@ QueryCacheResultEntry* QueryCache::lookup(TRI_vocbase_t* vocbase, uint64_t hash,
/// @brief store a query in the cache
/// if the call is successful, the cache has taken over ownership for the
/// query result!
QueryCacheResultEntry* QueryCache::store(
void QueryCache::store(
TRI_vocbase_t* vocbase, uint64_t hash, QueryString const& queryString,
std::shared_ptr<VPackBuilder> const& result,
std::shared_ptr<VPackBuilder> const& stats,
std::vector<std::string>&& dataSources) {
if (!result->slice().isArray()) {
return nullptr;
return;
}
// get the right part of the cache to store the result in
auto const part = getPart(vocbase);
// create the cache entry outside the lock
auto entry = std::make_unique<QueryCacheResultEntry>(
auto entry = std::make_shared<QueryCacheResultEntry>(
hash, queryString, result, std::move(dataSources));
WRITE_LOCKER(writeLocker, _entriesLock[part]);
@ -404,17 +359,15 @@ QueryCacheResultEntry* QueryCache::store(
if (it == _entries[part].end()) {
// create entry for the current database
auto db = std::make_unique<QueryCacheDatabaseEntry>();
it = _entries[part].emplace(vocbase, db.get()).first;
db.release();
it = _entries[part].emplace(vocbase, std::move(db)).first;
}
// store cache entry
(*it).second->store(hash, entry.get());
return entry.release();
(*it).second->store(hash, entry);
}
/// @brief store a query in the cache
void QueryCache::store(TRI_vocbase_t* vocbase, std::unique_ptr<QueryCacheResultEntry> entry) {
void QueryCache::store(TRI_vocbase_t* vocbase, std::shared_ptr<QueryCacheResultEntry> entry) {
// get the right part of the cache to store the result in
auto const part = getPart(vocbase);
@ -425,13 +378,11 @@ void QueryCache::store(TRI_vocbase_t* vocbase, std::unique_ptr<QueryCacheResultE
if (it == _entries[part].end()) {
// create entry for the current database
auto db = std::make_unique<QueryCacheDatabaseEntry>();
it = _entries[part].emplace(vocbase, db.get()).first;
db.release();
it = _entries[part].emplace(vocbase, std::move(db)).first;
}
// store cache entry
(*it).second->store(entry->_hash, entry.get());
entry.release();
(*it).second->store(entry->_hash, entry);
}
/// @brief invalidate all queries for the given data sources
@ -467,7 +418,7 @@ void QueryCache::invalidate(TRI_vocbase_t* vocbase, std::string const& dataSourc
/// @brief invalidate all queries for a particular database
void QueryCache::invalidate(TRI_vocbase_t* vocbase) {
QueryCacheDatabaseEntry* databaseQueryCache = nullptr;
std::unique_ptr<QueryCacheDatabaseEntry> databaseQueryCache;
{
auto const part = getPart(vocbase);
@ -479,13 +430,12 @@ void QueryCache::invalidate(TRI_vocbase_t* vocbase) {
return;
}
databaseQueryCache = (*it).second;
databaseQueryCache = std::move((*it).second);
_entries[part].erase(it);
}
// delete without holding the lock
TRI_ASSERT(databaseQueryCache != nullptr);
delete databaseQueryCache;
}
/// @brief invalidate all queries
@ -504,7 +454,7 @@ void QueryCache::invalidate() {
}
/// @brief get the query cache instance
QueryCache* QueryCache::instance() { return &::Instance; }
QueryCache* QueryCache::instance() { return &::instance; }
/// @brief enforce maximum number of elements in each database-specific cache
void QueryCache::enforceMaxResults(size_t value) {
@ -527,10 +477,6 @@ unsigned int QueryCache::getPart(TRI_vocbase_t const* vocbase) const {
/// @brief invalidate all entries in the cache part
/// note that the caller of this method must hold the write lock
void QueryCache::invalidate(unsigned int part) {
for (auto& it : _entries[part]) {
delete it.second;
}
_entries[part].clear();
}
@ -540,11 +486,13 @@ void QueryCache::setMaxResults(size_t value) {
return;
}
if (value > ::MaxResults) {
size_t mr = ::maxResults.load();
if (value > mr) {
enforceMaxResults(value);
}
::MaxResults = value;
mr = value;
}
/// @brief sets the caching mode
@ -556,7 +504,7 @@ void QueryCache::setMode(QueryCacheMode value) {
invalidate();
::Mode.store(value, std::memory_order_release);
::mode.store(value, std::memory_order_release);
}
/// @brief enable or disable the query cache

View File

@ -50,15 +50,6 @@ struct QueryCacheResultEntry {
~QueryCacheResultEntry() = default;
/// @brief check whether the element can be destroyed, and delete it if yes
void tryDelete();
/// @brief use the element, so it cannot be deleted meanwhile
void use();
/// @brief unuse the element, so it can be deleted if required
void unuse();
uint64_t const _hash;
std::string const _queryString;
std::shared_ptr<arangodb::velocypack::Builder> _queryResult;
@ -66,30 +57,6 @@ struct QueryCacheResultEntry {
std::vector<std::string> const _dataSources;
QueryCacheResultEntry* _prev;
QueryCacheResultEntry* _next;
std::atomic<uint32_t> _refCount;
std::atomic<uint32_t> _deletionRequested;
};
class QueryCacheResultEntryGuard {
QueryCacheResultEntryGuard(QueryCacheResultEntryGuard const&) = delete;
QueryCacheResultEntryGuard& operator=(QueryCacheResultEntryGuard const&) =
delete;
QueryCacheResultEntryGuard() = delete;
public:
explicit QueryCacheResultEntryGuard(QueryCacheResultEntry* entry)
: _entry(entry) {}
~QueryCacheResultEntryGuard() {
if (_entry != nullptr) {
_entry->unuse();
}
}
QueryCacheResultEntry* get() { return _entry; }
private:
QueryCacheResultEntry* _entry;
};
struct QueryCacheDatabaseEntry {
@ -103,10 +70,10 @@ struct QueryCacheDatabaseEntry {
~QueryCacheDatabaseEntry();
/// @brief lookup a query result in the database-specific cache
QueryCacheResultEntry* lookup(uint64_t hash, QueryString const& queryString);
std::shared_ptr<QueryCacheResultEntry> lookup(uint64_t hash, QueryString const& queryString);
/// @brief store a query result in the database-specific cache
void store(uint64_t hash, QueryCacheResultEntry* entry);
void store(uint64_t hash, std::shared_ptr<QueryCacheResultEntry> entry);
/// @brief invalidate all entries for the given data sources in the
/// database-specific cache
@ -117,11 +84,9 @@ struct QueryCacheDatabaseEntry {
void invalidate(std::string const& dataSource);
/// @brief enforce maximum number of results
/// must be called under the cache's properties lock
void enforceMaxResults(size_t);
/// @brief check whether the element can be destroyed, and delete it if yes
void tryDelete(QueryCacheResultEntry*);
/// @brief unlink the result entry from the list
void unlink(QueryCacheResultEntry*);
@ -129,7 +94,7 @@ struct QueryCacheDatabaseEntry {
void link(QueryCacheResultEntry*);
/// @brief hash table that maps query hashes to query results
std::unordered_map<uint64_t, QueryCacheResultEntry*> _entriesByHash;
std::unordered_map<uint64_t, std::shared_ptr<QueryCacheResultEntry>> _entriesByHash;
/// @brief hash table that contains all data souce-specific query results
/// maps from data sources names to a set of query results as defined in
@ -180,18 +145,18 @@ class QueryCache {
static std::string modeString(QueryCacheMode);
/// @brief lookup a query result in the cache
QueryCacheResultEntry* lookup(TRI_vocbase_t* vocbase, uint64_t hash, QueryString const& queryString);
std::shared_ptr<QueryCacheResultEntry> lookup(TRI_vocbase_t* vocbase, uint64_t hash, QueryString const& queryString);
/// @brief store a query in the cache
/// if the call is successful, the cache has taken over ownership for the
/// query result!
QueryCacheResultEntry* store(TRI_vocbase_t* vocbase, uint64_t hash, QueryString const& queryString,
std::shared_ptr<arangodb::velocypack::Builder> const& result,
std::shared_ptr<arangodb::velocypack::Builder> const& stats,
std::vector<std::string>&& dataSources);
void store(TRI_vocbase_t* vocbase, uint64_t hash, QueryString const& queryString,
std::shared_ptr<arangodb::velocypack::Builder> const& result,
std::shared_ptr<arangodb::velocypack::Builder> const& stats,
std::vector<std::string>&& dataSources);
/// @brief store a query cache entry in the cache
void store(TRI_vocbase_t* vocbase, std::unique_ptr<QueryCacheResultEntry> entry);
void store(TRI_vocbase_t* vocbase, std::shared_ptr<QueryCacheResultEntry> entry);
/// @brief invalidate all queries for the given data sources
void invalidate(TRI_vocbase_t* vocbase, std::vector<std::string> const& dataSources);
@ -208,12 +173,6 @@ class QueryCache {
/// @brief get the pointer to the global query cache
static QueryCache* instance();
/// @brief enforce maximum number of results in each database-specific cache
void enforceMaxResults(size_t);
/// @brief determine which part of the cache to use for the cache entries
unsigned int getPart(TRI_vocbase_t const*) const;
/// @brief invalidate all entries in the cache part
/// note that the caller of this method must hold the write lock
void invalidate(unsigned int);
@ -226,6 +185,15 @@ class QueryCache {
/// @brief enable or disable the query cache
void setMode(std::string const&);
private:
/// @brief enforce maximum number of results in each database-specific cache
/// must be called under the cache's properties lock
void enforceMaxResults(size_t);
/// @brief determine which part of the cache to use for the cache entries
unsigned int getPart(TRI_vocbase_t const*) const;
private:
/// @brief number of R/W locks for the query cache
@ -238,7 +206,7 @@ class QueryCache {
arangodb::basics::ReadWriteLock _entriesLock[numberOfParts];
/// @brief cached query entries, organized per database
std::unordered_map<TRI_vocbase_t*, QueryCacheDatabaseEntry*>
std::unordered_map<TRI_vocbase_t*, std::unique_ptr<QueryCacheDatabaseEntry>>
_entries[numberOfParts];
};
}

View File

@ -297,6 +297,8 @@ RestStatus RestCursorHandler::handleQueryResult() {
}
result.add(StaticStrings::Error, VPackValue(false));
result.add(StaticStrings::Code, VPackValue(static_cast<int>(ResponseCode::CREATED)));
} catch (std::exception const& ex) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
}