mirror of https://gitee.com/bigwinds/arangodb
change algorithm a bit
This commit is contained in:
parent
ce539002e1
commit
cebf0fbb1a
|
@ -73,12 +73,9 @@ std::pair<ExecutionState, NoStats> UnsortedGatherExecutor::produceRows(OutputAql
|
||||||
ExecutionState state;
|
ExecutionState state;
|
||||||
InputAqlItemRow inputRow = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
InputAqlItemRow inputRow = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
||||||
|
|
||||||
bool more = false;
|
size_t x;
|
||||||
bool waiting = false;
|
for (x = 0; x < _numberDependencies; ++x) {
|
||||||
|
size_t i = (_currentDependency + x) % _numberDependencies;
|
||||||
size_t i = _currentDependency;
|
|
||||||
for (size_t x = 0; x < _numberDependencies; ++x) {
|
|
||||||
i = (_currentDependency + x) % _numberDependencies;
|
|
||||||
|
|
||||||
if (_upstream[i] == ExecutionState::DONE) {
|
if (_upstream[i] == ExecutionState::DONE) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -98,23 +95,25 @@ std::pair<ExecutionState, NoStats> UnsortedGatherExecutor::produceRows(OutputAql
|
||||||
}
|
}
|
||||||
|
|
||||||
_upstream[i] = state;
|
_upstream[i] = state;
|
||||||
if (state == ExecutionState::HASMORE) {
|
|
||||||
more = true;
|
|
||||||
} else if (state == ExecutionState::WAITING) {
|
|
||||||
waiting = true;
|
|
||||||
}
|
|
||||||
if (output.isFull()) {
|
if (output.isFull()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_currentDependency = i;
|
_currentDependency = x;
|
||||||
|
|
||||||
NoStats stats;
|
NoStats stats;
|
||||||
if (more) {
|
size_t numWaiting = 0;
|
||||||
|
for (x = 0; x < _numberDependencies; ++x) {
|
||||||
|
if (_upstream[x] == ExecutionState::HASMORE) {
|
||||||
return {ExecutionState::HASMORE, stats};
|
return {ExecutionState::HASMORE, stats};
|
||||||
} else if (waiting) {
|
} else if (_upstream[x] == ExecutionState::WAITING) {
|
||||||
|
numWaiting++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (numWaiting > 0) {
|
||||||
return {ExecutionState::WAITING, stats};
|
return {ExecutionState::WAITING, stats};
|
||||||
}
|
}
|
||||||
|
|
||||||
TRI_ASSERT(std::all_of(_upstream.begin(), _upstream.end(), [](auto const& s) { return s == ExecutionState::DONE; } ));
|
TRI_ASSERT(std::all_of(_upstream.begin(), _upstream.end(), [](auto const& s) { return s == ExecutionState::DONE; } ));
|
||||||
return {ExecutionState::DONE, stats};
|
return {ExecutionState::DONE, stats};
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
#include "Aql/ExecutionState.h"
|
#include "Aql/ExecutionState.h"
|
||||||
#include "Aql/ExecutorInfos.h"
|
#include "Aql/ExecutorInfos.h"
|
||||||
#include "Aql/InputAqlItemRow.h"
|
#include "Aql/InputAqlItemRow.h"
|
||||||
|
#include "Containers/SmallVector.h"
|
||||||
|
|
||||||
namespace arangodb {
|
namespace arangodb {
|
||||||
|
|
||||||
|
@ -76,9 +77,6 @@ class UnsortedGatherExecutor {
|
||||||
*/
|
*/
|
||||||
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
||||||
|
|
||||||
// std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
|
|
||||||
// std::tuple<ExecutionState, Stats, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);
|
|
||||||
|
|
||||||
std::tuple<ExecutionState, Stats, size_t> skipRows(size_t atMost);
|
std::tuple<ExecutionState, Stats, size_t> skipRows(size_t atMost);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -88,7 +86,8 @@ class UnsortedGatherExecutor {
|
||||||
private:
|
private:
|
||||||
Fetcher& _fetcher;
|
Fetcher& _fetcher;
|
||||||
|
|
||||||
std::vector<ExecutionState> _upstream;
|
::arangodb::containers::SmallVector<ExecutionState>::allocator_type::arena_type _arena;
|
||||||
|
::arangodb::containers::SmallVector<ExecutionState> _upstream{_arena};
|
||||||
|
|
||||||
// Total Number of dependencies
|
// Total Number of dependencies
|
||||||
size_t _numberDependencies;
|
size_t _numberDependencies;
|
||||||
|
|
Loading…
Reference in New Issue