mirror of https://gitee.com/bigwinds/arangodb
fix some memleaks (#8432)
This commit is contained in:
parent
5d527168d0
commit
23f7fc1368
|
@ -288,6 +288,28 @@ void AqlItemBlock::shrink(size_t nrItems) {
|
|||
|
||||
decreaseMemoryUsage(sizeof(AqlValue) * (_nrItems - nrItems) * _nrRegs);
|
||||
|
||||
for (size_t i = _nrItems * _nrRegs; i < _data.size(); ++i) {
|
||||
AqlValue& a = _data[i];
|
||||
if (a.requiresDestruction()) {
|
||||
auto it = _valueCount.find(a);
|
||||
|
||||
if (it != _valueCount.end()) {
|
||||
TRI_ASSERT((*it).second > 0);
|
||||
|
||||
if (--((*it).second) == 0) {
|
||||
decreaseMemoryUsage(a.memoryUsage());
|
||||
a.destroy();
|
||||
try {
|
||||
_valueCount.erase(it);
|
||||
continue; // no need for an extra a.erase() here
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
a.erase();
|
||||
}
|
||||
|
||||
// adjust the size of the block
|
||||
_nrItems = nrItems;
|
||||
}
|
||||
|
@ -308,7 +330,7 @@ void AqlItemBlock::rescale(size_t nrItems, RegisterId nrRegs) {
|
|||
// way; because currently, we are tracking the memory we need, instead of the
|
||||
// memory we have.
|
||||
if (targetSize > _data.size()) {
|
||||
_data.resize(targetSize);
|
||||
_data.resize(targetSize);
|
||||
}
|
||||
|
||||
TRI_ASSERT(targetSize <= _data.size());
|
||||
|
|
|
@ -241,6 +241,7 @@ class AqlItemBlock {
|
|||
if (_data[fromRow * _nrRegs + i].requiresDestruction()) {
|
||||
++_valueCount[_data[fromRow * _nrRegs + i]];
|
||||
}
|
||||
TRI_ASSERT(_data[currentRow * _nrRegs + i].isEmpty());
|
||||
_data[currentRow * _nrRegs + i] = _data[fromRow * _nrRegs + i];
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,8 +69,8 @@ class AqlItemBlockShell {
|
|||
public:
|
||||
using SmartAqlItemBlockPtr = std::unique_ptr<AqlItemBlock, AqlItemBlockDeleter>;
|
||||
|
||||
AqlItemBlock const& block() const { return *_block; };
|
||||
AqlItemBlock& block() { return *_block; };
|
||||
inline AqlItemBlock const& block() const { return *_block; }
|
||||
inline AqlItemBlock& block() { return *_block; }
|
||||
|
||||
AqlItemBlockShell(AqlItemBlockManager& manager, std::unique_ptr<AqlItemBlock> block);
|
||||
|
||||
|
@ -89,7 +89,6 @@ class AqlItemBlockShell {
|
|||
SmartAqlItemBlockPtr _block;
|
||||
};
|
||||
|
||||
|
||||
} // namespace aql
|
||||
} // namespace arangodb
|
||||
|
||||
|
|
|
@ -30,9 +30,9 @@ ExecutionState BlockFetcher<passBlocksThrough>::prefetchBlock(size_t atMost) {
|
|||
ExecutionState state;
|
||||
std::unique_ptr<AqlItemBlock> block;
|
||||
std::tie(state, block) = upstreamBlock().getSome(atMost);
|
||||
TRI_IF_FAILURE("ExecutionBlock::getBlock") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
TRI_IF_FAILURE("ExecutionBlock::getBlock") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
if (state == ExecutionState::WAITING) {
|
||||
TRI_ASSERT(block == nullptr);
|
||||
|
@ -56,7 +56,7 @@ ExecutionState BlockFetcher<passBlocksThrough>::prefetchBlock(size_t atMost) {
|
|||
_blockShellPassThroughQueue.push({state, blockShell});
|
||||
}
|
||||
|
||||
_blockShellQueue.push({state, blockShell});
|
||||
_blockShellQueue.push({state, std::move(blockShell)});
|
||||
return ExecutionState::HASMORE;
|
||||
}
|
||||
|
||||
|
@ -74,6 +74,8 @@ BlockFetcher<passBlocksThrough>::fetchBlock(size_t atMost) {
|
|||
TRI_ASSERT(state == ExecutionState::HASMORE);
|
||||
}
|
||||
|
||||
TRI_ASSERT(!_blockShellQueue.empty());
|
||||
|
||||
ExecutionState state;
|
||||
std::shared_ptr<AqlItemBlockShell> blockShell;
|
||||
std::tie(state, blockShell) = _blockShellQueue.front();
|
||||
|
@ -81,7 +83,7 @@ BlockFetcher<passBlocksThrough>::fetchBlock(size_t atMost) {
|
|||
|
||||
//auto inputBlockShell =
|
||||
// std::make_shared<InputAqlItemBlockShell>(blockShell, _inputRegisters);
|
||||
return {state, blockShell};
|
||||
return {state, std::move(blockShell)};
|
||||
}
|
||||
|
||||
template <bool allowBlockPassthrough>
|
||||
|
@ -99,6 +101,8 @@ BlockFetcher<allowBlockPassthrough>::fetchBlockForPassthrough(size_t atMost) {
|
|||
TRI_ASSERT(state == ExecutionState::HASMORE);
|
||||
}
|
||||
|
||||
TRI_ASSERT(!_blockShellPassThroughQueue.empty());
|
||||
|
||||
ExecutionState state;
|
||||
std::shared_ptr<AqlItemBlockShell> blockShell;
|
||||
std::tie(state, blockShell) = _blockShellPassThroughQueue.front();
|
||||
|
|
|
@ -55,13 +55,20 @@ DistinctCollectExecutorInfos::DistinctCollectExecutorInfos(
|
|||
}
|
||||
|
||||
DistinctCollectExecutor::DistinctCollectExecutor(Fetcher& fetcher, Infos& infos)
|
||||
: _infos(infos), _fetcher(fetcher) {
|
||||
_seen = std::make_unique<std::unordered_set<std::vector<AqlValue>, AqlValueGroupHash, AqlValueGroupEqual>>(
|
||||
1024,
|
||||
AqlValueGroupHash(_infos.getTransaction(), _infos.getGroupRegisters().size()),
|
||||
AqlValueGroupEqual(_infos.getTransaction()));
|
||||
};
|
||||
DistinctCollectExecutor::~DistinctCollectExecutor() = default;
|
||||
: _infos(infos), _fetcher(fetcher),
|
||||
_seen(1024,
|
||||
AqlValueGroupHash(_infos.getTransaction(), _infos.getGroupRegisters().size()),
|
||||
AqlValueGroupEqual(_infos.getTransaction())) {
|
||||
}
|
||||
|
||||
DistinctCollectExecutor::~DistinctCollectExecutor() {
|
||||
// destroy all AqlValues captured
|
||||
for (auto& it : _seen) {
|
||||
for (auto& it2 : it) {
|
||||
const_cast<AqlValue*>(&it2)->destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<ExecutionState, NoStats> DistinctCollectExecutor::produceRow(OutputAqlItemRow& output) {
|
||||
TRI_IF_FAILURE("DistinctCollectExecutor::produceRow") {
|
||||
|
@ -95,9 +102,9 @@ std::pair<ExecutionState, NoStats> DistinctCollectExecutor::produceRow(OutputAql
|
|||
}
|
||||
|
||||
// now check if we already know this group
|
||||
auto foundIt = _seen->find(groupValues);
|
||||
auto foundIt = _seen.find(groupValues);
|
||||
|
||||
bool newGroup = foundIt == _seen->end();
|
||||
bool newGroup = foundIt == _seen.end();
|
||||
if (newGroup) {
|
||||
size_t i = 0;
|
||||
|
||||
|
@ -112,7 +119,7 @@ std::pair<ExecutionState, NoStats> DistinctCollectExecutor::produceRow(OutputAql
|
|||
for (auto const& it : groupValues) {
|
||||
copy.emplace_back(it.clone());
|
||||
}
|
||||
_seen->emplace(std::move(copy));
|
||||
_seen.emplace(std::move(copy));
|
||||
}
|
||||
|
||||
// Abort if upstream is done
|
||||
|
|
|
@ -112,7 +112,7 @@ class DistinctCollectExecutor {
|
|||
private:
|
||||
Infos const& _infos;
|
||||
Fetcher& _fetcher;
|
||||
std::unique_ptr<std::unordered_set<std::vector<AqlValue>, AqlValueGroupHash, AqlValueGroupEqual>> _seen;
|
||||
std::unordered_set<std::vector<AqlValue>, AqlValueGroupHash, AqlValueGroupEqual> _seen;
|
||||
};
|
||||
|
||||
} // namespace aql
|
||||
|
|
|
@ -94,7 +94,7 @@ EnumerateCollectionExecutor::EnumerateCollectionExecutor(Fetcher& fetcher, Infos
|
|||
_infos.getProjections(), _infos.getTrxPtr(), _infos.getCoveringIndexAttributePositions(),
|
||||
_allowCoveringIndexOptimization, _infos.getUseRawDocumentPointers()));
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
EnumerateCollectionExecutor::~EnumerateCollectionExecutor() = default;
|
||||
|
||||
|
|
|
@ -65,18 +65,11 @@ ExecutionBlockImpl<Executor>::ExecutionBlockImpl(ExecutionEngine* engine,
|
|||
_rowFetcher(_blockFetcher),
|
||||
_infos(std::move(infos)),
|
||||
_executor(_rowFetcher, _infos),
|
||||
_outputItemRow(nullptr),
|
||||
_outputItemRow(),
|
||||
_query(*engine->getQuery()) {}
|
||||
|
||||
template <class Executor>
|
||||
ExecutionBlockImpl<Executor>::~ExecutionBlockImpl() {
|
||||
if (_outputItemRow) {
|
||||
std::unique_ptr<AqlItemBlock> block = _outputItemRow->stealBlock();
|
||||
if (block != nullptr) {
|
||||
_engine->itemBlockManager().returnBlock(std::move(block));
|
||||
}
|
||||
}
|
||||
}
|
||||
ExecutionBlockImpl<Executor>::~ExecutionBlockImpl() {}
|
||||
|
||||
template <class Executor>
|
||||
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> ExecutionBlockImpl<Executor>::getSome(size_t atMost) {
|
||||
|
@ -136,7 +129,7 @@ ExecutionBlockImpl<Executor>::getSomeWithoutTrace(size_t atMost) {
|
|||
// Count global but executor-specific statistics, like number of filtered
|
||||
// rows.
|
||||
_engine->_stats += executorStats;
|
||||
if (_outputItemRow && _outputItemRow->produced()) {
|
||||
if (_outputItemRow->produced()) {
|
||||
_outputItemRow->advanceRow();
|
||||
}
|
||||
|
||||
|
@ -152,7 +145,7 @@ ExecutionBlockImpl<Executor>::getSomeWithoutTrace(size_t atMost) {
|
|||
auto outputBlock = _outputItemRow->stealBlock();
|
||||
// This is not strictly necessary here, as we shouldn't be called again
|
||||
// after DONE.
|
||||
_outputItemRow.reset(nullptr);
|
||||
_outputItemRow.reset();
|
||||
|
||||
return {state, std::move(outputBlock)};
|
||||
}
|
||||
|
@ -169,7 +162,7 @@ ExecutionBlockImpl<Executor>::getSomeWithoutTrace(size_t atMost) {
|
|||
// we guarantee that we do return a valid pointer in the HASMORE case.
|
||||
TRI_ASSERT(outputBlock != nullptr);
|
||||
// TODO OutputAqlItemRow could get "reset" and "isValid" methods and be reused
|
||||
_outputItemRow.reset(nullptr);
|
||||
_outputItemRow.reset();
|
||||
|
||||
return {state, std::move(outputBlock)};
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ class InputAqlItemRow;
|
|||
|
||||
class ExecutorExpressionContext final : public QueryExpressionContext {
|
||||
public:
|
||||
ExecutorExpressionContext(Query* query, InputAqlItemRow& inputRow,
|
||||
ExecutorExpressionContext(Query* query, InputAqlItemRow const& inputRow,
|
||||
std::vector<Variable const*> const& vars,
|
||||
std::vector<RegisterId> const& regs)
|
||||
: QueryExpressionContext(query), _inputRow(inputRow), _vars(vars), _regs(regs) {}
|
||||
|
@ -52,7 +52,7 @@ class ExecutorExpressionContext final : public QueryExpressionContext {
|
|||
|
||||
private:
|
||||
/// @brief temporary storage for expression data context
|
||||
InputAqlItemRow& _inputRow;
|
||||
InputAqlItemRow const& _inputRow;
|
||||
std::vector<Variable const*> const& _vars;
|
||||
std::vector<RegisterId> const& _regs;
|
||||
};
|
||||
|
|
|
@ -49,10 +49,10 @@ FilterExecutorInfos::FilterExecutorInfos(RegisterId inputRegister, RegisterId nr
|
|||
std::move(registersToClear), std::move(registersToKeep)),
|
||||
_inputRegister(inputRegister) {}
|
||||
|
||||
FilterExecutor::FilterExecutor(Fetcher& fetcher, Infos& infos) : _infos(infos), _fetcher(fetcher){};
|
||||
FilterExecutor::FilterExecutor(Fetcher& fetcher, Infos& infos) : _infos(infos), _fetcher(fetcher) {}
|
||||
FilterExecutor::~FilterExecutor() = default;
|
||||
|
||||
std::pair<ExecutionState, FilterStats> FilterExecutor::produceRow(OutputAqlItemRow &output) {
|
||||
std::pair<ExecutionState, FilterStats> FilterExecutor::produceRow(OutputAqlItemRow& output) {
|
||||
TRI_IF_FAILURE("FilterExecutor::produceRow") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ class FilterExecutorInfos : public ExecutorInfos {
|
|||
FilterExecutorInfos(FilterExecutorInfos const&) = delete;
|
||||
~FilterExecutorInfos() = default;
|
||||
|
||||
RegisterId getInputRegister() const noexcept { return _inputRegister; };
|
||||
RegisterId getInputRegister() const noexcept { return _inputRegister; }
|
||||
|
||||
private:
|
||||
// This is exactly the value in the parent member ExecutorInfo::_inRegs,
|
||||
|
|
|
@ -88,12 +88,12 @@ class InputAqlItemRow {
|
|||
inline AqlValue stealValue(RegisterId registerId) {
|
||||
TRI_ASSERT(isInitialized());
|
||||
TRI_ASSERT(registerId < getNrRegisters());
|
||||
AqlValue a = block().getValueReference(_baseIndex, registerId);
|
||||
AqlValue const& a = block().getValueReference(_baseIndex, registerId);
|
||||
if (!a.isEmpty() && a.requiresDestruction()) {
|
||||
// Now no one is responsible for AqlValue a
|
||||
block().steal(a);
|
||||
}
|
||||
// This cannot fail, caller needs to take immediate owner shops.
|
||||
// This cannot fail, caller needs to take immediate ownership.
|
||||
return a;
|
||||
}
|
||||
|
||||
|
|
|
@ -58,9 +58,10 @@ OutputAqlItemRow::OutputAqlItemRow(
|
|||
TRI_ASSERT(_blockShell != nullptr);
|
||||
}
|
||||
|
||||
void OutputAqlItemRow::doCopyRow(const InputAqlItemRow& sourceRow, bool ignoreMissing) {
|
||||
void OutputAqlItemRow::doCopyRow(InputAqlItemRow const& sourceRow, bool ignoreMissing) {
|
||||
// Note that _lastSourceRow is invalid right after construction. However, when
|
||||
// _baseIndex > 0, then we must have seen one row already.
|
||||
TRI_ASSERT(!_doNotCopyInputRow);
|
||||
TRI_ASSERT(_baseIndex == 0 || _lastSourceRow.isInitialized());
|
||||
bool mustClone = _baseIndex == 0 || _lastSourceRow != sourceRow;
|
||||
|
||||
|
@ -100,7 +101,7 @@ std::unique_ptr<AqlItemBlock> OutputAqlItemRow::stealBlock() {
|
|||
std::unique_ptr<AqlItemBlock> block = blockShell().stealBlockCompat();
|
||||
if (numRowsWritten() == 0) {
|
||||
// blocks may not be empty
|
||||
block.reset(nullptr);
|
||||
block.reset();
|
||||
} else {
|
||||
// numRowsWritten() returns the exact number of rows that were fully
|
||||
// written and takes into account whether the current row was written.
|
||||
|
|
|
@ -53,6 +53,11 @@ class OutputAqlItemRow {
|
|||
std::shared_ptr<std::unordered_set<RegisterId> const> registersToClear,
|
||||
CopyRowBehaviour = CopyRowBehaviour::CopyInputRows);
|
||||
|
||||
OutputAqlItemRow(OutputAqlItemRow const&) = delete;
|
||||
OutputAqlItemRow& operator=(OutputAqlItemRow const&) = delete;
|
||||
OutputAqlItemRow(OutputAqlItemRow&&) = delete;
|
||||
OutputAqlItemRow& operator=(OutputAqlItemRow&&) = delete;
|
||||
|
||||
// Clones the given AqlValue
|
||||
void cloneValueInto(RegisterId registerId, InputAqlItemRow const& sourceRow,
|
||||
AqlValue const& value) {
|
||||
|
@ -116,6 +121,7 @@ class OutputAqlItemRow {
|
|||
#endif
|
||||
_inputRowCopied = true;
|
||||
_lastSourceRow = sourceRow;
|
||||
_lastBaseIndex = _baseIndex;
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -160,7 +166,7 @@ class OutputAqlItemRow {
|
|||
*/
|
||||
std::unique_ptr<AqlItemBlock> stealBlock();
|
||||
|
||||
bool isFull() { return numRowsWritten() >= block().size(); }
|
||||
bool isFull() const { return numRowsWritten() >= block().size(); }
|
||||
|
||||
/**
|
||||
* @brief Returns the number of rows that were fully written.
|
||||
|
@ -206,20 +212,20 @@ class OutputAqlItemRow {
|
|||
}
|
||||
|
||||
private:
|
||||
AqlItemBlockShell& blockShell() { return *_blockShell; }
|
||||
AqlItemBlockShell const& blockShell() const { return *_blockShell; }
|
||||
inline AqlItemBlockShell& blockShell() { return *_blockShell; }
|
||||
inline AqlItemBlockShell const& blockShell() const { return *_blockShell; }
|
||||
|
||||
std::unordered_set<RegisterId> const& outputRegisters() const {
|
||||
return *_outputRegisters;
|
||||
};
|
||||
}
|
||||
|
||||
std::unordered_set<RegisterId> const& registersToKeep() const {
|
||||
return *_registersToKeep;
|
||||
};
|
||||
}
|
||||
|
||||
std::unordered_set<RegisterId> const& registersToClear() const {
|
||||
return *_registersToClear;
|
||||
};
|
||||
}
|
||||
|
||||
bool isOutputRegister(RegisterId registerId) const {
|
||||
return outputRegisters().find(registerId) != outputRegisters().end();
|
||||
|
@ -275,10 +281,10 @@ class OutputAqlItemRow {
|
|||
|
||||
bool allValuesWritten() const {
|
||||
return _numValuesWritten == numRegistersToWrite();
|
||||
};
|
||||
}
|
||||
|
||||
AqlItemBlock const& block() const { return blockShell().block(); }
|
||||
AqlItemBlock& block() { return blockShell().block(); }
|
||||
inline AqlItemBlock const& block() const { return blockShell().block(); }
|
||||
inline AqlItemBlock& block() { return blockShell().block(); }
|
||||
|
||||
void doCopyRow(InputAqlItemRow const& sourceRow, bool ignoreMissing);
|
||||
};
|
||||
|
|
|
@ -143,12 +143,16 @@ std::pair<ExecutionState, NoStats> ShortestPathExecutor::produceRow(OutputAqlIte
|
|||
while (true) {
|
||||
if (_posInPath < _path->length()) {
|
||||
if (_infos.usesOutputRegister(ShortestPathExecutorInfos::VERTEX)) {
|
||||
output.cloneValueInto(_infos.getOutputRegister(ShortestPathExecutorInfos::VERTEX),
|
||||
_input, _path->vertexToAqlValue(_infos.cache(), _posInPath));
|
||||
AqlValue vertex = _path->vertexToAqlValue(_infos.cache(), _posInPath);
|
||||
AqlValueGuard guard{vertex, true};
|
||||
output.moveValueInto(_infos.getOutputRegister(ShortestPathExecutorInfos::VERTEX),
|
||||
_input, guard);
|
||||
}
|
||||
if (_infos.usesOutputRegister(ShortestPathExecutorInfos::EDGE)) {
|
||||
output.cloneValueInto(_infos.getOutputRegister(ShortestPathExecutorInfos::EDGE),
|
||||
_input, _path->edgeToAqlValue(_infos.cache(), _posInPath));
|
||||
AqlValue edge = _path->edgeToAqlValue(_infos.cache(), _posInPath);
|
||||
AqlValueGuard guard{edge, true};
|
||||
output.moveValueInto(_infos.getOutputRegister(ShortestPathExecutorInfos::EDGE),
|
||||
_input, guard);
|
||||
}
|
||||
_posInPath++;
|
||||
return {computeState(), s};
|
||||
|
|
|
@ -174,11 +174,10 @@ bool arangodb::traverser::Traverser::vertexMatchesConditions(arangodb::velocypac
|
|||
if (_opts->vertexHasFilter(depth)) {
|
||||
// We always need to destroy this vertex
|
||||
aql::AqlValue vertex = fetchVertexData(v);
|
||||
aql::AqlValueGuard guard{vertex, true};
|
||||
if (!_opts->evaluateVertexExpression(vertex.slice(), depth)) {
|
||||
vertex.destroy();
|
||||
return false;
|
||||
}
|
||||
vertex.destroy();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -169,7 +169,7 @@ void SupervisedScheduler::shutdown() {
|
|||
break;
|
||||
}
|
||||
|
||||
LOG_TOPIC(ERR, Logger::THREADS)
|
||||
LOG_TOPIC(WARN, Logger::THREADS)
|
||||
<< "Scheduler received shutdown, but there are still tasks on the "
|
||||
<< "queue: jobsSubmitted=" << jobsSubmitted << " jobsDone=" << jobsDone;
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
|
Loading…
Reference in New Issue