diff --git a/arangod/Aql/AqlItemBlock.cpp b/arangod/Aql/AqlItemBlock.cpp index f975a58c81..56d15f7235 100644 --- a/arangod/Aql/AqlItemBlock.cpp +++ b/arangod/Aql/AqlItemBlock.cpp @@ -287,6 +287,28 @@ void AqlItemBlock::shrink(size_t nrItems) { } decreaseMemoryUsage(sizeof(AqlValue) * (_nrItems - nrItems) * _nrRegs); + + for (size_t i = _nrItems * _nrRegs; i < _data.size(); ++i) { + AqlValue& a = _data[i]; + if (a.requiresDestruction()) { + auto it = _valueCount.find(a); + + if (it != _valueCount.end()) { + TRI_ASSERT((*it).second > 0); + + if (--((*it).second) == 0) { + decreaseMemoryUsage(a.memoryUsage()); + a.destroy(); + try { + _valueCount.erase(it); + continue; // no need for an extra a.erase() here + } catch (...) { + } + } + } + } + a.erase(); + } // adjust the size of the block _nrItems = nrItems; @@ -308,7 +330,7 @@ void AqlItemBlock::rescale(size_t nrItems, RegisterId nrRegs) { // way; because currently, we are tracking the memory we need, instead of the // memory we have. if (targetSize > _data.size()) { - _data.resize(targetSize); + _data.resize(targetSize); } TRI_ASSERT(targetSize <= _data.size()); diff --git a/arangod/Aql/AqlItemBlock.h b/arangod/Aql/AqlItemBlock.h index 294d17ad8d..87bb77dcf7 100644 --- a/arangod/Aql/AqlItemBlock.h +++ b/arangod/Aql/AqlItemBlock.h @@ -241,6 +241,7 @@ class AqlItemBlock { if (_data[fromRow * _nrRegs + i].requiresDestruction()) { ++_valueCount[_data[fromRow * _nrRegs + i]]; } + TRI_ASSERT(_data[currentRow * _nrRegs + i].isEmpty()); _data[currentRow * _nrRegs + i] = _data[fromRow * _nrRegs + i]; } } diff --git a/arangod/Aql/AqlItemBlockShell.h b/arangod/Aql/AqlItemBlockShell.h index 37773346ea..1a5883b7c7 100644 --- a/arangod/Aql/AqlItemBlockShell.h +++ b/arangod/Aql/AqlItemBlockShell.h @@ -69,8 +69,8 @@ class AqlItemBlockShell { public: using SmartAqlItemBlockPtr = std::unique_ptr; - AqlItemBlock const& block() const { return *_block; }; - AqlItemBlock& block() { return *_block; }; + inline AqlItemBlock const& block() const { return *_block; } + inline AqlItemBlock& block() { return *_block; } AqlItemBlockShell(AqlItemBlockManager& manager, std::unique_ptr block); @@ -89,7 +89,6 @@ class AqlItemBlockShell { SmartAqlItemBlockPtr _block; }; - } // namespace aql } // namespace arangodb diff --git a/arangod/Aql/BlockFetcher.cpp b/arangod/Aql/BlockFetcher.cpp index 209b04d013..b5bc16f29b 100644 --- a/arangod/Aql/BlockFetcher.cpp +++ b/arangod/Aql/BlockFetcher.cpp @@ -30,9 +30,9 @@ ExecutionState BlockFetcher::prefetchBlock(size_t atMost) { ExecutionState state; std::unique_ptr block; std::tie(state, block) = upstreamBlock().getSome(atMost); - TRI_IF_FAILURE("ExecutionBlock::getBlock") { - THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); - } + TRI_IF_FAILURE("ExecutionBlock::getBlock") { + THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); + } if (state == ExecutionState::WAITING) { TRI_ASSERT(block == nullptr); @@ -56,7 +56,7 @@ ExecutionState BlockFetcher::prefetchBlock(size_t atMost) { _blockShellPassThroughQueue.push({state, blockShell}); } - _blockShellQueue.push({state, blockShell}); + _blockShellQueue.push({state, std::move(blockShell)}); return ExecutionState::HASMORE; } @@ -73,6 +73,8 @@ BlockFetcher::fetchBlock(size_t atMost) { } TRI_ASSERT(state == ExecutionState::HASMORE); } + + TRI_ASSERT(!_blockShellQueue.empty()); ExecutionState state; std::shared_ptr blockShell; @@ -81,7 +83,7 @@ BlockFetcher::fetchBlock(size_t atMost) { //auto inputBlockShell = // std::make_shared(blockShell, _inputRegisters); - return {state, blockShell}; + return {state, std::move(blockShell)}; } template @@ -99,6 +101,8 @@ BlockFetcher::fetchBlockForPassthrough(size_t atMost) { TRI_ASSERT(state == ExecutionState::HASMORE); } + TRI_ASSERT(!_blockShellPassThroughQueue.empty()); + ExecutionState state; std::shared_ptr blockShell; std::tie(state, blockShell) = _blockShellPassThroughQueue.front(); diff --git a/arangod/Aql/DistinctCollectExecutor.cpp b/arangod/Aql/DistinctCollectExecutor.cpp index e25f5e8723..3f9636f3cc 100644 --- a/arangod/Aql/DistinctCollectExecutor.cpp +++ b/arangod/Aql/DistinctCollectExecutor.cpp @@ -55,13 +55,20 @@ DistinctCollectExecutorInfos::DistinctCollectExecutorInfos( } DistinctCollectExecutor::DistinctCollectExecutor(Fetcher& fetcher, Infos& infos) - : _infos(infos), _fetcher(fetcher) { - _seen = std::make_unique, AqlValueGroupHash, AqlValueGroupEqual>>( - 1024, - AqlValueGroupHash(_infos.getTransaction(), _infos.getGroupRegisters().size()), - AqlValueGroupEqual(_infos.getTransaction())); -}; -DistinctCollectExecutor::~DistinctCollectExecutor() = default; + : _infos(infos), _fetcher(fetcher), + _seen(1024, + AqlValueGroupHash(_infos.getTransaction(), _infos.getGroupRegisters().size()), + AqlValueGroupEqual(_infos.getTransaction())) { +} + +DistinctCollectExecutor::~DistinctCollectExecutor() { + // destroy all AqlValues captured + for (auto& it : _seen) { + for (auto& it2 : it) { + const_cast(&it2)->destroy(); + } + } +} std::pair DistinctCollectExecutor::produceRow(OutputAqlItemRow& output) { TRI_IF_FAILURE("DistinctCollectExecutor::produceRow") { @@ -95,9 +102,9 @@ std::pair DistinctCollectExecutor::produceRow(OutputAql } // now check if we already know this group - auto foundIt = _seen->find(groupValues); + auto foundIt = _seen.find(groupValues); - bool newGroup = foundIt == _seen->end(); + bool newGroup = foundIt == _seen.end(); if (newGroup) { size_t i = 0; @@ -112,7 +119,7 @@ std::pair DistinctCollectExecutor::produceRow(OutputAql for (auto const& it : groupValues) { copy.emplace_back(it.clone()); } - _seen->emplace(std::move(copy)); + _seen.emplace(std::move(copy)); } // Abort if upstream is done diff --git a/arangod/Aql/DistinctCollectExecutor.h b/arangod/Aql/DistinctCollectExecutor.h index 814e008f2b..0401b6ddd4 100644 --- a/arangod/Aql/DistinctCollectExecutor.h +++ b/arangod/Aql/DistinctCollectExecutor.h @@ -112,7 +112,7 @@ class DistinctCollectExecutor { private: Infos const& _infos; Fetcher& _fetcher; - std::unique_ptr, AqlValueGroupHash, AqlValueGroupEqual>> _seen; + std::unordered_set, AqlValueGroupHash, AqlValueGroupEqual> _seen; }; } // namespace aql diff --git a/arangod/Aql/EnumerateCollectionExecutor.cpp b/arangod/Aql/EnumerateCollectionExecutor.cpp index 59a91134c1..b0130d32bf 100644 --- a/arangod/Aql/EnumerateCollectionExecutor.cpp +++ b/arangod/Aql/EnumerateCollectionExecutor.cpp @@ -94,7 +94,7 @@ EnumerateCollectionExecutor::EnumerateCollectionExecutor(Fetcher& fetcher, Infos _infos.getProjections(), _infos.getTrxPtr(), _infos.getCoveringIndexAttributePositions(), _allowCoveringIndexOptimization, _infos.getUseRawDocumentPointers())); -}; +} EnumerateCollectionExecutor::~EnumerateCollectionExecutor() = default; diff --git a/arangod/Aql/ExecutionBlockImpl.cpp b/arangod/Aql/ExecutionBlockImpl.cpp index 976315a686..81d53e1e09 100644 --- a/arangod/Aql/ExecutionBlockImpl.cpp +++ b/arangod/Aql/ExecutionBlockImpl.cpp @@ -65,18 +65,11 @@ ExecutionBlockImpl::ExecutionBlockImpl(ExecutionEngine* engine, _rowFetcher(_blockFetcher), _infos(std::move(infos)), _executor(_rowFetcher, _infos), - _outputItemRow(nullptr), + _outputItemRow(), _query(*engine->getQuery()) {} template -ExecutionBlockImpl::~ExecutionBlockImpl() { - if (_outputItemRow) { - std::unique_ptr block = _outputItemRow->stealBlock(); - if (block != nullptr) { - _engine->itemBlockManager().returnBlock(std::move(block)); - } - } -} +ExecutionBlockImpl::~ExecutionBlockImpl() {} template std::pair> ExecutionBlockImpl::getSome(size_t atMost) { @@ -136,7 +129,7 @@ ExecutionBlockImpl::getSomeWithoutTrace(size_t atMost) { // Count global but executor-specific statistics, like number of filtered // rows. _engine->_stats += executorStats; - if (_outputItemRow && _outputItemRow->produced()) { + if (_outputItemRow->produced()) { _outputItemRow->advanceRow(); } @@ -152,7 +145,7 @@ ExecutionBlockImpl::getSomeWithoutTrace(size_t atMost) { auto outputBlock = _outputItemRow->stealBlock(); // This is not strictly necessary here, as we shouldn't be called again // after DONE. - _outputItemRow.reset(nullptr); + _outputItemRow.reset(); return {state, std::move(outputBlock)}; } @@ -169,7 +162,7 @@ ExecutionBlockImpl::getSomeWithoutTrace(size_t atMost) { // we guarantee that we do return a valid pointer in the HASMORE case. TRI_ASSERT(outputBlock != nullptr); // TODO OutputAqlItemRow could get "reset" and "isValid" methods and be reused - _outputItemRow.reset(nullptr); + _outputItemRow.reset(); return {state, std::move(outputBlock)}; } diff --git a/arangod/Aql/ExecutorExpressionContext.h b/arangod/Aql/ExecutorExpressionContext.h index 2d7aac17a8..16871dae97 100644 --- a/arangod/Aql/ExecutorExpressionContext.h +++ b/arangod/Aql/ExecutorExpressionContext.h @@ -34,7 +34,7 @@ class InputAqlItemRow; class ExecutorExpressionContext final : public QueryExpressionContext { public: - ExecutorExpressionContext(Query* query, InputAqlItemRow& inputRow, + ExecutorExpressionContext(Query* query, InputAqlItemRow const& inputRow, std::vector const& vars, std::vector const& regs) : QueryExpressionContext(query), _inputRow(inputRow), _vars(vars), _regs(regs) {} @@ -52,7 +52,7 @@ class ExecutorExpressionContext final : public QueryExpressionContext { private: /// @brief temporary storage for expression data context - InputAqlItemRow& _inputRow; + InputAqlItemRow const& _inputRow; std::vector const& _vars; std::vector const& _regs; }; diff --git a/arangod/Aql/FilterExecutor.cpp b/arangod/Aql/FilterExecutor.cpp index a11575ffe9..327f4673a2 100644 --- a/arangod/Aql/FilterExecutor.cpp +++ b/arangod/Aql/FilterExecutor.cpp @@ -49,10 +49,10 @@ FilterExecutorInfos::FilterExecutorInfos(RegisterId inputRegister, RegisterId nr std::move(registersToClear), std::move(registersToKeep)), _inputRegister(inputRegister) {} -FilterExecutor::FilterExecutor(Fetcher& fetcher, Infos& infos) : _infos(infos), _fetcher(fetcher){}; +FilterExecutor::FilterExecutor(Fetcher& fetcher, Infos& infos) : _infos(infos), _fetcher(fetcher) {} FilterExecutor::~FilterExecutor() = default; -std::pair FilterExecutor::produceRow(OutputAqlItemRow &output) { +std::pair FilterExecutor::produceRow(OutputAqlItemRow& output) { TRI_IF_FAILURE("FilterExecutor::produceRow") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } diff --git a/arangod/Aql/FilterExecutor.h b/arangod/Aql/FilterExecutor.h index 955c650cee..7e65959cab 100644 --- a/arangod/Aql/FilterExecutor.h +++ b/arangod/Aql/FilterExecutor.h @@ -54,7 +54,7 @@ class FilterExecutorInfos : public ExecutorInfos { FilterExecutorInfos(FilterExecutorInfos const&) = delete; ~FilterExecutorInfos() = default; - RegisterId getInputRegister() const noexcept { return _inputRegister; }; + RegisterId getInputRegister() const noexcept { return _inputRegister; } private: // This is exactly the value in the parent member ExecutorInfo::_inRegs, diff --git a/arangod/Aql/InputAqlItemRow.h b/arangod/Aql/InputAqlItemRow.h index 62efafd42a..7464b16939 100644 --- a/arangod/Aql/InputAqlItemRow.h +++ b/arangod/Aql/InputAqlItemRow.h @@ -88,12 +88,12 @@ class InputAqlItemRow { inline AqlValue stealValue(RegisterId registerId) { TRI_ASSERT(isInitialized()); TRI_ASSERT(registerId < getNrRegisters()); - AqlValue a = block().getValueReference(_baseIndex, registerId); + AqlValue const& a = block().getValueReference(_baseIndex, registerId); if (!a.isEmpty() && a.requiresDestruction()) { // Now no one is responsible for AqlValue a block().steal(a); } - // This cannot fail, caller needs to take immediate owner shops. + // This cannot fail, caller needs to take immediate ownership. return a; } diff --git a/arangod/Aql/OutputAqlItemRow.cpp b/arangod/Aql/OutputAqlItemRow.cpp index 6708e128ef..993b93afa2 100644 --- a/arangod/Aql/OutputAqlItemRow.cpp +++ b/arangod/Aql/OutputAqlItemRow.cpp @@ -58,9 +58,10 @@ OutputAqlItemRow::OutputAqlItemRow( TRI_ASSERT(_blockShell != nullptr); } -void OutputAqlItemRow::doCopyRow(const InputAqlItemRow& sourceRow, bool ignoreMissing) { +void OutputAqlItemRow::doCopyRow(InputAqlItemRow const& sourceRow, bool ignoreMissing) { // Note that _lastSourceRow is invalid right after construction. However, when // _baseIndex > 0, then we must have seen one row already. + TRI_ASSERT(!_doNotCopyInputRow); TRI_ASSERT(_baseIndex == 0 || _lastSourceRow.isInitialized()); bool mustClone = _baseIndex == 0 || _lastSourceRow != sourceRow; @@ -100,7 +101,7 @@ std::unique_ptr OutputAqlItemRow::stealBlock() { std::unique_ptr block = blockShell().stealBlockCompat(); if (numRowsWritten() == 0) { // blocks may not be empty - block.reset(nullptr); + block.reset(); } else { // numRowsWritten() returns the exact number of rows that were fully // written and takes into account whether the current row was written. diff --git a/arangod/Aql/OutputAqlItemRow.h b/arangod/Aql/OutputAqlItemRow.h index 655b0367b7..4a5f754361 100644 --- a/arangod/Aql/OutputAqlItemRow.h +++ b/arangod/Aql/OutputAqlItemRow.h @@ -52,6 +52,11 @@ class OutputAqlItemRow { std::shared_ptr const> registersToKeep, std::shared_ptr const> registersToClear, CopyRowBehaviour = CopyRowBehaviour::CopyInputRows); + + OutputAqlItemRow(OutputAqlItemRow const&) = delete; + OutputAqlItemRow& operator=(OutputAqlItemRow const&) = delete; + OutputAqlItemRow(OutputAqlItemRow&&) = delete; + OutputAqlItemRow& operator=(OutputAqlItemRow&&) = delete; // Clones the given AqlValue void cloneValueInto(RegisterId registerId, InputAqlItemRow const& sourceRow, @@ -116,6 +121,7 @@ class OutputAqlItemRow { #endif _inputRowCopied = true; _lastSourceRow = sourceRow; + _lastBaseIndex = _baseIndex; return; } @@ -160,7 +166,7 @@ class OutputAqlItemRow { */ std::unique_ptr stealBlock(); - bool isFull() { return numRowsWritten() >= block().size(); } + bool isFull() const { return numRowsWritten() >= block().size(); } /** * @brief Returns the number of rows that were fully written. @@ -206,20 +212,20 @@ class OutputAqlItemRow { } private: - AqlItemBlockShell& blockShell() { return *_blockShell; } - AqlItemBlockShell const& blockShell() const { return *_blockShell; } + inline AqlItemBlockShell& blockShell() { return *_blockShell; } + inline AqlItemBlockShell const& blockShell() const { return *_blockShell; } std::unordered_set const& outputRegisters() const { return *_outputRegisters; - }; + } std::unordered_set const& registersToKeep() const { return *_registersToKeep; - }; + } std::unordered_set const& registersToClear() const { return *_registersToClear; - }; + } bool isOutputRegister(RegisterId registerId) const { return outputRegisters().find(registerId) != outputRegisters().end(); @@ -275,10 +281,10 @@ class OutputAqlItemRow { bool allValuesWritten() const { return _numValuesWritten == numRegistersToWrite(); - }; + } - AqlItemBlock const& block() const { return blockShell().block(); } - AqlItemBlock& block() { return blockShell().block(); } + inline AqlItemBlock const& block() const { return blockShell().block(); } + inline AqlItemBlock& block() { return blockShell().block(); } void doCopyRow(InputAqlItemRow const& sourceRow, bool ignoreMissing); }; diff --git a/arangod/Aql/ShortestPathExecutor.cpp b/arangod/Aql/ShortestPathExecutor.cpp index 15e3f74e23..a5484fbd3e 100644 --- a/arangod/Aql/ShortestPathExecutor.cpp +++ b/arangod/Aql/ShortestPathExecutor.cpp @@ -143,12 +143,16 @@ std::pair ShortestPathExecutor::produceRow(OutputAqlIte while (true) { if (_posInPath < _path->length()) { if (_infos.usesOutputRegister(ShortestPathExecutorInfos::VERTEX)) { - output.cloneValueInto(_infos.getOutputRegister(ShortestPathExecutorInfos::VERTEX), - _input, _path->vertexToAqlValue(_infos.cache(), _posInPath)); + AqlValue vertex = _path->vertexToAqlValue(_infos.cache(), _posInPath); + AqlValueGuard guard{vertex, true}; + output.moveValueInto(_infos.getOutputRegister(ShortestPathExecutorInfos::VERTEX), + _input, guard); } if (_infos.usesOutputRegister(ShortestPathExecutorInfos::EDGE)) { - output.cloneValueInto(_infos.getOutputRegister(ShortestPathExecutorInfos::EDGE), - _input, _path->edgeToAqlValue(_infos.cache(), _posInPath)); + AqlValue edge = _path->edgeToAqlValue(_infos.cache(), _posInPath); + AqlValueGuard guard{edge, true}; + output.moveValueInto(_infos.getOutputRegister(ShortestPathExecutorInfos::EDGE), + _input, guard); } _posInPath++; return {computeState(), s}; diff --git a/arangod/Graph/Traverser.cpp b/arangod/Graph/Traverser.cpp index ff11d62137..2a0402b532 100644 --- a/arangod/Graph/Traverser.cpp +++ b/arangod/Graph/Traverser.cpp @@ -174,11 +174,10 @@ bool arangodb::traverser::Traverser::vertexMatchesConditions(arangodb::velocypac if (_opts->vertexHasFilter(depth)) { // We always need to destroy this vertex aql::AqlValue vertex = fetchVertexData(v); + aql::AqlValueGuard guard{vertex, true}; if (!_opts->evaluateVertexExpression(vertex.slice(), depth)) { - vertex.destroy(); return false; } - vertex.destroy(); } return true; } diff --git a/arangod/Scheduler/SupervisedScheduler.cpp b/arangod/Scheduler/SupervisedScheduler.cpp index c32067627b..e8ce330085 100644 --- a/arangod/Scheduler/SupervisedScheduler.cpp +++ b/arangod/Scheduler/SupervisedScheduler.cpp @@ -169,7 +169,7 @@ void SupervisedScheduler::shutdown() { break; } - LOG_TOPIC(ERR, Logger::THREADS) + LOG_TOPIC(WARN, Logger::THREADS) << "Scheduler received shutdown, but there are still tasks on the " << "queue: jobsSubmitted=" << jobsSubmitted << " jobsDone=" << jobsDone; std::this_thread::sleep_for(std::chrono::seconds(1));