1
0
Fork 0

reuse more AqlItemBlocks

This commit is contained in:
jsteemann 2017-02-08 00:59:06 +01:00
parent 59b3967273
commit b76eeee92e
15 changed files with 253 additions and 66 deletions

View File

@ -206,7 +206,10 @@ void AqlItemBlock::destroy() {
}
/// @brief shrink the block to the specified number of rows
void AqlItemBlock::shrink(size_t nrItems) {
/// if sweep is set, then the superfluous rows are cleaned
/// if sweep is not set, the caller has to ensure that the
/// superfluous rows are empty
void AqlItemBlock::shrink(size_t nrItems, bool sweep) {
TRI_ASSERT(nrItems > 0);
if (nrItems == _nrItems) {
@ -220,28 +223,30 @@ void AqlItemBlock::shrink(size_t nrItems) {
"cannot use shrink() to increase block");
}
// erase all stored values in the region that we freed
for (size_t i = nrItems; i < _nrItems; ++i) {
for (RegisterId j = 0; j < _nrRegs; ++j) {
AqlValue& a(_data[_nrRegs * i + j]);
if (sweep) {
// erase all stored values in the region that we freed
for (size_t i = nrItems; i < _nrItems; ++i) {
for (RegisterId j = 0; j < _nrRegs; ++j) {
AqlValue& a(_data[_nrRegs * i + j]);
if (a.requiresDestruction()) {
auto it = _valueCount.find(a);
if (a.requiresDestruction()) {
auto it = _valueCount.find(a);
if (it != _valueCount.end()) {
TRI_ASSERT((*it).second > 0);
if (it != _valueCount.end()) {
TRI_ASSERT((*it).second > 0);
if (--((*it).second) == 0) {
decreaseMemoryUsage(a.memoryUsage());
a.destroy();
try {
_valueCount.erase(it);
} catch (...) {
if (--((*it).second) == 0) {
decreaseMemoryUsage(a.memoryUsage());
a.destroy();
try {
_valueCount.erase(it);
} catch (...) {
}
}
}
}
a.erase();
}
a.erase();
}
}
@ -249,6 +254,39 @@ void AqlItemBlock::shrink(size_t nrItems) {
// adjust the size of the block
_nrItems = nrItems;
_data.resize(_nrItems * _nrRegs);
}
void AqlItemBlock::rescale(size_t nrItems, RegisterId nrRegs) {
TRI_ASSERT(_valueCount.empty());
TRI_ASSERT(nrRegs > 0);
TRI_ASSERT(nrRegs <= ExecutionNode::MaxRegisterId);
size_t const targetSize = nrItems * nrRegs;
size_t const currentSize = _nrItems * _nrRegs;
TRI_ASSERT(currentSize <= _data.size());
if (targetSize > _data.size()) {
increaseMemoryUsage(sizeof(AqlValue) * (targetSize - currentSize));
try {
_data.resize(targetSize);
} catch (...) {
decreaseMemoryUsage(sizeof(AqlValue) * (targetSize - currentSize));
throw;
}
} else if (targetSize < _data.size()) {
decreaseMemoryUsage(sizeof(AqlValue) * (currentSize - targetSize));
try {
_data.resize(targetSize);
} catch (...) {
increaseMemoryUsage(sizeof(AqlValue) * (currentSize - targetSize));
throw;
}
}
TRI_ASSERT(_data.size() >= targetSize);
_nrItems = nrItems;
_nrRegs = nrRegs;
}
/// @brief clears out some columns (registers), this deletes the values if

View File

@ -229,9 +229,19 @@ class AqlItemBlock {
/// @brief getter for _nrItems
inline size_t size() const { return _nrItems; }
inline size_t capacity() const { return _data.size(); }
/// @brief shrink the block to the specified number of rows
void shrink(size_t nrItems);
/// if sweep is set, then the superfluous rows are cleaned
/// if sweep is not set, the caller has to ensure that the
/// superfluous rows are empty
void shrink(size_t nrItems, bool sweep);
/// @brief rescales the block to the specified dimensions
/// note that the block should be empty before rescaling to prevent
/// losses of still managed AqlValues
void rescale(size_t nrItems, RegisterId nrRegs);
/// @brief clears out some columns (registers), this deletes the values if
/// necessary, using the reference count.

View File

@ -28,37 +28,79 @@ using namespace arangodb::aql;
/// @brief create the manager
AqlItemBlockManager::AqlItemBlockManager(ResourceMonitor* resourceMonitor)
: _resourceMonitor(resourceMonitor), _last(nullptr) {}
: _resourceMonitor(resourceMonitor) {}
/// @brief destroy the manager
AqlItemBlockManager::~AqlItemBlockManager() { delete _last; }
AqlItemBlockManager::~AqlItemBlockManager() { }
/// @brief request a block with the specified size
AqlItemBlock* AqlItemBlockManager::requestBlock(size_t nrItems,
RegisterId nrRegs) {
/*
if (_last != nullptr && _last->size() == nrItems &&
_last->getNrRegs() == nrRegs) {
auto block = _last;
// don't hand out the same block next time
_last = nullptr;
block->eraseAll();
// LOG(TRACE) << "requesting AqlItemBlock of " << nrItems << " x " << nrRegs;
size_t const targetSize = nrItems * nrRegs;
return block;
AqlItemBlock* block = nullptr;
size_t i = Bucket::getId(targetSize);
int tries = 0;
while (tries++ < 2) {
TRI_ASSERT(i < NumBuckets);
if (!_buckets[i].empty()) {
block = _buckets[i].pop();
TRI_ASSERT(block != nullptr);
block->eraseAll();
block->rescale(nrItems, nrRegs);
// LOG(TRACE) << "returned cached AqlItemBlock with dimensions " << block->size() << " x " << block->getNrRegs();
break;
}
// try next (bigger) bucket
if (++i >= NumBuckets) {
break;
}
}
*/
return new AqlItemBlock(_resourceMonitor, nrItems, nrRegs);
if (block == nullptr) {
block = new AqlItemBlock(_resourceMonitor, nrItems, nrRegs);
// LOG(TRACE) << "created AqlItemBlock with dimensions " << block->size() << " x " << block->getNrRegs();
}
TRI_ASSERT(block != nullptr);
TRI_ASSERT(block->size() == nrItems);
TRI_ASSERT(block->getNrRegs() == nrRegs);
TRI_ASSERT(block->capacity() >= targetSize);
return block;
}
/// @brief return a block to the manager
void AqlItemBlockManager::returnBlock(AqlItemBlock*& block) {
TRI_ASSERT(block != nullptr);
delete block;
/*
block->destroy();
delete _last;
_last = block;
*/
// LOG(TRACE) << "returning AqlItemBlock of dimensions " << block->size() << " x " << block->getNrRegs();
size_t const targetSize = block->size() * block->getNrRegs();
size_t const i = Bucket::getId(targetSize);
TRI_ASSERT(i < NumBuckets);
if (!_buckets[i].full()) {
// recycle the block
block->destroy();
// store block in bucket
_buckets[i].push(block);
} else {
// bucket is full. simply delete the block
delete block;
}
block = nullptr;
}
AqlItemBlockManager::Bucket::Bucket() {
for (size_t i = 0; i < NumBlocks; ++i) {
blocks[i] = nullptr;
}
}
AqlItemBlockManager::Bucket::~Bucket() {
for (size_t i = 0; i < NumBlocks; ++i) {
delete blocks[i];
}
}

View File

@ -48,13 +48,96 @@ class AqlItemBlockManager {
/// @brief return a block to the manager
void returnBlock(AqlItemBlock*& block);
ResourceMonitor* resourceMonitor() const { return _resourceMonitor; }
private:
ResourceMonitor* _resourceMonitor;
static constexpr size_t NumBuckets = 12;
/// @brief last block handed back to the manager
/// this is the block that may be recycled
AqlItemBlock* _last;
struct Bucket {
static constexpr size_t NumBlocks = 4;
Bucket();
~Bucket();
std::array<AqlItemBlock*, NumBlocks> blocks;
bool empty() const {
return (blocks[0] == nullptr);
}
bool full() const {
return (blocks[NumBlocks - 1] != nullptr);
}
AqlItemBlock* pop() {
TRI_ASSERT(!empty());
size_t i = NumBlocks;
while (i--) {
if (blocks[i] != nullptr) {
AqlItemBlock* result = blocks[i];
blocks[i] = nullptr;
return result;
}
}
return nullptr;
}
void push(AqlItemBlock* block) {
TRI_ASSERT(!full());
for (size_t i = 0; i < NumBlocks; ++i) {
if (blocks[i] == nullptr) {
blocks[i] = block;
return;
}
}
TRI_ASSERT(false);
}
static size_t getId(size_t targetSize) {
if (targetSize <= 1) {
return 0;
}
if (targetSize <= 10) {
return 1;
}
if (targetSize <= 20) {
return 2;
}
if (targetSize <= 40) {
return 3;
}
if (targetSize <= 100) {
return 4;
}
if (targetSize <= 200) {
return 5;
}
if (targetSize <= 400) {
return 6;
}
if (targetSize <= 1000) {
return 7;
}
if (targetSize <= 2000) {
return 8;
}
if (targetSize <= 4000) {
return 9;
}
if (targetSize <= 10000) {
return 10;
}
return 11;
}
};
Bucket _buckets[NumBuckets];
static_assert(sizeof(_buckets) <= 400, "buckets memory usage is unexpectedly high");
};
}
}

View File

@ -148,7 +148,10 @@ int SingletonBlock::getOrSkipSome(size_t, // atLeast,
}
FilterBlock::FilterBlock(ExecutionEngine* engine, FilterNode const* en)
: ExecutionBlock(engine, en), _inReg(ExecutionNode::MaxRegisterId) {
: ExecutionBlock(engine, en),
_inReg(ExecutionNode::MaxRegisterId),
_collector(&engine->_itemBlockManager) {
auto it = en->getRegisterPlan()->varInfo.find(en->_inVariable->id);
TRI_ASSERT(it != en->getRegisterPlan()->varInfo.end());
_inReg = it->second.registerId;
@ -284,7 +287,7 @@ int FilterBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
}
if (!skipping) {
result = _collector.steal(_engine->getQuery()->resourceMonitor());
result = _collector.steal();
}
return TRI_ERROR_NO_ERROR;

View File

@ -25,12 +25,13 @@
#include "BlockCollector.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/ResourceUsage.h"
#include "Aql/AqlItemBlockManager.h"
#include "Basics/Exceptions.h"
using namespace arangodb::aql;
BlockCollector::BlockCollector() : _blocks{_arena}, _totalSize(0) {}
BlockCollector::BlockCollector(AqlItemBlockManager* blockManager)
: _blockManager(blockManager), _blocks{_arena}, _totalSize(0) {}
BlockCollector::~BlockCollector() { clear(); }
@ -45,7 +46,7 @@ RegisterId BlockCollector::nrRegs() const {
void BlockCollector::clear() {
for (auto& it : _blocks) {
it->eraseAll();
delete it;
_blockManager->returnBlock(it);
}
_blocks.clear();
_totalSize = 0;
@ -67,7 +68,7 @@ void BlockCollector::add(AqlItemBlock* block) {
_totalSize += block->size();
}
AqlItemBlock* BlockCollector::steal(ResourceMonitor* resourceMonitor) {
AqlItemBlock* BlockCollector::steal() {
if (_blocks.empty()) {
return nullptr;
}
@ -83,9 +84,10 @@ AqlItemBlock* BlockCollector::steal(ResourceMonitor* resourceMonitor) {
// only got a single result. return it as it is
result = _blocks[0];
} else {
result = AqlItemBlock::concatenate(resourceMonitor, this);
result = AqlItemBlock::concatenate(_blockManager->resourceMonitor(), this);
for (auto& it : _blocks) {
delete it;
it->eraseAll();
_blockManager->returnBlock(it);
}
}

View File

@ -31,7 +31,7 @@
namespace arangodb {
namespace aql {
class AqlItemBlock;
struct ResourceMonitor;
class AqlItemBlockManager;
class BlockCollector {
friend class AqlItemBlock;
@ -40,7 +40,7 @@ class BlockCollector {
BlockCollector(BlockCollector const&) = delete;
BlockCollector& operator=(BlockCollector const&) = delete;
BlockCollector();
explicit BlockCollector(AqlItemBlockManager*);
~BlockCollector();
size_t totalSize() const;
@ -51,9 +51,10 @@ class BlockCollector {
void add(std::unique_ptr<AqlItemBlock> block);
void add(AqlItemBlock* block);
AqlItemBlock* steal(ResourceMonitor*);
AqlItemBlock* steal();
private:
AqlItemBlockManager* _blockManager;
SmallVector<AqlItemBlock*>::allocator_type::arena_type _arena;
SmallVector<AqlItemBlock*> _blocks;
size_t _totalSize;

View File

@ -889,7 +889,7 @@ int DistributeBlock::getOrSkipSomeForShard(size_t atLeast, size_t atMost,
std::deque<std::pair<size_t, size_t>>& buf = _distBuffer.at(clientId);
BlockCollector collector;
BlockCollector collector(&_engine->_itemBlockManager);
if (buf.empty()) {
if (!getBlockForClient(atLeast, atMost, clientId)) {
@ -929,7 +929,7 @@ int DistributeBlock::getOrSkipSomeForShard(size_t atLeast, size_t atMost,
}
if (!skipping) {
result = collector.steal(_engine->getQuery()->resourceMonitor());
result = collector.steal();
}
// _buffer is left intact, deleted and cleared at shutdown

View File

@ -409,7 +409,7 @@ int SortedCollectBlock::getOrSkipSome(size_t atLeast, size_t atMost,
TRI_ASSERT(cur != nullptr);
emitGroup(cur, res.get(), skipped);
++skipped;
res->shrink(skipped);
res->shrink(skipped, false);
} else {
++skipped;
}
@ -448,7 +448,7 @@ int SortedCollectBlock::getOrSkipSome(size_t atLeast, size_t atMost,
if (!skipping) {
TRI_ASSERT(skipped > 0);
res->shrink(skipped);
res->shrink(skipped, false);
}
result = res.release();

View File

@ -122,6 +122,10 @@ std::vector<std::string> Collection::shardKeys() const {
return keys;
}
size_t Collection::numberOfShards() const {
return getCollection()->numberOfShards();
}
/// @brief whether or not the collection uses the default sharding
bool Collection::usesDefaultSharding() const {
return getCollection()->usesDefaultShardKeys();

View File

@ -76,6 +76,8 @@ struct Collection {
/// @brief returns the shard keys of a collection
std::vector<std::string> shardKeys() const;
size_t numberOfShards() const;
/// @brief whether or not the collection uses the default sharding
bool usesDefaultSharding() const;

View File

@ -229,7 +229,7 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
if (send < atMost) {
// The collection did not have enough results
res->shrink(send);
res->shrink(send, false);
}
// Clear out registers no longer needed later:

View File

@ -392,7 +392,7 @@ int ExecutionBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
}
// if _buffer.size() is > 0 then _pos points to a valid place . . .
BlockCollector collector;
BlockCollector collector(&_engine->_itemBlockManager);
while (skipped < atLeast) {
if (_buffer.empty()) {
@ -464,7 +464,7 @@ int ExecutionBlock::getOrSkipSome(size_t atLeast, size_t atMost, bool skipping,
TRI_ASSERT(result == nullptr);
if (!skipping) {
result = collector.steal(_engine->getQuery()->resourceMonitor());
result = collector.steal();
}
return TRI_ERROR_NO_ERROR;

View File

@ -50,8 +50,8 @@ IndexBlock::IndexBlock(ExecutionEngine* engine, IndexNode const* en)
_cursor(nullptr),
_cursors(_indexes.size()),
_condition(en->_condition->root()),
_hasV8Expression(false) {
_hasV8Expression(false),
_collector(&_engine->_itemBlockManager) {
_mmdr.reset(new ManagedDocumentResult);
if (_condition != nullptr) {
@ -498,7 +498,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) {
traceGetSomeBegin();
if (_done) {
traceGetSomeEnd(nullptr);
return _collector.steal(_engine->getQuery()->resourceMonitor());
return _collector.steal();
}
std::unique_ptr<AqlItemBlock> res;
@ -514,7 +514,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) {
if (!ExecutionBlock::getBlock(toFetch, toFetch) || (!initIndexes())) {
_done = true;
traceGetSomeEnd(nullptr);
return _collector.steal(_engine->getQuery()->resourceMonitor());
return _collector.steal();
}
_pos = 0; // this is in the first block
@ -536,7 +536,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) {
if (!ExecutionBlock::getBlock(DefaultBatchSize(), DefaultBatchSize())) {
_done = true;
traceGetSomeEnd(nullptr);
return _collector.steal(_engine->getQuery()->resourceMonitor());
return _collector.steal();
}
_pos = 0; // this is in the first block
}
@ -544,7 +544,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) {
if (!initIndexes()) {
_done = true;
traceGetSomeEnd(nullptr);
return _collector.steal(_engine->getQuery()->resourceMonitor());
return _collector.steal();
}
readIndex(atMost);
}
@ -588,7 +588,7 @@ AqlItemBlock* IndexBlock::getSome(size_t atLeast, size_t atMost) {
TRI_ASSERT(res.get() == nullptr);
if (_collector.totalSize() >= atMost) {
res.reset(_collector.steal(_engine->getQuery()->resourceMonitor()));
res.reset(_collector.steal());
}
}

View File

@ -2406,7 +2406,7 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt, std::unique_ptr<Executi
plan->registerNode(gatherNode);
TRI_ASSERT(remoteNode);
gatherNode->addDependency(remoteNode);
if (!elements.empty()) {
if (!elements.empty() && gatherNode->collection()->numberOfShards() > 1) {
gatherNode->setElements(elements);
}
@ -2810,10 +2810,12 @@ void arangodb::aql::distributeSortToClusterRule(Optimizer* opt,
// then unlink the filter/calculator from the plan
plan->unlinkNode(inspectNode);
// and re-insert into plan in front of the remoteNode
if(thisSortNode->_reinsertInCluster){
if (thisSortNode->_reinsertInCluster){
plan->insertDependency(rn, inspectNode);
}
gatherNode->setElements(thisSortNode->getElements());
if (gatherNode->collection()->numberOfShards() > 1) {
gatherNode->setElements(thisSortNode->getElements());
}
modified = true;
// ready to rumble!
}