1
0
Fork 0

SortExecutor optimization for few lines from upstream (#8192)

* Added an optimization where the SortExecutor prefetches all rows from above and only allocates a result block of the expected size

* Fixed catch tests

* Applied review fixes
This commit is contained in:
Michael Hackstein 2019-02-19 13:25:28 +01:00 committed by GitHub
parent e3f5a88762
commit cb4d870c6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 147 additions and 69 deletions

View File

@ -33,9 +33,17 @@ using namespace arangodb::aql;
std::pair<ExecutionState, AqlItemMatrix const*> AllRowsFetcher::fetchAllRows() {
// Avoid unnecessary upstream calls
if (_upstreamState == ExecutionState::DONE) {
return {ExecutionState::DONE, nullptr};
TRI_ASSERT(_aqlItemMatrix != nullptr);
return {ExecutionState::DONE, _aqlItemMatrix.get()};
}
if (fetchUntilDone() == ExecutionState::WAITING) {
return {ExecutionState::WAITING, nullptr};
}
TRI_ASSERT(_aqlItemMatrix != nullptr);
return {ExecutionState::DONE, _aqlItemMatrix.get()};
}
ExecutionState AllRowsFetcher::fetchUntilDone() {
if (_aqlItemMatrix == nullptr) {
_aqlItemMatrix = std::make_unique<AqlItemMatrix>(getNrInputRegisters());
}
@ -47,7 +55,7 @@ std::pair<ExecutionState, AqlItemMatrix const*> AllRowsFetcher::fetchAllRows() {
std::tie(state, block) = fetchBlock();
if (state == ExecutionState::WAITING) {
TRI_ASSERT(block == nullptr);
return {ExecutionState::WAITING, nullptr};
return state;
}
if (block == nullptr) {
TRI_ASSERT(state == ExecutionState::DONE);
@ -56,9 +64,21 @@ std::pair<ExecutionState, AqlItemMatrix const*> AllRowsFetcher::fetchAllRows() {
}
}
TRI_ASSERT(_aqlItemMatrix != nullptr);
TRI_ASSERT(state == ExecutionState::DONE);
return state;
}
return {ExecutionState::DONE, _aqlItemMatrix.get()};
/// Ensures all upstream rows have been pulled into the local matrix and
/// reports how many there are. Yields {WAITING, 0} while upstream is still
/// waiting, otherwise {DONE, <row count of the matrix>}.
std::pair<ExecutionState, size_t> AllRowsFetcher::preFetchNumberOfRows() {
  // Only ask upstream again while it has not reported DONE yet; the
  // short-circuit keeps fetchUntilDone() from being called once it has.
  bool const upstreamExhausted = (_upstreamState == ExecutionState::DONE);
  if (!upstreamExhausted && fetchUntilDone() == ExecutionState::WAITING) {
    return {ExecutionState::WAITING, 0};
  }

  // Either upstream was already DONE or fetchUntilDone() just finished:
  // in both cases the matrix has to exist by now.
  TRI_ASSERT(_aqlItemMatrix != nullptr);
  return {ExecutionState::DONE, _aqlItemMatrix->size()};
}
AllRowsFetcher::AllRowsFetcher(BlockFetcher<false>& executionBlock)

View File

@ -62,12 +62,12 @@ class AllRowsFetcher {
* ExecutionState:
* WAITING => IO going on, immediately return to caller.
* DONE => No more to expect from Upstream, if you are done with
* this row return DONE to caller.
* HASMORE => There is potentially more from above, call again if
* you need more input.
* this Matrix return DONE to caller.
* HASMORE => Cannot be returned here
*
* AqlItemRow:
* If WAITING => Do not use this Row, it is a nullptr.
* If HASMORE => The Row is guaranteed to not be a nullptr.
* If HASMORE => impossible
* If DONE => Row can be a nullptr (nothing received) or valid.
*/
TEST_VIRTUAL std::pair<ExecutionState, AqlItemMatrix const*> fetchAllRows();
@ -80,6 +80,25 @@ class AllRowsFetcher {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
};
/**
* @brief Prefetch the number of rows that will be returned from upstream.
* Calling this function will render fetchAllRows() a no-op,
* as this function already fills the local result caches.
*
* @return A pair with the following properties:
* ExecutionState:
* WAITING => IO going on, immediately return to caller.
* DONE => No more to expect from Upstream, if you are done with
* this Matrix return DONE to caller.
* HASMORE => Cannot be returned here
*
* size_t:
* If WAITING => Do not use this number, it is 0.
* If HASMORE => impossible
* If DONE => Number contains the correct number of rows upstream.
*/
TEST_VIRTUAL std::pair<ExecutionState, size_t> preFetchNumberOfRows();
private:
BlockFetcher<false>* _blockFetcher;
@ -97,6 +116,11 @@ class AllRowsFetcher {
* @brief Delegates to ExecutionBlock::fetchBlock()
*/
std::pair<ExecutionState, std::shared_ptr<AqlItemBlockShell>> fetchBlock();
/**
* @brief Fetch blocks from upstream until done
*
* @return WAITING if upstream reported WAITING (call again later),
* DONE once upstream has nothing more to deliver.
*/
ExecutionState fetchUntilDone();
};
} // namespace aql

View File

@ -76,8 +76,13 @@ class ConstFetcher {
void injectBlock(std::shared_ptr<AqlItemBlockShell> block);
// Argument will be ignored!
std::pair<ExecutionState, std::shared_ptr<AqlItemBlockShell>>
fetchBlockForPassthrough(size_t);
std::pair<ExecutionState, std::shared_ptr<AqlItemBlockShell>> fetchBlockForPassthrough(size_t);
/**
* @brief Unsupported on this fetcher: asserts and then throws
* TRI_ERROR_INTERNAL instead of returning a row count.
*
* NOTE(review): presumably only declared so that generic code calling
* preFetchNumberOfRows() on its fetcher compiles for every fetcher
* type - confirm against the callers.
*/
std::pair<ExecutionState, size_t> preFetchNumberOfRows() {
// This is not implemented for this fetcher
TRI_ASSERT(false);
// Still throw after the assertion, presumably for builds in which
// TRI_ASSERT does not abort - TODO confirm.
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
private:
/**

View File

@ -350,6 +350,26 @@ ExecutionBlockImpl<Executor>::requestWrappedBlock(size_t nrItems, RegisterId nrR
}
}
#endif
} else if (std::is_same<Executor, SortExecutor>::value) {
// The SortExecutor prefetches the number of upstream rows so that, if there are only a few elements to sort, a smaller block is allocated to save memory
ExecutionState state;
size_t expectedRows = 0;
std::tie(state, expectedRows) = _rowFetcher.preFetchNumberOfRows();
if (state == ExecutionState::WAITING) {
TRI_ASSERT(expectedRows == 0);
return {state, nullptr};
}
// All Rows Fetcher cannot return HASMORE
TRI_ASSERT(state == ExecutionState::DONE);
nrItems = (std::min)(expectedRows, nrItems);
if (nrItems == 0) {
TRI_ASSERT(state == ExecutionState::DONE);
return {state, nullptr};
}
AqlItemBlock* block = requestBlock(nrItems, nrRegs);
blockShell =
std::make_unique<AqlItemBlockShell>(_engine->itemBlockManager(),
std::unique_ptr<AqlItemBlock>{block});
} else {
AqlItemBlock* block = requestBlock(nrItems, nrRegs);

View File

@ -58,7 +58,6 @@ class SingleRowFetcher {
SingleRowFetcher();
public:
/**
* @brief Fetch one new AqlItemRow from upstream.
* **Guarantee**: the pointer returned is valid only
@ -89,6 +88,12 @@ class SingleRowFetcher {
// TODO enable_if<passBlocksThrough>
std::pair<ExecutionState, std::shared_ptr<AqlItemBlockShell>> fetchBlockForPassthrough(size_t atMost);
/**
* @brief Unsupported on this fetcher: asserts and then throws
* TRI_ERROR_INTERNAL instead of returning a row count.
*
* NOTE(review): presumably only declared so that generic code calling
* preFetchNumberOfRows() on its fetcher compiles for every fetcher
* type - confirm against the callers.
*/
std::pair<ExecutionState, size_t> preFetchNumberOfRows() {
// This is not implemented for this fetcher
TRI_ASSERT(false);
// Still throw after the assertion, presumably for builds in which
// TRI_ASSERT does not abort - TODO confirm.
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
private:
BlockFetcher<passBlocksThrough>* _blockFetcher;
@ -117,17 +122,16 @@ class SingleRowFetcher {
size_t _rowIndex;
/**
* @brief The current row, as returned last by fetchRow(). Must stay valid
* until the next fetchRow() call.
*/
* @brief The current row, as returned last by fetchRow(). Must stay valid
* until the next fetchRow() call.
*/
InputAqlItemRow _currentRow;
private:
/**
* @brief Delegates to ExecutionBlock::fetchBlock()
*/
std::pair<ExecutionState, std::shared_ptr<AqlItemBlockShell>>
fetchBlock(size_t atMost);
std::pair<ExecutionState, std::shared_ptr<AqlItemBlockShell>> fetchBlock(size_t atMost);
/**
* @brief Delegates to ExecutionBlock::getNrInputRegisters()
@ -146,7 +150,6 @@ class SingleRowFetcher {
TRI_ASSERT(indexIsValid());
return _rowIndex;
}
};
template <bool passBlocksThrough>

View File

@ -37,10 +37,9 @@ namespace arangodb {
namespace tests {
namespace aql {
SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
ExecutionState state;
AqlItemMatrix const *matrix = nullptr;
AqlItemMatrix const* matrix = nullptr;
GIVEN("there are no blocks upstream") {
VPackBuilder input;
@ -60,13 +59,14 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->empty());
REQUIRE(matrix->size() == 0);
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());
@ -92,14 +92,15 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->empty());
REQUIRE(matrix->size() == 0);
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());
@ -129,13 +130,14 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->size() == 1);
REQUIRE(matrix->getRow(0).getValue(0).slice().getInt() == 42);
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());
@ -158,13 +160,14 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->size() == 1);
REQUIRE(matrix->getRow(0).getValue(0).slice().getInt() == 42);
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());
@ -192,14 +195,15 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->getNrRegisters() == 1);
REQUIRE(matrix->getRow(0).getValue(0).slice().getInt() == 42);
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());
@ -228,14 +232,15 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->getNrRegisters() == 1);
REQUIRE(matrix->getRow(0).getValue(0).slice().getInt() == 42);
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());
@ -250,15 +255,15 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
BlockFetcherMock<false> blockFetcherMock{monitor, 1};
// three 1-column matrices with 3, 2 and 1 rows, respectively
std::unique_ptr<AqlItemBlock> block1 = buildBlock<1>(&monitor, {{{1}}, {{2}}, {{3}}}),
std::unique_ptr<AqlItemBlock> block1 =
buildBlock<1>(&monitor, {{{1}}, {{2}}, {{3}}}),
block2 = buildBlock<1>(&monitor, {{{4}}, {{5}}}),
block3 = buildBlock<1>(&monitor, {{{6}}});
WHEN("the producer does not wait") {
blockFetcherMock.shouldReturn(ExecutionState::HASMORE, std::move(block1))
.andThenReturn(ExecutionState::HASMORE, std::move(block2))
.andThenReturn(ExecutionState::DONE, std::move(block3));
.andThenReturn(ExecutionState::HASMORE, std::move(block2))
.andThenReturn(ExecutionState::DONE, std::move(block3));
{
AllRowsFetcher testee(blockFetcherMock);
@ -272,19 +277,20 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->getNrRegisters() == 1);
for (int64_t i = 0; i < 6; i++) {
int64_t rowIdx = i;
int64_t rowValue = i+1;
int64_t rowValue = i + 1;
auto row = matrix->getRow(rowIdx);
REQUIRE(row.isInitialized());
REQUIRE(row.getValue(0).slice().getInt() == rowValue);
}
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());
@ -324,18 +330,19 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->getNrRegisters() == 1);
for (int64_t i = 0; i < 6; i++) {
int64_t rowIdx = i;
int64_t rowValue = i+1;
int64_t rowValue = i + 1;
REQUIRE(matrix->getRow(rowIdx).getValue(0).slice().getInt() == rowValue);
}
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());
@ -344,18 +351,16 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
WHEN("the producer waits and does not return DONE asap") {
blockFetcherMock.shouldReturn(ExecutionState::WAITING, nullptr)
.andThenReturn(ExecutionState::HASMORE, std::move(block1))
.andThenReturn(ExecutionState::WAITING, nullptr)
.andThenReturn(ExecutionState::HASMORE, std::move(block2))
.andThenReturn(ExecutionState::WAITING, nullptr)
.andThenReturn(ExecutionState::HASMORE, std::move(block3))
.andThenReturn(ExecutionState::DONE, nullptr)
;
.andThenReturn(ExecutionState::HASMORE, std::move(block1))
.andThenReturn(ExecutionState::WAITING, nullptr)
.andThenReturn(ExecutionState::HASMORE, std::move(block2))
.andThenReturn(ExecutionState::WAITING, nullptr)
.andThenReturn(ExecutionState::HASMORE, std::move(block3))
.andThenReturn(ExecutionState::DONE, nullptr);
{
AllRowsFetcher testee(blockFetcherMock);
THEN("the fetcher should return WAITING three times") {
// wait when fetching the 1st and 2nd block
std::tie(state, matrix) = testee.fetchAllRows();
@ -378,18 +383,19 @@ SCENARIO("AllRowsFetcher", "[AQL][EXECUTOR][FETCHER]") {
REQUIRE(matrix->getNrRegisters() == 1);
for (int64_t i = 0; i < 6; i++) {
int64_t rowIdx = i;
int64_t rowValue = i+1;
int64_t rowValue = i + 1;
REQUIRE(matrix->getRow(rowIdx).getValue(0).slice().getInt() == rowValue);
}
AND_THEN("null should be returned") {
std::tie(state, matrix) = testee.fetchAllRows();
AND_THEN("the same matrix should be returned") {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
REQUIRE(state == ExecutionState::DONE);
REQUIRE(matrix == nullptr);
REQUIRE(matrix2 == matrix);
}
}
}
} // testee is destroyed here
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
REQUIRE(blockFetcherMock.allBlocksFetched());