From b76eeee92e122860193922541243e99b5c71f013 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Wed, 8 Feb 2017 00:59:06 +0100 Subject: [PATCH] reuse more AqlItemBlocks --- arangod/Aql/AqlItemBlock.cpp | 70 ++++++++++++++----- arangod/Aql/AqlItemBlock.h | 12 +++- arangod/Aql/AqlItemBlockManager.cpp | 78 ++++++++++++++++----- arangod/Aql/AqlItemBlockManager.h | 89 +++++++++++++++++++++++- arangod/Aql/BasicBlocks.cpp | 7 +- arangod/Aql/BlockCollector.cpp | 14 ++-- arangod/Aql/BlockCollector.h | 7 +- arangod/Aql/ClusterBlocks.cpp | 4 +- arangod/Aql/CollectBlock.cpp | 4 +- arangod/Aql/Collection.cpp | 4 ++ arangod/Aql/Collection.h | 2 + arangod/Aql/EnumerateCollectionBlock.cpp | 2 +- arangod/Aql/ExecutionBlock.cpp | 4 +- arangod/Aql/IndexBlock.cpp | 14 ++-- arangod/Aql/OptimizerRules.cpp | 8 ++- 15 files changed, 253 insertions(+), 66 deletions(-) diff --git a/arangod/Aql/AqlItemBlock.cpp b/arangod/Aql/AqlItemBlock.cpp index 7bcdb831ce..94277a2dec 100644 --- a/arangod/Aql/AqlItemBlock.cpp +++ b/arangod/Aql/AqlItemBlock.cpp @@ -206,7 +206,10 @@ void AqlItemBlock::destroy() { } /// @brief shrink the block to the specified number of rows -void AqlItemBlock::shrink(size_t nrItems) { +/// if sweep is set, then the superfluous rows are cleaned +/// if sweep is not set, the caller has to ensure that the +/// superfluous rows are empty +void AqlItemBlock::shrink(size_t nrItems, bool sweep) { TRI_ASSERT(nrItems > 0); if (nrItems == _nrItems) { @@ -220,28 +223,30 @@ void AqlItemBlock::shrink(size_t nrItems) { "cannot use shrink() to increase block"); } - // erase all stored values in the region that we freed - for (size_t i = nrItems; i < _nrItems; ++i) { - for (RegisterId j = 0; j < _nrRegs; ++j) { - AqlValue& a(_data[_nrRegs * i + j]); + if (sweep) { + // erase all stored values in the region that we freed + for (size_t i = nrItems; i < _nrItems; ++i) { + for (RegisterId j = 0; j < _nrRegs; ++j) { + AqlValue& a(_data[_nrRegs * i + j]); - if (a.requiresDestruction()) { - auto it = _valueCount.find(a); + if (a.requiresDestruction()) { + auto it = _valueCount.find(a); - if (it != _valueCount.end()) { - TRI_ASSERT((*it).second > 0); + if (it != _valueCount.end()) { + TRI_ASSERT((*it).second > 0); - if (--((*it).second) == 0) { - decreaseMemoryUsage(a.memoryUsage()); - a.destroy(); - try { - _valueCount.erase(it); - } catch (...) { + if (--((*it).second) == 0) { + decreaseMemoryUsage(a.memoryUsage()); + a.destroy(); + try { + _valueCount.erase(it); + } catch (...) { + } } } } + a.erase(); } - a.erase(); } } @@ -249,6 +254,39 @@ void AqlItemBlock::shrink(size_t nrItems) { // adjust the size of the block _nrItems = nrItems; + _data.resize(_nrItems * _nrRegs); +} + +void AqlItemBlock::rescale(size_t nrItems, RegisterId nrRegs) { + TRI_ASSERT(_valueCount.empty()); + TRI_ASSERT(nrRegs > 0); + TRI_ASSERT(nrRegs <= ExecutionNode::MaxRegisterId); + + size_t const targetSize = nrItems * nrRegs; + size_t const currentSize = _nrItems * _nrRegs; + TRI_ASSERT(currentSize <= _data.size()); + + if (targetSize > _data.size()) { + increaseMemoryUsage(sizeof(AqlValue) * (targetSize - currentSize)); + try { + _data.resize(targetSize); + } catch (...) { + decreaseMemoryUsage(sizeof(AqlValue) * (targetSize - currentSize)); + throw; + } + } else if (targetSize < _data.size()) { + decreaseMemoryUsage(sizeof(AqlValue) * (currentSize - targetSize)); + try { + _data.resize(targetSize); + } catch (...) { + increaseMemoryUsage(sizeof(AqlValue) * (currentSize - targetSize)); + throw; + } + } + + TRI_ASSERT(_data.size() >= targetSize); + _nrItems = nrItems; + _nrRegs = nrRegs; } /// @brief clears out some columns (registers), this deletes the values if diff --git a/arangod/Aql/AqlItemBlock.h b/arangod/Aql/AqlItemBlock.h index 5baf89b091..5e7e1b59ce 100644 --- a/arangod/Aql/AqlItemBlock.h +++ b/arangod/Aql/AqlItemBlock.h @@ -229,9 +229,19 @@ class AqlItemBlock { /// @brief getter for _nrItems inline size_t size() const { return _nrItems; } + + inline size_t capacity() const { return _data.size(); } /// @brief shrink the block to the specified number of rows - void shrink(size_t nrItems); + /// if sweep is set, then the superfluous rows are cleaned + /// if sweep is not set, the caller has to ensure that the + /// superfluous rows are empty + void shrink(size_t nrItems, bool sweep); + + /// @brief rescales the block to the specified dimensions + /// note that the block should be empty before rescaling to prevent + /// losses of still managed AqlValues + void rescale(size_t nrItems, RegisterId nrRegs); /// @brief clears out some columns (registers), this deletes the values if /// necessary, using the reference count. diff --git a/arangod/Aql/AqlItemBlockManager.cpp b/arangod/Aql/AqlItemBlockManager.cpp index 614bc29dcb..ff37d9b397 100644 --- a/arangod/Aql/AqlItemBlockManager.cpp +++ b/arangod/Aql/AqlItemBlockManager.cpp @@ -28,37 +28,79 @@ using namespace arangodb::aql; /// @brief create the manager AqlItemBlockManager::AqlItemBlockManager(ResourceMonitor* resourceMonitor) - : _resourceMonitor(resourceMonitor), _last(nullptr) {} + : _resourceMonitor(resourceMonitor) {} /// @brief destroy the manager -AqlItemBlockManager::~AqlItemBlockManager() { delete _last; } +AqlItemBlockManager::~AqlItemBlockManager() { } /// @brief request a block with the specified size AqlItemBlock* AqlItemBlockManager::requestBlock(size_t nrItems, RegisterId nrRegs) { -/* - if (_last != nullptr && _last->size() == nrItems && - _last->getNrRegs() == nrRegs) { - auto block = _last; - // don't hand out the same block next time - _last = nullptr; - block->eraseAll(); + // LOG(TRACE) << "requesting AqlItemBlock of " << nrItems << " x " << nrRegs; + size_t const targetSize = nrItems * nrRegs; - return block; + AqlItemBlock* block = nullptr; + size_t i = Bucket::getId(targetSize); + + int tries = 0; + while (tries++ < 2) { + TRI_ASSERT(i < NumBuckets); + if (!_buckets[i].empty()) { + block = _buckets[i].pop(); + TRI_ASSERT(block != nullptr); + block->eraseAll(); + block->rescale(nrItems, nrRegs); + // LOG(TRACE) << "returned cached AqlItemBlock with dimensions " << block->size() << " x " << block->getNrRegs(); + break; + } + // try next (bigger) bucket + if (++i >= NumBuckets) { + break; + } } -*/ - return new AqlItemBlock(_resourceMonitor, nrItems, nrRegs); + + if (block == nullptr) { + block = new AqlItemBlock(_resourceMonitor, nrItems, nrRegs); + // LOG(TRACE) << "created AqlItemBlock with dimensions " << block->size() << " x " << block->getNrRegs(); + } + + TRI_ASSERT(block != nullptr); + TRI_ASSERT(block->size() == nrItems); + TRI_ASSERT(block->getNrRegs() == nrRegs); + TRI_ASSERT(block->capacity() >= targetSize); + return block; } /// @brief return a block to the manager void AqlItemBlockManager::returnBlock(AqlItemBlock*& block) { TRI_ASSERT(block != nullptr); - delete block; - /* - block->destroy(); - delete _last; - _last = block; - */ + // LOG(TRACE) << "returning AqlItemBlock of dimensions " << block->size() << " x " << block->getNrRegs(); + + size_t const targetSize = block->size() * block->getNrRegs(); + size_t const i = Bucket::getId(targetSize); + TRI_ASSERT(i < NumBuckets); + + if (!_buckets[i].full()) { + // recycle the block + block->destroy(); + // store block in bucket + _buckets[i].push(block); + } else { + // bucket is full. simply delete the block + delete block; + } block = nullptr; } + +AqlItemBlockManager::Bucket::Bucket() { + for (size_t i = 0; i < NumBlocks; ++i) { + blocks[i] = nullptr; + } +} + +AqlItemBlockManager::Bucket::~Bucket() { + for (size_t i = 0; i < NumBlocks; ++i) { + delete blocks[i]; + } +} diff --git a/arangod/Aql/AqlItemBlockManager.h b/arangod/Aql/AqlItemBlockManager.h index db5078a9e2..c1e553092f 100644 --- a/arangod/Aql/AqlItemBlockManager.h +++ b/arangod/Aql/AqlItemBlockManager.h @@ -48,13 +48,96 @@ class AqlItemBlockManager { /// @brief return a block to the manager void returnBlock(AqlItemBlock*& block); + ResourceMonitor* resourceMonitor() const { return _resourceMonitor; } + private: ResourceMonitor* _resourceMonitor; + + static constexpr size_t NumBuckets = 12; - /// @brief last block handed back to the manager - /// this is the block that may be recycled - AqlItemBlock* _last; + struct Bucket { + static constexpr size_t NumBlocks = 4; + + Bucket(); + ~Bucket(); + + std::array blocks; + + bool empty() const { + return (blocks[0] == nullptr); + } + + bool full() const { + return (blocks[NumBlocks - 1] != nullptr); + } + + AqlItemBlock* pop() { + TRI_ASSERT(!empty()); + size_t i = NumBlocks; + while (i--) { + if (blocks[i] != nullptr) { + AqlItemBlock* result = blocks[i]; + blocks[i] = nullptr; + return result; + } + } + return nullptr; + } + + void push(AqlItemBlock* block) { + TRI_ASSERT(!full()); + for (size_t i = 0; i < NumBlocks; ++i) { + if (blocks[i] == nullptr) { + blocks[i] = block; + return; + } + } + TRI_ASSERT(false); + } + + static size_t getId(size_t targetSize) { + if (targetSize <= 1) { + return 0; + } + if (targetSize <= 10) { + return 1; + } + if (targetSize <= 20) { + return 2; + } + if (targetSize <= 40) { + return 3; + } + if (targetSize <= 100) { + return 4; + } + if (targetSize <= 200) { + return 5; + } + if (targetSize <= 400) { + return 6; + } + if (targetSize <= 1000) { + return 7; + } + if (targetSize <= 2000) { + return 8; + } + if (targetSize <= 4000) { + return 9; + } + if (targetSize <= 10000) { + return 10; + } + return 11; + } + }; + + Bucket _buckets[NumBuckets]; + + static_assert(sizeof(_buckets) <= 400, "buckets memory usage is unexpectedly high"); }; + } } diff --git a/arangod/Aql/BasicBlocks.cpp b/arangod/Aql/BasicBlocks.cpp index bb185149bc..e4ba786c72 100644 --- a/arangod/Aql/BasicBlocks.cpp +++ b/arangod/Aql/BasicBlocks.cpp @@ -148,7 +148,10 @@ int SingletonBlock::getOrSkipSome(size_t, // atLeast, } FilterBlock::FilterBlock(ExecutionEngine* engine, FilterNode const* en) - : ExecutionBlock(engine, en), _inReg(ExecutionNode::MaxRegisterId) { + : ExecutionBlock(engine, en), + _inReg(ExecutionNode::MaxRegisterId), + _collector(&engine->_itemBlockManager) { + auto it = en->getRegisterPlan()->varInfo.find(en->_inVariable->id); TRI_ASSERT(it != en->getRegisterPlan()->varInfo.end()); _inReg = it->second.registerId; @@ -284,7 +287,7 @@ int FilterBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping, } if (!skipping) { - result = _collector.steal(_engine->getQuery()->resourceMonitor()); + result = _collector.steal(); } return TRI_ERROR_NO_ERROR; diff --git a/arangod/Aql/BlockCollector.cpp b/arangod/Aql/BlockCollector.cpp index 97e529fe33..a44b801827 100644 --- a/arangod/Aql/BlockCollector.cpp +++ b/arangod/Aql/BlockCollector.cpp @@ -25,12 +25,13 @@ #include "BlockCollector.h" #include "Aql/AqlItemBlock.h" -#include "Aql/ResourceUsage.h" +#include "Aql/AqlItemBlockManager.h" #include "Basics/Exceptions.h" using namespace arangodb::aql; -BlockCollector::BlockCollector() : _blocks{_arena}, _totalSize(0) {} +BlockCollector::BlockCollector(AqlItemBlockManager* blockManager) + : _blockManager(blockManager), _blocks{_arena}, _totalSize(0) {} BlockCollector::~BlockCollector() { clear(); } @@ -45,7 +46,7 @@ RegisterId BlockCollector::nrRegs() const { void BlockCollector::clear() { for (auto& it : _blocks) { it->eraseAll(); - delete it; + _blockManager->returnBlock(it); } _blocks.clear(); _totalSize = 0; @@ -67,7 +68,7 @@ void BlockCollector::add(AqlItemBlock* block) { _totalSize += block->size(); } -AqlItemBlock* BlockCollector::steal(ResourceMonitor* resourceMonitor) { +AqlItemBlock* BlockCollector::steal() { if (_blocks.empty()) { return nullptr; } @@ -83,9 +84,10 @@ AqlItemBlock* BlockCollector::steal(ResourceMonitor* resourceMonitor) { // only got a single result. return it as it is result = _blocks[0]; } else { - result = AqlItemBlock::concatenate(resourceMonitor, this); + result = AqlItemBlock::concatenate(_blockManager->resourceMonitor(), this); for (auto& it : _blocks) { - delete it; + it->eraseAll(); + _blockManager->returnBlock(it); } } diff --git a/arangod/Aql/BlockCollector.h b/arangod/Aql/BlockCollector.h index 40e79a7163..26ff18b308 100644 --- a/arangod/Aql/BlockCollector.h +++ b/arangod/Aql/BlockCollector.h @@ -31,7 +31,7 @@ namespace arangodb { namespace aql { class AqlItemBlock; -struct ResourceMonitor; +class AqlItemBlockManager; class BlockCollector { friend class AqlItemBlock; @@ -40,7 +40,7 @@ class BlockCollector { BlockCollector(BlockCollector const&) = delete; BlockCollector& operator=(BlockCollector const&) = delete; - BlockCollector(); + explicit BlockCollector(AqlItemBlockManager*); ~BlockCollector(); size_t totalSize() const; @@ -51,9 +51,10 @@ class BlockCollector { void add(std::unique_ptr block); void add(AqlItemBlock* block); - AqlItemBlock* steal(ResourceMonitor*); + AqlItemBlock* steal(); private: + AqlItemBlockManager* _blockManager; SmallVector::allocator_type::arena_type _arena; SmallVector _blocks; size_t _totalSize; diff --git a/arangod/Aql/ClusterBlocks.cpp b/arangod/Aql/ClusterBlocks.cpp index 9a332dc299..78a746b882 100644 --- a/arangod/Aql/ClusterBlocks.cpp +++ b/arangod/Aql/ClusterBlocks.cpp @@ -889,7 +889,7 @@ int DistributeBlock::getOrSkipSomeForShard(size_t atLeast, size_t atMost, std::deque>& buf = _distBuffer.at(clientId); - BlockCollector collector; + BlockCollector collector(&_engine->_itemBlockManager); if (buf.empty()) { if (!getBlockForClient(atLeast, atMost, clientId)) { @@ -929,7 +929,7 @@ int DistributeBlock::getOrSkipSomeForShard(size_t atLeast, size_t atMost, } if (!skipping) { - result = collector.steal(_engine->getQuery()->resourceMonitor()); + result = collector.steal(); } // _buffer is left intact, deleted and cleared at shutdown diff --git a/arangod/Aql/CollectBlock.cpp b/arangod/Aql/CollectBlock.cpp index f085fb03d3..6e2de92888 100644 --- a/arangod/Aql/CollectBlock.cpp +++ b/arangod/Aql/CollectBlock.cpp @@ -409,7 +409,7 @@ int SortedCollectBlock::getOrSkipSome(size_t atLeast, size_t atMost, TRI_ASSERT(cur != nullptr); emitGroup(cur, res.get(), skipped); ++skipped; - res->shrink(skipped); + res->shrink(skipped, false); } else { ++skipped; } @@ -448,7 +448,7 @@ int SortedCollectBlock::getOrSkipSome(size_t atLeast, size_t atMost, if (!skipping) { TRI_ASSERT(skipped > 0); - res->shrink(skipped); + res->shrink(skipped, false); } result = res.release(); diff --git a/arangod/Aql/Collection.cpp b/arangod/Aql/Collection.cpp index b8efb483a4..797d732092 100644 --- a/arangod/Aql/Collection.cpp +++ b/arangod/Aql/Collection.cpp @@ -122,6 +122,10 @@ std::vector Collection::shardKeys() const { return keys; } +size_t Collection::numberOfShards() const { + return getCollection()->numberOfShards(); +} + /// @brief whether or not the collection uses the default sharding bool Collection::usesDefaultSharding() const { return getCollection()->usesDefaultShardKeys(); diff --git a/arangod/Aql/Collection.h b/arangod/Aql/Collection.h index 19c7acac6f..7f2487bec5 100644 --- a/arangod/Aql/Collection.h +++ b/arangod/Aql/Collection.h @@ -76,6 +76,8 @@ struct Collection { /// @brief returns the shard keys of a collection std::vector shardKeys() const; + + size_t numberOfShards() const; /// @brief whether or not the collection uses the default sharding bool usesDefaultSharding() const; diff --git a/arangod/Aql/EnumerateCollectionBlock.cpp b/arangod/Aql/EnumerateCollectionBlock.cpp index 1172c22fe8..32c34924f5 100644 --- a/arangod/Aql/EnumerateCollectionBlock.cpp +++ b/arangod/Aql/EnumerateCollectionBlock.cpp @@ -229,7 +229,7 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast, if (send < atMost) { // The collection did not have enough results - res->shrink(send); + res->shrink(send, false); } // Clear out registers no longer needed later: diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index 871c25819e..6f3c1e49ec 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -392,7 +392,7 @@ int ExecutionBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping, } // if _buffer.size() is > 0 then _pos points to a valid place . . . - BlockCollector collector; + BlockCollector collector(&_engine->_itemBlockManager); while (skipped < atLeast) { if (_buffer.empty()) { @@ -464,7 +464,7 @@ int ExecutionBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping, TRI_ASSERT(result == nullptr); if (!skipping) { - result = collector.steal(_engine->getQuery()->resourceMonitor()); + result = collector.steal(); } return TRI_ERROR_NO_ERROR; diff --git a/arangod/Aql/IndexBlock.cpp b/arangod/Aql/IndexBlock.cpp index cc6828434a..ad71cb6f85 100644 --- a/arangod/Aql/IndexBlock.cpp +++ b/arangod/Aql/IndexBlock.cpp @@ -50,8 +50,8 @@ IndexBlock::IndexBlock(ExecutionEngine* engine, IndexNode const* en) _cursor(nullptr), _cursors(_indexes.size()), _condition(en->_condition->root()), - _hasV8Expression(false) { - + _hasV8Expression(false), + _collector(&_engine->_itemBlockManager) { _mmdr.reset(new ManagedDocumentResult); if (_condition != nullptr) { @@ -498,7 +498,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) { traceGetSomeBegin(); if (_done) { traceGetSomeEnd(nullptr); - return _collector.steal(_engine->getQuery()->resourceMonitor()); + return _collector.steal(); } std::unique_ptr res; @@ -514,7 +514,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) { if (!ExecutionBlock::getBlock(toFetch, toFetch) || (!initIndexes())) { _done = true; traceGetSomeEnd(nullptr); - return _collector.steal(_engine->getQuery()->resourceMonitor()); + return _collector.steal(); } _pos = 0; // this is in the first block @@ -536,7 +536,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) { if (!ExecutionBlock::getBlock(DefaultBatchSize(), DefaultBatchSize())) { _done = true; traceGetSomeEnd(nullptr); - return _collector.steal(_engine->getQuery()->resourceMonitor()); + return _collector.steal(); } _pos = 0; // this is in the first block } @@ -544,7 +544,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) { if (!initIndexes()) { _done = true; traceGetSomeEnd(nullptr); - return _collector.steal(_engine->getQuery()->resourceMonitor()); + return _collector.steal(); } readIndex(atMost); } @@ -588,7 +588,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) { TRI_ASSERT(res.get() == nullptr); if (_collector.totalSize() >= atMost) { - res.reset(_collector.steal(_engine->getQuery()->resourceMonitor())); + res.reset(_collector.steal()); } } diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index 63f710f0bb..08f740430d 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -2406,7 +2406,7 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt, std::unique_ptrregisterNode(gatherNode); TRI_ASSERT(remoteNode); gatherNode->addDependency(remoteNode); - if (!elements.empty()) { + if (!elements.empty() && gatherNode->collection()->numberOfShards() > 1) { gatherNode->setElements(elements); } @@ -2810,10 +2810,12 @@ void arangodb::aql::distributeSortToClusterRule(Optimizer* opt, // then unlink the filter/calculator from the plan plan->unlinkNode(inspectNode); // and re-insert into plan in front of the remoteNode - if(thisSortNode->_reinsertInCluster){ + if (thisSortNode->_reinsertInCluster){ plan->insertDependency(rn, inspectNode); } - gatherNode->setElements(thisSortNode->getElements()); + if (gatherNode->collection()->numberOfShards() > 1) { + gatherNode->setElements(thisSortNode->getElements()); + } modified = true; // ready to rumble! }