1
0
Fork 0

better initialiseCursor and shutdown for ScatterBlock

This commit is contained in:
James 2014-10-06 10:30:28 +01:00
parent 3fae7606a4
commit aee0605742
2 changed files with 33 additions and 2 deletions

View File

@ -3617,7 +3617,8 @@ ScatterBlock::ScatterBlock (ExecutionEngine* engine,
ScatterNode const* ep,
std::vector<std::string> const& shardIds)
: ExecutionBlock(engine, ep),
_nrClients(shardIds.size()) {
_nrClients(shardIds.size()),
_initOrShutdown(true){
_shardIdMap.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) {
_shardIdMap.emplace(std::make_pair(shardIds[i], i));
@ -3629,20 +3630,36 @@ ScatterBlock::ScatterBlock (ExecutionEngine* engine,
////////////////////////////////////////////////////////////////////////////////
int ScatterBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
if (!_initOrShutdown) {
return TRI_ERROR_NO_ERROR;
}
int res = ExecutionBlock::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
_posForClient.clear();
_doneForClient.clear();
_doneForClient.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) {
_posForClient.push_back(std::make_pair(0, 0));
_doneForClient.push_back(false);
}
_initOrShutdown = false;
return TRI_ERROR_NO_ERROR;
}
int ScatterBlock::shutdown () {
if (!_initOrShutdown) {
return TRI_ERROR_NO_ERROR;
}
return ExecutionBlock::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
@ -3707,6 +3724,7 @@ bool ScatterBlock::hasMoreForShard (std::string const& shardId){
// _buffer.at(i) we are sending to <clientId>
if (pos.first > _buffer.size()) {
_initOrShutdown = true;
if (! getBlock(DefaultBatchSize, DefaultBatchSize)) {
_doneForClient.at(clientId) = true;
return false;
@ -3737,6 +3755,7 @@ int ScatterBlock::getOrSkipSomeForShard (size_t atLeast,
// pull more blocks from dependency if necessary . . .
if (pos.first >= _buffer.size()) {
_initOrShutdown = true;
if (! getBlock(atLeast, atMost)) {
_doneForClient.at(clientId) = true;
return TRI_ERROR_NO_ERROR;

View File

@ -1540,6 +1540,12 @@ namespace triagens {
int initializeCursor (AqlItemBlock* items, size_t pos);
////////////////////////////////////////////////////////////////////////////////
/// @brief initializeCursor
////////////////////////////////////////////////////////////////////////////////
int shutdown ();
////////////////////////////////////////////////////////////////////////////////
/// @brief remaining
////////////////////////////////////////////////////////////////////////////////
@ -1649,6 +1655,12 @@ namespace triagens {
size_t _nrClients;
////////////////////////////////////////////////////////////////////////////////
/// @brief _reinit: should we really initialiseCursor or shutdown?
////////////////////////////////////////////////////////////////////////////////
bool _initOrShutdown;
};
// -----------------------------------------------------------------------------