1
0
Fork 0

Merge branch 'devel' of https://github.com/triAGENS/ArangoDB into devel

This commit is contained in:
Jan Steemann 2014-10-07 12:36:36 +02:00
commit aef384da1a
4 changed files with 238 additions and 8 deletions

View File

@ -3743,7 +3743,6 @@ int ScatterBlock::getOrSkipSomeForShard (size_t atLeast,
TRI_ASSERT(0 < atLeast && atLeast <= atMost);
TRI_ASSERT(result == nullptr && skipped == 0);
size_t clientId = getClientId(shardId);
@ -3872,15 +3871,155 @@ size_t ScatterBlock::getClientId (std::string const& shardId) {
DistributeBlock::DistributeBlock (ExecutionEngine* engine,
DistributeNode const* ep,
std::vector<std::string> const& shardIds)
std::vector<std::string> const& shardIds,
std::string const& collectionName)
: ExecutionBlock(engine, ep),
_nrClients(shardIds.size()){
_nrClients(shardIds.size()),
_collectionName(collectionName){
_shardIdMap.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) {
_shardIdMap.emplace(std::make_pair(shardIds[i], i));
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getClientId: get the number <clientId> (used internally)
/// corresponding to <shardId>
////////////////////////////////////////////////////////////////////////////////
size_t DistributeBlock::getClientId (std::string const& shardId) {
auto it = _shardIdMap.find(shardId);
if (it == _shardIdMap.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "AQL: unknown shard id");
}
return ((*it).second);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getBlockForShard: try to get atLeast pairs into
/// _distBuffer.at(clientId);
////////////////////////////////////////////////////////////////////////////////
bool DistributeBlock::getBlockForShard(size_t atLeast,
size_t atMost,
size_t clientId) {
return false;
};
/* // from pos on check if we should send a block to the client
bool usesDefaultShardingAttributes = false; //FIXME is this right?
ClusterInfo::getResponsibleShard (_cid,
TRI_json_t const* json,
true,
shardId,
usesDefaultShardingAttributes);
if (_buffer.empty()) {
if (skipping) {
_dependencies[0]->skip(atLeast - skipped);
skipped = atLeast;
freeCollector();
return TRI_ERROR_NO_ERROR;
}
else {
if (! getBlock(atLeast - skipped,
(std::max)(atMost - skipped, DefaultBatchSize))) {
_done = true;
break; // must still put things in the result from the collector . . .
}
_pos = 0;
}
}
}*/
int DistributeBlock::getOrSkipSomeForShard (size_t atLeast,
size_t atMost,
bool skipping,
AqlItemBlock*& result,
size_t& skipped,
std::string const& shardId) {
TRI_ASSERT(0 < atLeast && atLeast <= atMost);
TRI_ASSERT(result == nullptr && skipped == 0);
size_t clientId = getClientId(shardId);
if (_doneForClient.at(clientId)) {
return TRI_ERROR_NO_ERROR;
}
std::deque<pair<size_t, size_t>>& buf = _distBuffer.at(clientId);
vector<AqlItemBlock*> collector;
auto freeCollector = [&collector]() {
for (auto x : collector) {
delete x;
}
collector.clear();
};
try {
if (buf.empty()) {
if (! getBlockForShard(atLeast, atMost, clientId)) {
_doneForClient.at(clientId) = true;
return TRI_ERROR_NO_ERROR;
}
}
skipped = std::min(buf.size(), atMost);
if (skipping) {
for (size_t i = 0; i < skipped; i++){
buf.pop_front();
}
freeCollector();
return TRI_ERROR_NO_ERROR;
}
size_t i = 0;
while (i < skipped) {
std::vector<size_t> chosen;
size_t n = buf.front().first;
while (buf.front().first == n && i < skipped) {
chosen.push_back(buf.front().second);
buf.pop_front();
i++;
}
unique_ptr<AqlItemBlock> more(_buffer.at(n)->slice(chosen, 0, chosen.size()));
collector.push_back(more.get());
more.release(); // do not delete it!
}
}
catch (...) {
freeCollector();
throw;
}
if (! skipping) {
if (collector.size() == 1) {
result = collector[0];
collector.clear();
}
else if (! collector.empty()) {
try {
result = AqlItemBlock::concatenate(collector);
}
catch (...) {
freeCollector();
throw;
}
}
}
freeCollector();
//TODO clean up the _buffer and _distBuffer
return TRI_ERROR_NO_ERROR;
}
// -----------------------------------------------------------------------------
// --SECTION-- class RemoteBlock
// -----------------------------------------------------------------------------

View File

@ -1677,7 +1677,8 @@ namespace triagens {
DistributeBlock (ExecutionEngine* engine,
DistributeNode const* ep,
std::vector<std::string> const& shardIds);
std::vector<std::string> const& shardIds,
arango::CollectionID const& cid);
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
@ -1694,6 +1695,62 @@ namespace triagens {
return ExecutionBlock::initialize();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getSome: shouldn't be used!!!
////////////////////////////////////////////////////////////////////////////////
virtual AqlItemBlock* getSome (size_t atLeast, size_t atMost) {
TRI_ASSERT(false);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief skipSome: shouldn't be used!!!
////////////////////////////////////////////////////////////////////////////////
virtual size_t skipSome (size_t atLeast, size_t atMost) {
TRI_ASSERT(false);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief hasMoreForShard: any more for shard <shardId>?
////////////////////////////////////////////////////////////////////////////////
bool hasMoreForShard (std::string const& shardId);
////////////////////////////////////////////////////////////////////////////////
/// @brief remainingForShard: remaining for shard <shardId>?
////////////////////////////////////////////////////////////////////////////////
int64_t remainingForShard (std::string const& shardId);
////////////////////////////////////////////////////////////////////////////////
/// @brief getOrSkipSomeForShard
////////////////////////////////////////////////////////////////////////////////
int getOrSkipSomeForShard (size_t atLeast, size_t atMost,
bool skipping, AqlItemBlock*& result, size_t& skipped,
std::string const& shardId);
////////////////////////////////////////////////////////////////////////////////
/// @brief getSomeForShard
////////////////////////////////////////////////////////////////////////////////
AqlItemBlock* getSomeForShard (size_t atLeast, size_t atMost,
std::string const& shardId);
////////////////////////////////////////////////////////////////////////////////
/// @brief skipSomeForShard
////////////////////////////////////////////////////////////////////////////////
size_t skipSomeForShard (size_t atLeast, size_t atMost, std::string
const& shardId);
////////////////////////////////////////////////////////////////////////////////
/// @brief skipForShard
////////////////////////////////////////////////////////////////////////////////
bool skipForShard (size_t number, std::string const& shardId);
private:
////////////////////////////////////////////////////////////////////////////////
@ -1703,11 +1760,27 @@ namespace triagens {
size_t getClientId (std::string const& shardId);
////////////////////////////////////////////////////////////////////////////////
/// @brief getBlockForShard: try to get at atLeast/aMost pairs into
/// _distBuffer.at(clientId).
////////////////////////////////////////////////////////////////////////////////
bool getBlockForShard (size_t atLeast,
size_t atMost,
size_t clientId);
////////////////////////////////////////////////////////////////////////////////
/// @brief _shardIdMap: map from shardIds to clientNrs
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<std::string, size_t> _shardIdMap;
////////////////////////////////////////////////////////////////////////////////
/// @brief _distBuffer.at(i) is a deque containing pairs (j,k) such that
// _buffer.at(j) row k should be sent to the client with id = i.
////////////////////////////////////////////////////////////////////////////////
std::vector<std::deque<std::pair<size_t, size_t>>> _distBuffer;
////////////////////////////////////////////////////////////////////////////////
/// @brief _nrClients: total number of clients
@ -1715,6 +1788,19 @@ namespace triagens {
size_t _nrClients;
////////////////////////////////////////////////////////////////////////////////
/// @brief _doneForClient: the analogue of _done: _doneForClient.at(i) = true
/// if we are done for the shard with clientId = i
////////////////////////////////////////////////////////////////////////////////
std::vector<bool> _doneForClient;
////////////////////////////////////////////////////////////////////////////////
/// @brief _cid: the id of the sharded collection
////////////////////////////////////////////////////////////////////////////////
std::string const& _collectionName;
};
// -----------------------------------------------------------------------------
// --SECTION-- RemoteBlock

View File

@ -121,7 +121,9 @@ static ExecutionBlock* createBlock (ExecutionEngine* engine,
auto&& shardIds = static_cast<DistributeNode const*>(en)->collection()->shardIds();
return new DistributeBlock(engine,
static_cast<DistributeNode const*>(en),
shardIds);
shardIds,
static_cast<DistributeNode const*>
(en)->collection()->getName());
}
case ExecutionNode::GATHER: {
return new GatherBlock(engine,

View File

@ -1668,12 +1668,14 @@ int triagens::aql::distributeInCluster (Optimizer* opt,
}
// insert a scatter node
ExecutionNode* scatterNode = new ScatterNode(plan, plan->nextId(), vocbase, collection);
ExecutionNode* scatterNode = new ScatterNode(plan, plan->nextId(),
vocbase, collection);
plan->registerNode(scatterNode);
scatterNode->addDependency(deps[0]);
// insert a remote node
ExecutionNode* remoteNode = new RemoteNode(plan, plan->nextId(), vocbase, collection, "", "", "");
ExecutionNode* remoteNode = new RemoteNode(plan, plan->nextId(), vocbase,
collection, "", "", "");
plan->registerNode(remoteNode);
remoteNode->addDependency(scatterNode);
@ -1687,7 +1689,8 @@ int triagens::aql::distributeInCluster (Optimizer* opt,
remoteNode->addDependency(node);
// insert a gather node
ExecutionNode* gatherNode = new GatherNode(plan, plan->nextId(), vocbase, collection);
ExecutionNode* gatherNode = new GatherNode(plan, plan->nextId(), vocbase,
collection);
plan->registerNode(gatherNode);
gatherNode->addDependency(remoteNode);