1
0
Fork 0

Performance improvement on Sort (#9198)

* Modified AqlItemMatrix to use two uint32 values to identify for in Block instead of one uint64 value encoding the two and calculation. Also now throw an error as soon as we exceed the limit there

* Removed new error code, thanks jsteemann for pointing out.
This commit is contained in:
Michael Hackstein 2019-06-05 12:45:17 +02:00 committed by GitHub
parent d07ee8f7cf
commit 8a4d0bc068
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 98 additions and 132 deletions

View File

@ -25,7 +25,6 @@
#include <lib/Logger/Logger.h>
#include "Aql/AqlItemBlock.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/types.h"
#include "Basics/Common.h"
@ -40,6 +39,11 @@ namespace aql {
*/
class AqlItemMatrix {
public:
// uint32_t in this vector is a reasonable trade-off between performance and amount of data.
// With this values we can sort up to ~ 4.000.000.000 times 1000 elements in memory.
// Anything beyond that has a questionable runtime on nowadays hardware anyways.
typedef std::pair<uint32_t, uint32_t> RowIndex;
explicit AqlItemMatrix(size_t nrRegs) : _size(0), _nrRegs(nrRegs) {}
~AqlItemMatrix() = default;
@ -50,9 +54,22 @@ class AqlItemMatrix {
*/
void addBlock(SharedAqlItemBlockPtr blockPtr) {
TRI_ASSERT(blockPtr->getNrRegs() == getNrRegisters());
size_t blockSize = blockPtr->size();
_blocks.emplace_back(_size, std::move(blockPtr));
_size += blockSize;
// Test if we have more than uint32_t many blocks
if (ADB_UNLIKELY(_blocks.size() == std::numeric_limits<uint32_t>::max())) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_RESOURCE_LIMIT,
"Reaching the limit of AqlItems to SORT, please consider using a "
"limit after sorting.");
}
// Test if we have more than uint32_t many rows within a block
if (ADB_UNLIKELY(blockPtr->size() > std::numeric_limits<uint32_t>::max())) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_RESOURCE_LIMIT,
"Reaching the limit of AqlItems to SORT, please consider lowering "
"the batch size.");
}
_size += blockPtr->size();
_blocks.emplace_back(std::move(blockPtr));
}
/**
@ -60,19 +77,34 @@ class AqlItemMatrix {
*
* @return Number of Rows
*/
size_t size() const { return _size; }
uint64_t size() const noexcept { return _size; }
/**
* @brief Number of registers, i.e. width of the matrix.
*/
size_t getNrRegisters() const { return _nrRegs; };
size_t getNrRegisters() const noexcept { return _nrRegs; };
/**
* @brief Test if this matrix is empty
*
* @return True if empty
*/
bool empty() const { return _blocks.empty(); }
bool empty() const noexcept { return _blocks.empty(); }
std::vector<RowIndex> produceRowIndexes() const {
std::vector<RowIndex> result;
if (!empty()) {
result.reserve(size());
uint32_t index = 0;
for (auto const& block : _blocks) {
for (uint32_t row = 0; row < block->size(); ++row) {
result.emplace_back(index, row);
}
++index;
}
}
return result;
}
/**
* @brief Get the AqlItemRow at the given index
@ -81,97 +113,22 @@ class AqlItemMatrix {
*
* @return A single row in the Matrix
*/
InputAqlItemRow getRow(size_t index) const {
TRI_ASSERT(index < _size);
TRI_ASSERT(!empty());
// Most blocks will have DefaultBatchSize
size_t mostLikelyIndex =
(std::min)(static_cast<size_t>(floor(index / ExecutionBlock::DefaultBatchSize())),
_blocks.size() - 1);
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
size_t iterations = 0;
#endif
size_t minIndex = 0;
size_t maxIndex = _blocks.size() - 1;
// Under the assumption that all but the last block will have the
// DefaultBatchSize size, this algorithm will hit the correct block in the
// first attempt.
// As a fallback, it does a binary search on the blocks.
while (true) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
++iterations;
TRI_ASSERT(iterations < _blocks.size() * 2);
#endif
TRI_ASSERT(minIndex <= maxIndex);
TRI_ASSERT(maxIndex < _blocks.size());
TRI_ASSERT(mostLikelyIndex <= maxIndex);
TRI_ASSERT(mostLikelyIndex >= minIndex);
auto& candidate = _blocks[mostLikelyIndex];
TRI_ASSERT(candidate.second != nullptr);
if (index < candidate.first) {
// This block starts after the requested index, go left.
// Assert that there is a left to go to. This could only go wrong if the
// candidate.first values are wrong.
TRI_ASSERT(mostLikelyIndex > 0);
// To assure yourself of the correctness, remember that / rounds down and
// 0 <= mostLikelyIndex / 2 < mostLikelyIndex
// Narrow down upper Border
maxIndex = mostLikelyIndex;
mostLikelyIndex = minIndex + (mostLikelyIndex - minIndex) / 2;
} else if (index >= candidate.first + candidate.second->size()) {
minIndex = mostLikelyIndex;
// This block ends before the requested index, go right.
// Assert that there is a right to go to. This could only go wrong if
// the candidate.first values are wrong.
TRI_ASSERT(mostLikelyIndex < _blocks.size() - 1);
// To assure yourself of the correctness, remember that / rounds down,
// numBlocksRightFromHere >= 1 (see assert above) and
// 0
// <= numBlocksRightFromHere / 2
// < numBlocksRightFromHere
// . Therefore
// mostLikelyIndex
// < mostLikelyIndex + 1 + numBlocksRightFromHere / 2
// < _blocks.size()
// .
//
// Please do not bother to shorten the following expression, the
// compilers are perfectly capable of doing that, and here the
// correctness is easier to see.
size_t const numBlocksRightFromHere = maxIndex - mostLikelyIndex;
mostLikelyIndex += 1 + numBlocksRightFromHere / 2;
} else {
#if 0
LOG_TOPIC_IF("c8c68", WARN, Logger::AQL, iterations > 1)
<< "Suboptimal AqlItemMatrix index lookup: Did " << iterations
<< " iterations.";
#endif
// Got it
return InputAqlItemRow{candidate.second, index - candidate.first};
}
InputAqlItemRow getRow(RowIndex index) const noexcept {
auto const& block = getBlock(index.first);
return InputAqlItemRow{block, index.second};
}
// We have asked for a row outside of this Vector
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"Internal Aql Logic Error: An executor "
"block is reading out of bounds.");
}
inline size_t numberOfBlocks() const noexcept { return _blocks.size(); }
inline size_t numberOfBlocks() const { return _blocks.size(); }
inline SharedAqlItemBlockPtr getBlock(size_t index) {
inline SharedAqlItemBlockPtr getBlock(uint32_t index) const noexcept {
TRI_ASSERT(index < numberOfBlocks());
return _blocks.at(index).second;
return _blocks[index];
}
private:
std::vector<std::pair<size_t, SharedAqlItemBlockPtr>> _blocks;
std::vector<SharedAqlItemBlockPtr> _blocks;
size_t _size;
uint64_t _size;
size_t _nrRegs;
};

View File

@ -24,7 +24,6 @@
#include "Basics/Common.h"
#include "Aql/AqlItemMatrix.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/OutputAqlItemRow.h"
@ -45,7 +44,7 @@ class OurLessThan {
std::vector<SortRegister> const& sortRegisters) noexcept
: _trx(trx), _input(input), _sortRegisters(sortRegisters) {}
bool operator()(size_t const& a, size_t const& b) const {
bool operator()(AqlItemMatrix::RowIndex const& a, AqlItemMatrix::RowIndex const& b) const {
InputAqlItemRow left = _input.getRow(a);
InputAqlItemRow right = _input.getRow(b);
for (auto const& reg : _sortRegisters) {
@ -156,10 +155,7 @@ void SortExecutor::doSorting() {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
TRI_ASSERT(_input != nullptr);
_sortedIndexes.reserve(_input->size());
for (size_t i = 0; i < _input->size(); ++i) {
_sortedIndexes.emplace_back(i);
}
_sortedIndexes = _input->produceRowIndexes();
// comparison function
OurLessThan ourLessThan(_infos.trx(), *_input, _infos.sortRegisters());
if (_infos.stable()) {

View File

@ -26,9 +26,10 @@
#ifndef ARANGOD_AQL_SORT_EXECUTOR_H
#define ARANGOD_AQL_SORT_EXECUTOR_H
#include "Aql/AqlItemBlockManager.h"
#include "Aql/AqlItemMatrix.h"
#include "Aql/ExecutionState.h"
#include "Aql/ExecutorInfos.h"
#include "AqlItemBlockManager.h"
#include <memory>
@ -40,7 +41,6 @@ class Methods;
namespace aql {
class AllRowsFetcher;
class AqlItemMatrix;
class ExecutorInfos;
class NoStats;
class OutputAqlItemRow;
@ -112,7 +112,7 @@ class SortExecutor {
AqlItemMatrix const* _input;
std::vector<size_t> _sortedIndexes;
std::vector<AqlItemMatrix::RowIndex> _sortedIndexes;
size_t _returnNext;
};

View File

@ -110,10 +110,12 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_returns_done_immedia
std::tie(state, matrix) = testee.fetchAllRows();
ASSERT_TRUE(state == ExecutionState::DONE);
ASSERT_TRUE(matrix != nullptr);
ASSERT_TRUE(matrix->getNrRegisters() == 1);
ASSERT_TRUE(!matrix->empty());
ASSERT_TRUE(matrix->size() == 1);
ASSERT_TRUE(matrix->getRow(0).getValue(0).slice().getInt() == 42);
EXPECT_EQ(1, matrix->getNrRegisters());
EXPECT_FALSE(matrix->empty());
EXPECT_EQ(1, matrix->size());
auto rowIndexes = matrix->produceRowIndexes();
ASSERT_EQ(matrix->size(), rowIndexes.size());
ASSERT_EQ(42, matrix->getRow(rowIndexes[0]).getValue(0).slice().getInt());
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
@ -139,10 +141,12 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_returns_hasmore_then
std::tie(state, matrix) = testee.fetchAllRows();
ASSERT_TRUE(state == ExecutionState::DONE);
ASSERT_TRUE(matrix != nullptr);
ASSERT_TRUE(matrix->getNrRegisters() == 1);
ASSERT_TRUE(!matrix->empty());
ASSERT_TRUE(matrix->size() == 1);
ASSERT_TRUE(matrix->getRow(0).getValue(0).slice().getInt() == 42);
EXPECT_EQ(1, matrix->getNrRegisters());
EXPECT_FALSE(matrix->empty());
EXPECT_EQ(1, matrix->size());
auto rowIndexes = matrix->produceRowIndexes();
ASSERT_EQ(matrix->size(), rowIndexes.size());
EXPECT_EQ(42, matrix->getRow(rowIndexes[0]).getValue(0).slice().getInt());
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
@ -172,10 +176,12 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_waits_then_returns_d
std::tie(state, matrix) = testee.fetchAllRows();
ASSERT_TRUE(state == ExecutionState::DONE);
ASSERT_TRUE(matrix != nullptr);
ASSERT_TRUE(!matrix->empty());
ASSERT_TRUE(matrix->size() == 1);
ASSERT_TRUE(matrix->getNrRegisters() == 1);
ASSERT_TRUE(matrix->getRow(0).getValue(0).slice().getInt() == 42);
EXPECT_EQ(1, matrix->getNrRegisters());
EXPECT_FALSE(matrix->empty());
EXPECT_EQ(1, matrix->size());
auto rowIndexes = matrix->produceRowIndexes();
ASSERT_EQ(matrix->size(), rowIndexes.size());
EXPECT_EQ(42, matrix->getRow(rowIndexes[0]).getValue(0).slice().getInt());
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
@ -206,10 +212,12 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_waits_returns_hasmor
std::tie(state, matrix) = testee.fetchAllRows();
ASSERT_TRUE(state == ExecutionState::DONE);
ASSERT_TRUE(matrix != nullptr);
ASSERT_TRUE(!matrix->empty());
ASSERT_TRUE(matrix->size() == 1);
ASSERT_TRUE(matrix->getNrRegisters() == 1);
ASSERT_TRUE(matrix->getRow(0).getValue(0).slice().getInt() == 42);
EXPECT_EQ(1, matrix->getNrRegisters());
EXPECT_FALSE(matrix->empty());
EXPECT_EQ(1, matrix->size());
auto rowIndexes = matrix->produceRowIndexes();
ASSERT_EQ(matrix->size(), rowIndexes.size());
EXPECT_EQ(42, matrix->getRow(rowIndexes[0]).getValue(0).slice().getInt());
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
@ -242,15 +250,16 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_does_not_wait) {
std::tie(state, matrix) = testee.fetchAllRows();
ASSERT_TRUE(state == ExecutionState::DONE);
ASSERT_TRUE(matrix != nullptr);
ASSERT_TRUE(!matrix->empty());
ASSERT_TRUE(matrix->size() == 6);
ASSERT_TRUE(matrix->getNrRegisters() == 1);
EXPECT_EQ(1, matrix->getNrRegisters());
EXPECT_FALSE(matrix->empty());
EXPECT_EQ(6, matrix->size());
auto rowIndexes = matrix->produceRowIndexes();
ASSERT_EQ(matrix->size(), rowIndexes.size());
for (int64_t i = 0; i < 6; i++) {
int64_t rowIdx = i;
int64_t rowValue = i + 1;
auto row = matrix->getRow(rowIdx);
auto row = matrix->getRow(rowIndexes[i]);
ASSERT_TRUE(row.isInitialized());
ASSERT_TRUE(row.getValue(0).slice().getInt() == rowValue);
EXPECT_EQ(rowValue, row.getValue(0).slice().getInt());
}
AqlItemMatrix const* matrix2 = nullptr;
@ -296,13 +305,15 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits) {
std::tie(state, matrix) = testee.fetchAllRows();
ASSERT_TRUE(state == ExecutionState::DONE);
ASSERT_TRUE(matrix != nullptr);
ASSERT_TRUE(!matrix->empty());
ASSERT_TRUE(matrix->size() == 6);
ASSERT_TRUE(matrix->getNrRegisters() == 1);
EXPECT_EQ(1, matrix->getNrRegisters());
EXPECT_FALSE(matrix->empty());
EXPECT_EQ(6, matrix->size());
auto rowIndexes = matrix->produceRowIndexes();
ASSERT_EQ(matrix->size(), rowIndexes.size());
for (int64_t i = 0; i < 6; i++) {
int64_t rowIdx = i;
int64_t rowValue = i + 1;
ASSERT_TRUE(matrix->getRow(rowIdx).getValue(0).slice().getInt() == rowValue);
auto row = matrix->getRow(rowIndexes[i]);
EXPECT_EQ(rowValue, row.getValue(0).slice().getInt());
}
AqlItemMatrix const* matrix2 = nullptr;
@ -349,13 +360,15 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits_and_does_not_
std::tie(state, matrix) = testee.fetchAllRows();
ASSERT_TRUE(state == ExecutionState::DONE);
ASSERT_TRUE(matrix != nullptr);
ASSERT_TRUE(!matrix->empty());
ASSERT_TRUE(matrix->size() == 6);
ASSERT_TRUE(matrix->getNrRegisters() == 1);
EXPECT_EQ(1, matrix->getNrRegisters());
EXPECT_FALSE(matrix->empty());
EXPECT_EQ(6, matrix->size());
auto rowIndexes = matrix->produceRowIndexes();
ASSERT_EQ(matrix->size(), rowIndexes.size());
for (int64_t i = 0; i < 6; i++) {
int64_t rowIdx = i;
int64_t rowValue = i + 1;
ASSERT_TRUE(matrix->getRow(rowIdx).getValue(0).slice().getInt() == rowValue);
auto row = matrix->getRow(rowIndexes[i]);
EXPECT_EQ(rowValue, row.getValue(0).slice().getInt());
}
AqlItemMatrix const* matrix2 = nullptr;
@ -369,6 +382,6 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits_and_does_not_
ASSERT_TRUE(dependencyProxyMock.numFetchBlockCalls() == 7);
}
} // namespace arangodb
} // namespace aql
} // namespace tests
} // namespace arangodb