1
0
Fork 0
arangodb/arangod/Aql/IResearchViewExecutor.cpp

1191 lines
41 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
/// @author Andrey Abramov
////////////////////////////////////////////////////////////////////////////////
#include "IResearchViewExecutor.h"
#include "Aql/ExecutionStats.h"
#include "Aql/Query.h"
#include "Aql/SingleRowFetcher.h"
#include "Aql/OutputAqlItemRow.h"
#include "IResearch/IResearchCommon.h"
#include "IResearch/IResearchDocument.h"
#include "IResearch/IResearchFilterFactory.h"
#include "IResearch/IResearchOrderFactory.h"
#include "IResearch/IResearchView.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionCollection.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h"
#include <analysis/token_attributes.hpp>
#include <search/boolean_filter.hpp>
#include <search/score.hpp>
// TODO Eliminate access to the plan if possible!
// I think it is used for two things only:
// - to get the Ast, which can simply be passed on its own, and
// - to call plan->getVarSetBy() in aql::Expression, which could be removed by
// passing (a reference to) plan->_varSetBy instead.
// But changing this, even only for the IResearch part, would involve more
// refactoring than I currently find appropriate for this.
#include "Aql/ExecutionPlan.h"
using namespace arangodb;
using namespace arangodb::aql;
// Out-of-class definitions of the static constexpr data members of
// `IResearchViewExecutorBase<Impl, Traits>::Properties`.
// NOTE(review): presumably needed so the members can be ODR-used when building
// with a pre-C++17 standard (no inline variables) — confirm against the build flags.
template <typename Impl, typename Traits>
constexpr bool IResearchViewExecutorBase<Impl, Traits>::Properties::preservesOrder;
template <typename Impl, typename Traits>
constexpr BlockPassthrough IResearchViewExecutorBase<Impl, Traits>::Properties::allowsBlockPassthrough;
template <typename Impl, typename Traits>
constexpr bool IResearchViewExecutorBase<Impl, Traits>::Properties::inputSizeRestrictsOutputSize;
using namespace arangodb::iresearch;
namespace {
/// @brief resolve the collection `cid` through the state of transaction `trx`
/// @param trx transaction whose state is asked for the collection
/// @param cid identifier of the collection to look up
/// @param query query that receives a warning if the lookup fails
/// @return the logical collection, or nullptr (after registering a
///         TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND warning on `query`) if the
///         transaction state does not know the collection
inline std::shared_ptr<arangodb::LogicalCollection> lookupCollection(
    arangodb::transaction::Methods& trx, TRI_voc_cid_t cid, Query& query) {
  TRI_ASSERT(trx.state());

  // this is necessary for MMFiles
  trx.pinData(cid);

  auto* trxCollection = trx.state()->collection(cid, arangodb::AccessMode::Type::READ);

  if (trxCollection) {
    return trxCollection->collection();
  }

  // unknown collection: tell the user and hand back an invalid reference
  std::stringstream warning;
  warning << "failed to find collection while reading document from arangosearch "
             "view, cid '"
          << cid << "'";
  query.registerWarning(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, warning.str());

  return nullptr;
}
/// @brief obtain the value reader of the primary key column of `segment`
/// @return an empty reader functor if the segment has no such column
inline irs::columnstore_reader::values_reader_f pkColumn(irs::sub_reader const& segment) {
  if (auto const* column = segment.column_reader(DocumentPrimaryKey::PK())) {
    return column->values();
  }

  return irs::columnstore_reader::values_reader_f{};
}
/// @brief obtain the value reader of the sort column of `segment`
/// @return an empty reader functor if `segment.sort()` yields no column
inline irs::columnstore_reader::values_reader_f sortColumn(irs::sub_reader const& segment) {
  if (auto const* column = segment.sort()) {
    return column->values();
  }

  return irs::columnstore_reader::values_reader_f{};
}
} // namespace
///////////////////////////////////////////////////////////////////////////////
/// --SECTION-- IResearchViewExecutorInfos
///////////////////////////////////////////////////////////////////////////////
/// @brief collects everything the IResearch view executors need at runtime
/// @param infos generic executor infos, moved into the base class
/// @param reader index snapshot to query, must not be null
/// @param firstOutputRegister first register written by the executor, must be
///        one of the registered output registers
/// @param numScoreRegisters number of registers reserved for scores
/// @param volatility `first` flags a volatile filter, `second` a volatile sort;
///        a volatile sort always implies a volatile filter
IResearchViewExecutorInfos::IResearchViewExecutorInfos(
    ExecutorInfos&& infos, std::shared_ptr<const IResearchView::Snapshot> reader,
    RegisterId firstOutputRegister, RegisterId numScoreRegisters, Query& query,
    std::vector<Scorer> const& scorers,
    std::pair<arangodb::iresearch::IResearchViewSort const*, size_t> const& sort,
    ExecutionPlan const& plan, Variable const& outVariable,
    aql::AstNode const& filterCondition, std::pair<bool, bool> volatility,
    IResearchViewExecutorInfos::VarInfoMap const& varInfoMap, int depth)
    : ExecutorInfos(std::move(infos)),
      _firstOutputRegister(firstOutputRegister),
      _numScoreRegisters(numScoreRegisters),
      _reader(std::move(reader)),
      _query(query),
      _scorers(scorers),
      _sort(sort),
      _plan(plan),
      _outVariable(outVariable),
      _filterCondition(filterCondition),
      _volatileSort(volatility.second),
      // `_volatileSort` implies `_volatileFilter`. Computed from the
      // constructor argument rather than from the `_volatileSort` member so
      // the result does not depend on the member declaration order.
      _volatileFilter(volatility.second || volatility.first),
      _varInfoMap(varInfoMap),
      _depth(depth) {
  TRI_ASSERT(_reader != nullptr);
  TRI_ASSERT(getOutputRegisters()->find(firstOutputRegister) !=
             getOutputRegisters()->end());
}
/// @return the first output register offset by the number of score registers
RegisterId IResearchViewExecutorInfos::getOutputRegister() const noexcept {
  // score registers come first, the returned register follows them
  return getNumScoreRegisters() + _firstOutputRegister;
}
/// @return the number of score registers passed at construction
RegisterId IResearchViewExecutorInfos::getNumScoreRegisters() const noexcept {
  return this->_numScoreRegisters;
}
/// @return the first score register, which equals the first output register
/// @note asserts that at least one score register exists
RegisterId IResearchViewExecutorInfos::getFirstScoreRegister() const noexcept {
  TRI_ASSERT(getNumScoreRegisters() > 0);

  return this->_firstOutputRegister;
}
std::shared_ptr<const arangodb::iresearch::IResearchView::Snapshot> IResearchViewExecutorInfos::getReader() const
noexcept {
return _reader;
}
/// @return the query passed at construction
Query& IResearchViewExecutorInfos::getQuery() const noexcept {
  return this->_query;
}
const std::vector<arangodb::iresearch::Scorer>& IResearchViewExecutorInfos::scorers() const
noexcept {
return _scorers;
}
/// @return the execution plan passed at construction
ExecutionPlan const& IResearchViewExecutorInfos::plan() const noexcept {
  return this->_plan;
}
/// @return the output variable passed at construction
Variable const& IResearchViewExecutorInfos::outVariable() const noexcept {
  return this->_outVariable;
}
/// @return the AST node of the filter condition passed at construction
aql::AstNode const& IResearchViewExecutorInfos::filterCondition() const noexcept {
  return this->_filterCondition;
}
/// @return the variable info map passed at construction
const IResearchViewExecutorInfos::VarInfoMap&
IResearchViewExecutorInfos::varInfoMap() const noexcept {
  return this->_varInfoMap;
}
int IResearchViewExecutorInfos::getDepth() const noexcept { return _depth; }
bool IResearchViewExecutorInfos::volatileSort() const noexcept {
return _volatileSort;
}
bool IResearchViewExecutorInfos::volatileFilter() const noexcept {
return _volatileFilter;
}
const std::pair<const arangodb::iresearch::IResearchViewSort*, size_t>& IResearchViewExecutorInfos::sort() const
noexcept {
return _sort;
}
/// @return true iff `reg` lies in the half-open range
///         [first score register, first score register + number of scores)
bool IResearchViewExecutorInfos::isScoreReg(RegisterId reg) const noexcept {
  bool const hasScoreRegisters = getNumScoreRegisters() > 0;

  if (!hasScoreRegisters) {
    // no score registers at all, nothing can match
    return false;
  }

  if (reg < getFirstScoreRegister()) {
    return false;
  }

  return reg < getFirstScoreRegister() + getNumScoreRegisters();
}
/// @brief creates statistics with a scanned-index counter of zero
IResearchViewStats::IResearchViewStats() noexcept : _scannedIndex(0) {}
/// @brief counts one more scanned index entry
void IResearchViewStats::incrScanned() noexcept {
  ++_scannedIndex;
}
/// @brief counts `value` more scanned index entries
void IResearchViewStats::incrScanned(size_t value) noexcept {
  _scannedIndex += value;
}
std::size_t IResearchViewStats::getScanned() const noexcept {
return _scannedIndex;
}
/// @brief adds the scanned counter of `iResearchViewStats` to
///        `executionStats.scannedIndex`
/// @return `executionStats`, to allow chaining
ExecutionStats& aql::operator+=(ExecutionStats& executionStats,
                                const IResearchViewStats& iResearchViewStats) noexcept {
  auto const scanned = iResearchViewStats.getScanned();
  executionStats.scannedIndex += scanned;

  return executionStats;
}
///////////////////////////////////////////////////////////////////////////////
/// --SECTION-- IResearchViewExecutorBase
///////////////////////////////////////////////////////////////////////////////
/// @brief builds the callback that stores a document read from the storage
///        engine into the document output register of `ctx`
/// @note depending on `useRawDocumentPointers()` of the selected engine the
///       document is wrapped via `AqlValueHintDocumentNoCopy` or copied via
///       `AqlValueHintCopy`
/// @note the callback keeps a reference to `ctx`, which must outlive it
template <typename Impl, typename Traits>
IndexIterator::DocumentCallback IResearchViewExecutorBase<Impl, Traits>::ReadContext::copyDocumentCallback(
    ReadContext& ctx) {
  auto* engine = EngineSelectorFeature::ENGINE;
  TRI_ASSERT(engine);

  if (engine->useRawDocumentPointers()) {
    // capture only one reference to potentially avoid heap allocation
    return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
      AqlValue value{AqlValueHintDocumentNoCopy(doc.begin())};
      bool mustDestroy = true;
      AqlValueGuard guard{value, mustDestroy};
      ctx.outputRow.moveValueInto(ctx.docOutReg, ctx.inputRow, guard);
      return true;
    };
  }

  // capture only one reference to potentially avoid heap allocation
  return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
    AqlValue value{AqlValueHintCopy(doc.begin())};
    bool mustDestroy = true;
    AqlValueGuard guard{value, mustDestroy};
    ctx.outputRow.moveValueInto(ctx.docOutReg, ctx.inputRow, guard);
    return true;
  };
}
/// @brief binds the document output register and the current input/output
///        rows, and creates the document callback for this context
/// @note `callback` is built from `*this`; the callback returned by
///       `copyDocumentCallback` holds a reference to this context
template <typename Impl, typename Traits>
IResearchViewExecutorBase<Impl, Traits>::ReadContext::ReadContext(
aql::RegisterId docOutReg, InputAqlItemRow& inputRow, OutputAqlItemRow& outputRow)
: docOutReg(docOutReg),
inputRow(inputRow),
outputRow(outputRow),
callback(copyDocumentCallback(*this)) {}
/// @brief creates a handle referring to position `keyIdx` of the key buffer
template <typename Impl, typename Traits>
IResearchViewExecutorBase<Impl, Traits>::IndexReadBufferEntry::IndexReadBufferEntry(std::size_t keyIdx) noexcept
: _keyIdx(keyIdx) {}
/// @brief creates a view onto the `numScores` scores that belong to the key at
///        position `keyIdx`
/// @note scores are stored contiguously per key, hence the base index is
///       `keyIdx * numScores`; the range must lie within `scoreBuffer`
template <typename Impl, typename Traits>
template <typename ValueType>
IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::IndexReadBuffer::ScoreIterator::ScoreIterator(
std::vector<AqlValue>& scoreBuffer, std::size_t keyIdx, std::size_t numScores) noexcept
: _scoreBuffer(scoreBuffer), _scoreBaseIdx(keyIdx * numScores), _numScores(numScores) {
TRI_ASSERT(_scoreBaseIdx + _numScores <= _scoreBuffer.size());
}
/// @return iterator to the first score of the referenced key
template <typename Impl, typename Traits>
template <typename ValueType>
std::vector<AqlValue>::iterator IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<
    ValueType>::IndexReadBuffer::ScoreIterator::begin() noexcept {
  auto first = _scoreBuffer.begin();
  first += _scoreBaseIdx;

  return first;
}
/// @return iterator one past the last score of the referenced key
template <typename Impl, typename Traits>
template <typename ValueType>
std::vector<AqlValue>::iterator IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<
    ValueType>::IndexReadBuffer::ScoreIterator::end() noexcept {
  auto last = _scoreBuffer.begin();
  last += _scoreBaseIdx;
  last += _numScores;

  return last;
}
/// @brief creates an empty buffer that expects `numScoreRegisters` scores per
///        stored key
template <typename Impl, typename Traits>
template <typename ValueType>
IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::IndexReadBuffer(std::size_t const numScoreRegisters)
: _keyBuffer(), _scoreBuffer(), _numScoreRegisters(numScoreRegisters), _keyBaseIdx(0) {}
/// @return the key stored at the position `bufferEntry` refers to
template <typename Impl, typename Traits>
template <typename ValueType>
ValueType const& IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::getValue(
    const IResearchViewExecutorBase::IndexReadBufferEntry bufferEntry) const noexcept {
  assertSizeCoherence();

  auto const position = bufferEntry._keyIdx;
  TRI_ASSERT(position < _keyBuffer.size());

  return _keyBuffer[position];
}
/// @return an iterable view onto the scores of the key `bufferEntry` refers to
template <typename Impl, typename Traits>
template <typename ValueType>
typename IResearchViewExecutorBase<Impl, Traits>::template IndexReadBuffer<ValueType>::ScoreIterator
IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::getScores(
    const IResearchViewExecutorBase::IndexReadBufferEntry bufferEntry) noexcept {
  assertSizeCoherence();

  auto const position = bufferEntry._keyIdx;

  return ScoreIterator{_scoreBuffer, position, _numScoreRegisters};
}
/// @brief constructs a new key in place at the end of the key buffer
/// @param args arguments forwarded to the constructor of `ValueType`
template <typename Impl, typename Traits>
template <typename ValueType>
template <typename... Args>
void IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::pushValue(Args&&... args) {
  this->_keyBuffer.emplace_back(std::forward<Args>(args)...);
}
/// @brief appends `scoreValue` to the score buffer as a double AqlValue
template <typename Impl, typename Traits>
template <typename ValueType>
void IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::pushScore(float_t const scoreValue) {
  this->_scoreBuffer.emplace_back(AqlValueHintDouble{scoreValue});
}
/// @brief appends a default-constructed AqlValue to the score buffer
template <typename Impl, typename Traits>
template <typename ValueType>
void IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::pushScoreNone() {
  this->_scoreBuffer.emplace_back();
}
/// @brief drops all buffered keys and scores and rewinds the read position
/// @note must only be called once every buffered entry has been consumed
template <typename Impl, typename Traits>
template <typename ValueType>
void IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::reset() noexcept {
  TRI_ASSERT(empty());

  _scoreBuffer.clear();
  _keyBuffer.clear();
  _keyBaseIdx = 0;
}
/// @return the number of buffered entries that were not popped yet
template <typename Impl, typename Traits>
template <typename ValueType>
std::size_t IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::size() const
    noexcept {
  assertSizeCoherence();

  std::size_t const buffered = _keyBuffer.size();

  return buffered - _keyBaseIdx;
}
/// @return true iff there is no unconsumed entry left in the buffer
template <typename Impl, typename Traits>
template <typename ValueType>
bool IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::empty() const
    noexcept {
  std::size_t const remaining = size();

  return remaining == 0;
}
/// @brief consumes the oldest unconsumed entry
/// @return a handle to the consumed entry; the stored key and scores stay in
///         the buffers and remain accessible through the handle
template <typename Impl, typename Traits>
template <typename ValueType>
typename IResearchViewExecutorBase<Impl, Traits>::IndexReadBufferEntry
IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::pop_front() noexcept {
  TRI_ASSERT(!empty());
  TRI_ASSERT(_keyBaseIdx < _keyBuffer.size());
  assertSizeCoherence();

  // hand out the current position, then advance past it
  return IndexReadBufferEntry{_keyBaseIdx++};
}
/// @brief asserts that exactly `_numScoreRegisters` scores are stored per key
template <typename Impl, typename Traits>
template <typename ValueType>
void IResearchViewExecutorBase<Impl, Traits>::IndexReadBuffer<ValueType>::assertSizeCoherence() const
    noexcept {
  TRI_ASSERT(_keyBuffer.size() * _numScoreRegisters == _scoreBuffer.size());
}
/// @brief sets up the state shared by all view executors
/// @param fetcher source of input rows
/// @param infos executor infos; its query must have a transaction
/// @note the filter starts as the empty prepared filter and is only compiled
///       by `reset()`, which also flips `_isInitialized`
template <typename Impl, typename Traits>
IResearchViewExecutorBase<Impl, Traits>::IResearchViewExecutorBase(
IResearchViewExecutorBase::Fetcher& fetcher, IResearchViewExecutorBase::Infos& infos)
: _infos(infos),
_fetcher(fetcher),
_inputRow(CreateInvalidInputRowHint{}),
_upstreamState(ExecutionState::HASMORE),
_indexReadBuffer(_infos.getNumScoreRegisters()),
_filterCtx(1), // arangodb::iresearch::ExpressionExecutionContext
_ctx(&infos.getQuery(), infos.numberOfOutputRegisters(),
infos.outVariable(), infos.varInfoMap(), infos.getDepth()),
_reader(infos.getReader()),
_filter(irs::filter::prepared::empty()),
_execCtx(*infos.getQuery().trx(), _ctx),
_inflight(0),
_hasMore(true), // has more data initially
_isInitialized(false) {
TRI_ASSERT(infos.getQuery().trx() != nullptr);
// add expression execution context
_filterCtx.emplace(_execCtx);
}
/// @brief writes at most one document into `output`
/// @return WAITING if upstream has to be waited for, DONE if there is no
///         further input row, HASMORE after a document was written; the
///         returned stats count the written document as scanned
template <typename Impl, typename Traits>
std::pair<ExecutionState, typename IResearchViewExecutorBase<Impl, Traits>::Stats>
IResearchViewExecutorBase<Impl, Traits>::produceRows(OutputAqlItemRow& output) {
  IResearchViewStats stats{};

  for (;;) {
    if (!_inputRow.isInitialized()) {
      if (ExecutionState::DONE == _upstreamState) {
        // There will be no more rows, stop fetching.
        return {ExecutionState::DONE, stats};
      }

      std::tie(_upstreamState, _inputRow) = _fetcher.fetchRow();

      if (ExecutionState::WAITING == _upstreamState) {
        return {_upstreamState, stats};
      }

      if (!_inputRow.isInitialized()) {
        return {ExecutionState::DONE, stats};
      }

      // reset must be called exactly after we've got a new and valid input row.
      static_cast<Impl&>(*this).reset();
    }

    ReadContext ctx(infos().getOutputRegister(), _inputRow, output);

    if (next(ctx)) {
      stats.incrScanned();
      return {ExecutionState::HASMORE, stats};
    }

    // no document written for this input row, drop it and try the next one
    _inputRow = InputAqlItemRow{CreateInvalidInputRowHint{}};
  }
}
/// @brief skips up to `toSkip` documents for the current input row
/// @return (state, stats, number of skipped documents); state is WAITING or
///         DONE (with zero skipped) if no input row could be obtained, HASMORE
///         otherwise
/// @note the input row is dropped once fewer than `toSkip` documents could be
///       skipped for it
template <typename Impl, typename Traits>
std::tuple<ExecutionState, typename IResearchViewExecutorBase<Impl, Traits>::Stats, size_t>
IResearchViewExecutorBase<Impl, Traits>::skipRows(size_t toSkip) {
  TRI_ASSERT(_indexReadBuffer.empty());

  IResearchViewStats stats{};
  auto& impl = static_cast<Impl&>(*this);

  // all returns use std::make_tuple, an initializer list fails to build here
  if (!_inputRow.isInitialized()) {
    if (ExecutionState::DONE == _upstreamState) {
      // There will be no more rows, stop fetching.
      return std::make_tuple(ExecutionState::DONE, stats, 0);
    }

    std::tie(_upstreamState, _inputRow) = _fetcher.fetchRow();

    if (ExecutionState::WAITING == _upstreamState) {
      return std::make_tuple(_upstreamState, stats, 0);
    }

    if (!_inputRow.isInitialized()) {
      return std::make_tuple(ExecutionState::DONE, stats, 0);
    }

    // reset must be called exactly after we've got a new and valid input row.
    impl.reset();
  }

  TRI_ASSERT(_inputRow.isInitialized());

  size_t const skipped = impl.skip(toSkip);

  TRI_ASSERT(_indexReadBuffer.empty());
  stats.incrScanned(skipped);

  if (skipped < toSkip) {
    // nothing left for this input row
    _inputRow = InputAqlItemRow{CreateInvalidInputRowHint{}};
  }

  return std::make_tuple(ExecutionState::HASMORE, stats, skipped);
}
/// @brief writes the next buffered entry that can be written into `ctx`
/// @return true once a row was written, false if the buffer is still empty
///         after it was asked to be refilled
template <typename Impl, typename Traits>
bool IResearchViewExecutorBase<Impl, Traits>::next(ReadContext& ctx) {
  auto& impl = static_cast<Impl&>(*this);

  for (;;) {
    if (_indexReadBuffer.empty()) {
      impl.fillBuffer(ctx);

      if (_indexReadBuffer.empty()) {
        return false;
      }
    }

    IndexReadBufferEntry const entry = _indexReadBuffer.pop_front();

    if (impl.writeRow(ctx, entry)) {
      return true;
    }

    // to get correct stats we should continue looking for
    // other documents inside this one call
    LOG_TOPIC("550cd", TRACE, arangodb::iresearch::TOPIC)
        << "failed to write row in node executor";
  }
}
/// @return the executor infos this executor was constructed with
template <typename Impl, typename Traits>
typename IResearchViewExecutorBase<Impl, Traits>::Infos const&
IResearchViewExecutorBase<Impl, Traits>::infos() const noexcept {
  return this->_infos;
}
/// @brief appends one buffered score per score register
/// @param begin first score value to copy
/// @param end one past the last score value to copy
/// @note registers for which no value is supplied get a default-constructed
///       AqlValue; the assertions expect either all values or none
template <typename Impl, typename Traits>
void IResearchViewExecutorBase<Impl, Traits>::fillScores(ReadContext const& ctx,
                                                         float_t const* begin,
                                                         float_t const* end) {
  TRI_ASSERT(Traits::Ordered);

  // scorer registers are placed right before document output register
  // is used here currently only for assertions.
  RegisterId scoreReg = infos().getFirstScoreRegister();

  // copy scores, registerId's are sequential
  while (begin != end) {
    TRI_ASSERT(infos().isScoreReg(scoreReg));
    _indexReadBuffer.pushScore(*begin);
    ++begin;
    ++scoreReg;
  }

  // We should either have no _scrVals to evaluate, or all
  TRI_ASSERT(begin == nullptr || !infos().isScoreReg(scoreReg));

  // pad the remaining score registers
  for (; infos().isScoreReg(scoreReg); ++scoreReg) {
    _indexReadBuffer.pushScoreNone();
  }

  // we should have written exactly all score registers by now
  TRI_ASSERT(!infos().isScoreReg(scoreReg));
  TRI_ASSERT(scoreReg == infos().getOutputRegister());
}
/// @brief prepares the executor for a new input row
/// @note the filter is (re)compiled on the first call and whenever it is
///       volatile; the order is (re)compiled on the first call and whenever
///       the sort is volatile
/// @throws an ArangoDB exception if the filter or a scorer cannot be built
template <typename Impl, typename Traits>
void IResearchViewExecutorBase<Impl, Traits>::reset() {
  _ctx._inputRow = _inputRow;

  ExecutionPlan const* plan = &infos().plan();

  QueryContext const queryCtx = {infos().getQuery().trx(), plan, plan->getAst(),
                                 &_ctx, &infos().outVariable()};

  bool const firstRun = !_isInitialized;

  // `_volatileSort` implies `_volatileFilter`
  if (!infos().volatileFilter() && !firstRun) {
    // the previously compiled filter and order are still valid
    return;
  }

  irs::Or root;

  auto rv = FilterFactory::filter(&root, queryCtx, infos().filterCondition());

  if (rv.fail()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        rv.errorNumber(),
        "failed to build filter while querying arangosearch view, query '" +
            infos().filterCondition().toVelocyPack(true)->toJson() +
            "': " + rv.errorMessage());
  }

  if (infos().volatileSort() || firstRun) {
    irs::order order;
    irs::sort::ptr scorer;

    for (auto const& scorerNode : infos().scorers()) {
      TRI_ASSERT(scorerNode.node);

      bool const appended = OrderFactory::scorer(&scorer, *scorerNode.node, queryCtx);

      if (!appended) {
        // failed to append sort
        THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
      }

      // sorting order doesn't matter
      order.add(true, std::move(scorer));
    }

    // compile order
    _order = order.prepare();
  }

  // compile filter
  _filter = root.prepare(*_reader, _order, irs::no_boost(), _filterCtx);

  _isInitialized = true;
}
/// @brief emits the address of `collection` and the local document id instead
///        of the document itself
/// @return false (nothing written) if `documentId` is not set, true otherwise
template<typename Impl, typename Traits>
bool IResearchViewExecutorBase<Impl, Traits>::writeLocalDocumentId(
    ReadContext& ctx,
    LocalDocumentId const& documentId,
    LogicalCollection const& collection) {
  if (!ADB_LIKELY(documentId.isSet())) {
    return false;
  }

  // we will need collection Id also as View could produce documents from multiple collections
  {
    // For sake of performance we store raw pointer to collection
    // It is safe as pipeline work inside one process
    static_assert(sizeof(void*) <= sizeof(uint64_t), "Pointer not fits in uint64_t");
    AqlValue collectionPtr(AqlValueHintUInt(reinterpret_cast<uint64_t>(&collection)));
    bool mustDestroy = true;
    AqlValueGuard guard{ collectionPtr, mustDestroy };
    ctx.outputRow.moveValueInto(ctx.getNmColPtrOutReg(), ctx.inputRow, guard);
  }

  {
    AqlValue docId(AqlValueHintUInt(documentId.id()));
    bool mustDestroy = true;
    AqlValueGuard guard{ docId, mustDestroy };
    ctx.outputRow.moveValueInto(ctx.getNmDocIdOutReg(), ctx.inputRow, guard);
  }

  return true;
}
/// @brief writes the document (or only its ids when not materializing) and,
///        in the ordered case, its buffered scores into the output row
/// @return false if the document could not be written, true otherwise
template<typename Impl, typename Traits>
bool IResearchViewExecutorBase<Impl, Traits>::writeRow(ReadContext& ctx,
                                                       IndexReadBufferEntry bufferEntry,
                                                       LocalDocumentId const& documentId,
                                                       LogicalCollection const& collection) {
  TRI_ASSERT(documentId.isSet());

  bool documentWritten;

  if (Traits::Materialized) {
    // read document from underlying storage engine, if we got an id
    documentWritten = collection.readDocumentWithCallback(infos().getQuery().trx(),
                                                          documentId, ctx.callback);
  } else {
    // no need to look into collection. Somebody down the stream will do
    // materialization. Just emit LocalDocumentIds
    documentWritten = writeLocalDocumentId(ctx, documentId, collection);
  }

  if (!documentWritten) {
    return false;
  }

  // in the ordered case we have to write scores as well as a document
  if /* constexpr */ (Traits::Ordered) {
    // scorer register are placed right before the document output register
    RegisterId scoreReg = infos().getFirstScoreRegister();

    for (auto& score : _indexReadBuffer.getScores(bufferEntry)) {
      TRI_ASSERT(infos().isScoreReg(scoreReg));
      bool mustDestroy = false;
      AqlValueGuard guard{score, mustDestroy};
      ctx.outputRow.moveValueInto(scoreReg, ctx.inputRow, guard);
      ++scoreReg;
    }

    // we should have written exactly all score registers by now
    TRI_ASSERT(!infos().isScoreReg(scoreReg));
  }

  return true;
}
///////////////////////////////////////////////////////////////////////////////
/// --SECTION-- IResearchViewExecutor
///////////////////////////////////////////////////////////////////////////////
/// @brief creates an executor that walks the sub-readers of the snapshot one
///        after another, starting at the first one
/// @note `_scr` starts out pointing to the "no score" object; `ordered` must
///       be true exactly if score registers are configured
template <bool ordered, bool materialized>
IResearchViewExecutor<ordered, materialized>::IResearchViewExecutor(Fetcher& fetcher, Infos& infos)
: Base(fetcher, infos),
_pkReader(),
_itr(),
_readerOffset(0),
_scr(&irs::score::no_score()),
_scrVal() {
TRI_ASSERT(ordered == (infos.getNumScoreRegisters() != 0));
}
/// @brief evaluates the scores of the current document and buffers them
/// @note must not be called in the unordered case
template <bool ordered, bool materialized>
void IResearchViewExecutor<ordered, materialized>::evaluateScores(ReadContext const& ctx) {
  TRI_ASSERT(ordered);

  _scr->evaluate();

  // in arangodb we assume all scorers return float_t
  auto const* firstScore = reinterpret_cast<const float_t*>(_scrVal.begin());
  auto const* lastScore = reinterpret_cast<const float_t*>(_scrVal.end());

  this->fillScores(ctx, firstScore, lastScore);
}
/// @brief advances the iterator and tries to read the primary key of the
///        document it then points to
/// @param documentId receives the document id; must be unset on entry and
///        stays unset if the primary key could not be read
/// @return false if the iterator is exhausted, true otherwise (even if no
///         primary key could be read)
template<bool ordered, bool materialized>
bool IResearchViewExecutor<ordered, materialized>::readPK(LocalDocumentId& documentId) {
  TRI_ASSERT(!documentId.isSet());
  TRI_ASSERT(_itr);
  TRI_ASSERT(_doc);
  TRI_ASSERT(_pkReader);

  if (!_itr->next()) {
    return false;
  }

  if (_pkReader(_doc->value, this->_pk)) {
    bool const readSuccess = DocumentPrimaryKey::read(documentId, this->_pk);

    TRI_ASSERT(readSuccess == documentId.isSet());

    if (!readSuccess) {
      LOG_TOPIC("6442f", WARN, arangodb::iresearch::TOPIC)
          << "failed to read document primary key while reading document "
             "from arangosearch view, doc_id '"
          << _doc->value << "'";
    }
  }

  return true;
}
/// @brief buffers document ids (and scores in the ordered case) of the
///        current sub-reader
/// @param ctx read context; its output row limits how many entries are
///        buffered
/// @note all entries in the buffer always belong to a single sub-reader, and
///       therefore to a single collection (`_collection`). Buffering stops as
///       soon as the sub-reader is exhausted and the buffer is non-empty, or
///       the output row has no room for more.
/// @note sub-readers without a primary key column or without a resolvable
///       collection are skipped
template <bool ordered, bool materialized>
void IResearchViewExecutor<ordered, materialized>::fillBuffer(IResearchViewExecutor::ReadContext& ctx) {
  TRI_ASSERT(this->_filter != nullptr);

  std::size_t const atMost = ctx.outputRow.numRowsLeft();
  size_t const count = this->_reader->size();

  // moves on to the next sub-reader and invalidates the iterator state
  auto const advanceReader = [this]() {
    ++_readerOffset;
    _itr.reset();
    _doc = nullptr;
  };

  while (_readerOffset < count) {
    if (!_itr) {
      if (!this->_indexReadBuffer.empty()) {
        // We may not reset the iterator and continue with the next reader if we
        // still have documents in the buffer, as the cid changes with each
        // reader.
        break;
      }

      if (!resetIterator()) {
        // The sub-reader is unusable (resetIterator() already logged why).
        // It left `_readerOffset` untouched, so we have to advance ourselves,
        // otherwise we would retry the very same sub-reader forever.
        ++_readerOffset;
        continue;
      }

      // CID is constant until the next resetIterator(). Save the corresponding
      // collection so we don't have to look it up every time.
      TRI_voc_cid_t const cid = this->_reader->cid(_readerOffset);
      Query& query = this->infos().getQuery();

      auto collection = lookupCollection(*query.trx(), cid, query);

      if (!collection) {
        // lookupCollection() has already registered a warning for the query.
        // We don't have a collection, skip the current reader.
        advanceReader();
        continue;
      }

      _collection = collection.get();
      this->_indexReadBuffer.reset();
    }

    TRI_ASSERT(_pkReader);

    LocalDocumentId documentId;

    // try to read a document PK from iresearch
    if (!readPK(documentId)) {
      // The iterator is exhausted, we need to continue with the next reader.
      // If the buffer is non-empty the next iteration stops right away.
      advanceReader();
      continue;
    }

    if (!documentId.isSet()) {
      // No document read, we cannot write it.
      continue;
    }

    // The CID must stay the same for all documents in the buffer
    TRI_ASSERT(this->_collection->id() == this->_reader->cid(_readerOffset));

    this->_indexReadBuffer.pushValue(documentId);

    // in the ordered case we have to write scores as well as a document
    if /* constexpr */ (ordered) {
      // Writes into _scoreBuffer
      evaluateScores(ctx);
    }

    // doc and scores are both pushed, sizes must now be coherent
    this->_indexReadBuffer.assertSizeCoherence();

    if (this->_indexReadBuffer.size() >= atMost) {
      break;
    }
  }
}
/// @brief creates the document iterator for the sub-reader at `_readerOffset`
/// @return false (iterator stays unset, `_readerOffset` unchanged) if the
///         sub-reader has no primary key column, true otherwise
/// @note in the ordered case also binds `_scr`/`_scrVal` to the iterator's
///       score attribute, or to the "no score" object if there is none
template <bool ordered, bool materialized>
bool IResearchViewExecutor<ordered, materialized>::resetIterator() {
  TRI_ASSERT(this->_filter);
  TRI_ASSERT(!_itr);

  auto& subReader = (*this->_reader)[_readerOffset];

  _pkReader = ::pkColumn(subReader);

  if (!_pkReader) {
    LOG_TOPIC("bd01b", WARN, arangodb::iresearch::TOPIC)
        << "encountered a sub-reader without a primary key column while "
           "executing a query, ignoring";
    return false;
  }

  _itr = subReader.mask(
      this->_filter->execute(subReader, this->_order, this->_filterCtx));
  TRI_ASSERT(_itr);

  _doc = _itr->attributes().get<irs::document>().get();
  TRI_ASSERT(_doc);

  if /* constexpr */ (!ordered) {
    // no scores to bind
    return true;
  }

  _scr = _itr->attributes().get<irs::score>().get();

  if (!_scr) {
    _scr = &irs::score::no_score();
    _scrVal = irs::bytes_ref::NIL;
    return true;
  }

  _scrVal = _scr->value();
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  auto const numScores =
      static_cast<size_t>(std::distance(_scrVal.begin(), _scrVal.end())) /
      sizeof(float_t);
  TRI_ASSERT(numScores == this->infos().scorers().size());
#endif

  return true;
}
/// @brief resets the base state and restarts iteration at the first sub-reader
template <bool ordered, bool materialized>
void IResearchViewExecutor<ordered, materialized>::reset() {
  Base::reset();

  // start over with the first sub-reader and no active iterator
  _readerOffset = 0;
  _doc = nullptr;
  _itr.reset();
}
/// @brief skips up to `limit` documents without reading them
/// @param limit maximum number of documents to skip
/// @return the number of documents actually skipped
/// @note if skipping stops in the middle of a sub-reader, the collection of
///       that sub-reader is stored in `_collection` for later row production
/// @note sub-readers without a primary key column are passed over
template <bool ordered, bool materialized>
size_t IResearchViewExecutor<ordered, materialized>::skip(size_t limit) {
  TRI_ASSERT(this->_indexReadBuffer.empty());
  TRI_ASSERT(this->_filter);

  size_t const toSkip = limit;

  for (size_t const count = this->_reader->size(); _readerOffset < count;) {
    if (!_itr && !resetIterator()) {
      // The sub-reader is unusable (resetIterator() already logged why).
      // It left `_readerOffset` untouched, so we have to advance ourselves,
      // otherwise we would retry the very same sub-reader forever.
      ++_readerOffset;
      continue;
    }

    while (limit && _itr->next()) {
      --limit;
    }

    if (!limit) {
      break;  // do not change iterator if already reached limit
    }

    ++_readerOffset;
    _itr.reset();
    _doc = nullptr;
  }

  // We're in the middle of a reader, save the collection in case produceRows()
  // needs it.
  if (_itr) {
    // CID is constant until the next resetIterator(). Save the corresponding
    // collection so we don't have to look it up every time.
    TRI_voc_cid_t const cid = this->_reader->cid(_readerOffset);
    Query& query = this->infos().getQuery();

    auto collection = lookupCollection(*query.trx(), cid, query);

    if (!collection) {
      // lookupCollection() has already registered a warning for the query.
      // We don't have a collection, skip the current reader.
      ++_readerOffset;
      _itr.reset();
      _doc = nullptr;
    }

    this->_indexReadBuffer.reset();
    _collection = collection.get();
  }

  return toSkip - limit;
}
/// @brief writes the buffered entry `bufferEntry` using the collection of the
///        current sub-reader
/// @return the result of the base class `writeRow`
template <bool ordered, bool materialized>
bool IResearchViewExecutor<ordered, materialized>::writeRow(IResearchViewExecutor::ReadContext& ctx,
                                                            IResearchViewExecutor::IndexReadBufferEntry bufferEntry) {
  TRI_ASSERT(_collection);

  auto const& documentId = this->_indexReadBuffer.getValue(bufferEntry);

  return Base::writeRow(ctx, bufferEntry, documentId, *_collection);
}
///////////////////////////////////////////////////////////////////////////////
/// --SECTION-- IResearchViewMergeExecutor
///////////////////////////////////////////////////////////////////////////////
/// @brief creates an executor that merges the segments of the snapshot
///        according to the view sort given by `infos.sort()`
/// @note `infos.sort().first` is dereferenced in the initializer list, i.e.
///       before the assertions in the body run; it must not be null
/// @note `ordered` must be true exactly if score registers are configured
template <bool ordered, bool materialized>
IResearchViewMergeExecutor<ordered, materialized>::IResearchViewMergeExecutor(Fetcher& fetcher, Infos& infos)
: Base{fetcher, infos},
_heap_it{MinHeapContext{*infos.sort().first, infos.sort().second, _segments}} {
TRI_ASSERT(infos.sort().first);
TRI_ASSERT(!infos.sort().first->empty());
TRI_ASSERT(infos.sort().first->size() >= infos.sort().second);
TRI_ASSERT(infos.sort().second);
TRI_ASSERT(ordered == (infos.getNumScoreRegisters() != 0));
}
/// @brief bundles the per-segment state used while merging
/// @param docs document iterator of the segment, ownership is taken over
/// @param doc, score, collection stored as plain pointers; the referenced
///        objects have to outlive the segment
/// @param sortReader reader of the segment's sort column
/// @param pkReader reader of the segment's primary key column, must be valid
template <bool ordered, bool materialized>
IResearchViewMergeExecutor<ordered, materialized>::Segment::Segment(
irs::doc_iterator::ptr&& docs, irs::document const& doc,
irs::score const& score, LogicalCollection const& collection,
irs::columnstore_reader::values_reader_f&& sortReader,
irs::columnstore_reader::values_reader_f&& pkReader) noexcept
: docs(std::move(docs)),
doc(&doc),
score(&score),
collection(&collection),
sortReader(std::move(sortReader)),
pkReader(std::move(pkReader)) {
TRI_ASSERT(this->docs);
TRI_ASSERT(this->doc);
TRI_ASSERT(this->score);
TRI_ASSERT(this->collection);
TRI_ASSERT(this->pkReader);
}
/// @brief creates the heap context used to order segments by their current
///        sort value
/// @note only a pointer to `segments` is kept, the vector has to outlive the
///       context
template <bool ordered, bool materialized>
IResearchViewMergeExecutor<ordered, materialized>::MinHeapContext::MinHeapContext(
const IResearchViewSort& sort, size_t sortBuckets, std::vector<Segment>& segments) noexcept
: _less(sort, sortBuckets), _segments(&segments) {}
/// @brief advances segment `i` to its next document that has a sort value
/// @return true if such a document was found (its sort value is then stored
///         in the segment), false if the segment is exhausted
template <bool ordered, bool materialized>
bool IResearchViewMergeExecutor<ordered, materialized>::MinHeapContext::operator()(const size_t i) const {
  assert(i < _segments->size());
  auto& segment = (*_segments)[i];

  for (;;) {
    if (!segment.docs->next()) {
      return false;
    }

    auto const doc = segment.docs->value();

    if (segment.sortReader(doc, segment.sortValue)) {
      return true;
    }

    // FIXME read pk as well
  }
}
/// @brief compares two segments by their current sort values
/// @return `_less(sort value of rhs, sort value of lhs)`; note the swapped
///         operands
template <bool ordered, bool materialized>
bool IResearchViewMergeExecutor<ordered, materialized>::MinHeapContext::operator()(const size_t lhs,
                                                                                  const size_t rhs) const {
  assert(lhs < _segments->size());
  assert(rhs < _segments->size());

  auto const& segments = *_segments;

  return _less(segments[rhs].sortValue, segments[lhs].sortValue);
}
/// @brief evaluates `score` for the current document and buffers the values
/// @note must not be called in the unordered case
template <bool ordered, bool materialized>
void IResearchViewMergeExecutor<ordered, materialized>::evaluateScores(ReadContext const& ctx,
                                                                       irs::score const& score) {
  TRI_ASSERT(ordered);

  score.evaluate();

  // in arangodb we assume all scorers return float_t
  auto& scrVal = score.value();
  auto const* rawBegin = scrVal.c_str();
  auto const* rawEnd = scrVal.c_str() + scrVal.size();

  this->fillScores(ctx, reinterpret_cast<const float_t*>(rawBegin),
                   reinterpret_cast<const float_t*>(rawEnd));
}
/// @brief resets the base state and rebuilds the list of segments to merge
/// @note a segment is left out if it has no sort column, if its collection
///       cannot be resolved, or if it has no primary key column. These cheap
///       checks run before the filter is executed, so no iterator is built
///       for a segment that is dropped anyway.
template <bool ordered, bool materialized>
void IResearchViewMergeExecutor<ordered, materialized>::reset() {
  Base::reset();

  _segments.clear();
  _segments.reserve(this->_reader->size());

  Query& query = this->infos().getQuery();

  for (size_t i = 0, size = this->_reader->size(); i < size; ++i) {
    auto& segment = (*this->_reader)[i];

    auto sortReader = ::sortColumn(segment);

    if (!sortReader) {
      LOG_TOPIC("af4cd", WARN, arangodb::iresearch::TOPIC)
          << "encountered a sub-reader without a sort column while "
             "executing a query, ignoring";
      continue;
    }

    TRI_voc_cid_t const cid = this->_reader->cid(i);
    auto collection = lookupCollection(*query.trx(), cid, query);

    if (!collection) {
      // lookupCollection() has already registered a warning for the query.
      // We don't have a collection, skip the current segment.
      continue;
    }

    auto pkReader = ::pkColumn(segment);

    if (!pkReader) {
      LOG_TOPIC("ee041", WARN, arangodb::iresearch::TOPIC)
          << "encountered a sub-reader without a primary key column while "
             "executing a query, ignoring";
      continue;
    }

    irs::doc_iterator::ptr it =
        segment.mask(this->_filter->execute(segment, this->_order, this->_filterCtx));
    TRI_ASSERT(it);

    auto const* doc = it->attributes().get<irs::document>().get();
    TRI_ASSERT(doc);

    // falls back to the "no score" object unless the iterator provides scores
    auto const* score = &irs::score::no_score();

    if /* constexpr */ (ordered) {
      auto& scoreRef = it->attributes().get<irs::score>();

      if (scoreRef) {
        score = scoreRef.get();
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
        irs::bytes_ref const scrVal = score->value();
        auto const numScores =
            static_cast<size_t>(std::distance(scrVal.begin(), scrVal.end())) /
            sizeof(float_t);
        TRI_ASSERT(numScores == this->infos().scorers().size());
#endif
      }
    }

    _segments.emplace_back(std::move(it), *doc, *score, *collection,
                           std::move(sortReader), std::move(pkReader));
  }

  _heap_it.reset(_segments.size());
}
/// @brief Reads the document id of the document the given segment currently
///        points to.
/// @param segment  segment whose current document (`segment.doc->value`) is
///                 looked up through `segment.pkReader`, using `this->_pk`
/// @return the decoded document id; an unset id if the primary key reader
///         reports no value. A warning is logged if decoding fails.
template <bool ordered, bool materialized>
LocalDocumentId IResearchViewMergeExecutor<ordered, materialized>::readPK(
    IResearchViewMergeExecutor<ordered, materialized>::Segment const& segment) {
  LocalDocumentId documentId;

  if (!segment.pkReader(segment.doc->value, this->_pk)) {
    // no primary key stored for this document, hand back the unset id
    return documentId;
  }

  bool const readSuccess = DocumentPrimaryKey::read(documentId, this->_pk);
  TRI_ASSERT(readSuccess == documentId.isSet());

  if (!readSuccess) {
    LOG_TOPIC("6423f", WARN, arangodb::iresearch::TOPIC)
        << "failed to read document primary key while reading document "
           "from arangosearch view, doc_id '"
        << segment.doc->value << "'";
  }

  return documentId;
}
/// @brief Pulls documents from the heap iterator into the index read buffer
///        until either the heap is exhausted or the buffer holds at least
///        `ctx.outputRow.numRowsLeft()` entries.
/// @param ctx  read context; supplies the row budget and is passed on to
///             `evaluateScores` for ordered instantiations
template <bool ordered, bool materialized>
void IResearchViewMergeExecutor<ordered, materialized>::fillBuffer(ReadContext& ctx) {
  TRI_ASSERT(this->_filter != nullptr);

  std::size_t const atMost = ctx.outputRow.numRowsLeft();
  bool bufferFull = false;

  // `bufferFull` is tested first so the heap is not advanced any further
  // once enough entries have been collected.
  while (!bufferFull && _heap_it.next()) {
    auto& segment = _segments[_heap_it.value()];
    TRI_ASSERT(segment.docs);
    TRI_ASSERT(segment.doc);
    TRI_ASSERT(segment.score);
    TRI_ASSERT(segment.collection);
    TRI_ASSERT(segment.pkReader);

    // resolve the primary key stored in iresearch for the current document
    LocalDocumentId const documentId = readPK(segment);

    if (documentId.isSet()) {
      this->_indexReadBuffer.pushValue(documentId, segment.collection);

      // ordered instantiations store the scores alongside the document
      if /* constexpr */ (ordered) {
        evaluateScores(ctx, *segment.score);
      }

      // document and scores have both been pushed, the sizes have to match
      this->_indexReadBuffer.assertSizeCoherence();

      bufferFull = this->_indexReadBuffer.size() >= atMost;
    }
    // else: nothing could be read for this document, move on to the next one
  }
}
/// @brief Advances the heap iterator by up to `limit` documents without
///        buffering them.
/// @param limit  maximum number of documents to skip
/// @return number of documents actually skipped; less than `limit` only if
///         the heap iterator ran out of documents
template <bool ordered, bool materialized>
size_t IResearchViewMergeExecutor<ordered, materialized>::skip(size_t limit) {
  TRI_ASSERT(this->_indexReadBuffer.empty());
  TRI_ASSERT(this->_filter != nullptr);

  size_t skipped = 0;

  // the bound is tested first: the heap is never advanced past `limit`
  while (skipped < limit && _heap_it.next()) {
    ++skipped;
  }

  return skipped;
}
/// @brief Writes the buffered document referenced by `bufferEntry` by
///        delegating to the base class `writeRow`.
/// @param ctx          read context forwarded to the base implementation
/// @param bufferEntry  handle of the entry in the index read buffer
/// @return whatever `Base::writeRow` returns
template <bool ordered, bool materialized>
bool IResearchViewMergeExecutor<ordered, materialized>::writeRow(
    IResearchViewMergeExecutor::ReadContext& ctx,
    IResearchViewMergeExecutor::IndexReadBufferEntry bufferEntry) {
  // each buffered value pairs a document id with its collection
  auto const& buffered = this->_indexReadBuffer.getValue(bufferEntry);

  LocalDocumentId const& token = buffered.first;
  TRI_ASSERT(token.isSet());

  LogicalCollection const* source = buffered.second;
  TRI_ASSERT(source);

  return Base::writeRow(ctx, bufferEntry, token, *source);
}
///////////////////////////////////////////////////////////////////////////////
/// --SECTION-- explicit template instantiation
///////////////////////////////////////////////////////////////////////////////
// The member functions of the executor templates are defined in this
// translation unit. The explicit instantiations below cover every
// <ordered, materialized> combination of IResearchViewExecutor and
// IResearchViewMergeExecutor, each together with the matching
// IResearchViewExecutorBase specialisation.

// materialized == true
template class ::arangodb::aql::IResearchViewExecutorBase<::arangodb::aql::IResearchViewExecutor<false, true>>;
template class ::arangodb::aql::IResearchViewExecutorBase<::arangodb::aql::IResearchViewExecutor<true, true>>;
template class ::arangodb::aql::IResearchViewExecutor<false, true>;
template class ::arangodb::aql::IResearchViewExecutor<true, true>;
template class ::arangodb::aql::IResearchViewExecutorBase<::arangodb::aql::IResearchViewMergeExecutor<false, true>>;
template class ::arangodb::aql::IResearchViewExecutorBase<::arangodb::aql::IResearchViewMergeExecutor<true, true>>;
template class ::arangodb::aql::IResearchViewMergeExecutor<false, true>;
template class ::arangodb::aql::IResearchViewMergeExecutor<true, true>;
// materialized == false
template class ::arangodb::aql::IResearchViewExecutorBase<::arangodb::aql::IResearchViewExecutor<false, false>>;
template class ::arangodb::aql::IResearchViewExecutorBase<::arangodb::aql::IResearchViewExecutor<true, false>>;
template class ::arangodb::aql::IResearchViewExecutor<false, false>;
template class ::arangodb::aql::IResearchViewExecutor<true, false>;
template class ::arangodb::aql::IResearchViewExecutorBase<::arangodb::aql::IResearchViewMergeExecutor<false, false>>;
template class ::arangodb::aql::IResearchViewExecutorBase<::arangodb::aql::IResearchViewMergeExecutor<true, false>>;
template class ::arangodb::aql::IResearchViewMergeExecutor<false, false>;
template class ::arangodb::aql::IResearchViewMergeExecutor<true, false>;