1
0
Fork 0

Merge branch 'devel' of github.com:triAGENS/ArangoDB into devel

This commit is contained in:
Willi Goesgens 2014-10-06 14:16:39 +02:00
commit 7dac9d8cc7
6 changed files with 210 additions and 1 deletions

View File

@ -3862,6 +3862,25 @@ size_t ScatterBlock::getClientId (std::string const& shardId) {
return ((*it).second);
}
// -----------------------------------------------------------------------------
// --SECTION-- class DistributeBlock
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
DistributeBlock::DistributeBlock (ExecutionEngine* engine,
DistributeNode const* ep,
std::vector<std::string> const& shardIds)
: ExecutionBlock(engine, ep),
_nrClients(shardIds.size()){
_shardIdMap.reserve(_nrClients);
for (size_t i = 0; i < _nrClients; i++) {
_shardIdMap.emplace(std::make_pair(shardIds[i], i));
}
}
// -----------------------------------------------------------------------------
// --SECTION-- class RemoteBlock
// -----------------------------------------------------------------------------

View File

@ -1664,6 +1664,59 @@ namespace triagens {
};
// -----------------------------------------------------------------------------
// --SECTION-- DistributeBlock
// -----------------------------------------------------------------------------
class DistributeBlock : public ExecutionBlock {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
DistributeBlock (ExecutionEngine* engine,
DistributeNode const* ep,
std::vector<std::string> const& shardIds);
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~DistributeBlock () {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief initialize
////////////////////////////////////////////////////////////////////////////////
int initialize () {
return ExecutionBlock::initialize();
}
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief getClientId: get the number <clientId> (used internally)
/// corresponding to <shardId>
////////////////////////////////////////////////////////////////////////////////
size_t getClientId (std::string const& shardId);
////////////////////////////////////////////////////////////////////////////////
/// @brief _shardIdMap: map from shardIds to clientNrs
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<std::string, size_t> _shardIdMap;
////////////////////////////////////////////////////////////////////////////////
/// @brief _nrClients: total number of clients
////////////////////////////////////////////////////////////////////////////////
size_t _nrClients;
};
// -----------------------------------------------------------------------------
// --SECTION-- RemoteBlock
// -----------------------------------------------------------------------------

View File

@ -117,6 +117,12 @@ static ExecutionBlock* createBlock (ExecutionEngine* engine,
static_cast<ScatterNode const*>(en),
shardIds);
}
case ExecutionNode::DISTRIBUTE: {
auto&& shardIds = static_cast<DistributeNode const*>(en)->collection()->shardIds();
return new DistributeBlock(engine,
static_cast<DistributeNode const*>(en),
shardIds);
}
case ExecutionNode::GATHER: {
return new GatherBlock(engine,
static_cast<GatherNode const*>(en));

View File

@ -199,6 +199,8 @@ ExecutionNode* ExecutionNode::fromJsonFactory (ExecutionPlan* plan,
}
case SCATTER:
return new ScatterNode(plan, oneNode);
case DISTRIBUTE:
return new DistributeNode(plan, oneNode);
case ILLEGAL: {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid node type");
}
@ -898,6 +900,7 @@ void ExecutionNode::VarOverview::after (ExecutionNode *en) {
case ExecutionNode::FILTER:
case ExecutionNode::LIMIT:
case ExecutionNode::SCATTER:
case ExecutionNode::DISTRIBUTE:
case ExecutionNode::GATHER:
case ExecutionNode::REMOTE:
case ExecutionNode::NORESULTS: {
@ -2356,6 +2359,26 @@ void ScatterNode::toJsonHelper (triagens::basics::Json& nodes,
nodes(json);
}
// -----------------------------------------------------------------------------
// --SECTION-- methods of DistributeNode
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief construct a distribute node from JSON
////////////////////////////////////////////////////////////////////////////////
DistributeNode::DistributeNode (ExecutionPlan* plan,
triagens::basics::Json const& base)
: ExecutionNode(plan, base),
_vocbase(plan->getAst()->query()->vocbase()),
_collection(plan->getAst()->query()->collections()->get(JsonHelper::checkAndGetStringValue(base.json(), "collection"))) {
}
void DistributeNode::toJsonHelper (triagens::basics::Json& nodes,
TRI_memory_zone_t* zone,
bool verbose) const {
}
// -----------------------------------------------------------------------------
// --SECTION-- methods of GatherNode
// -----------------------------------------------------------------------------

View File

@ -100,7 +100,8 @@ namespace triagens {
REPLACE = 16,
UPDATE = 17,
RETURN = 18,
NORESULTS = 19
NORESULTS = 19,
DISTRIBUTE = 20
};
// -----------------------------------------------------------------------------
@ -2999,6 +3000,107 @@ namespace triagens {
};
// -----------------------------------------------------------------------------
// --SECTION-- class DistributeNode
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief class DistributeNode
////////////////////////////////////////////////////////////////////////////////
class DistributeNode : public ExecutionNode {
friend class ExecutionBlock;
friend class DistributeBlock;
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor with an id
////////////////////////////////////////////////////////////////////////////////
public:
DistributeNode (ExecutionPlan* plan,
size_t id,
TRI_vocbase_t* vocbase,
Collection const* collection)
: ExecutionNode(plan, id),
_vocbase(vocbase),
_collection(collection) {
}
DistributeNode (ExecutionPlan*,
triagens::basics::Json const& base);
////////////////////////////////////////////////////////////////////////////////
/// @brief return the type of the node
////////////////////////////////////////////////////////////////////////////////
NodeType getType () const override {
return DISTRIBUTE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief export to JSON
////////////////////////////////////////////////////////////////////////////////
virtual void toJsonHelper (triagens::basics::Json&,
TRI_memory_zone_t*,
bool) const override;
////////////////////////////////////////////////////////////////////////////////
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies,
bool withProperties) const {
auto c = new DistributeNode(plan, _id, _vocbase, _collection);
CloneHelper (c, plan, withDependencies, withProperties);
return static_cast<ExecutionNode*>(c);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief the cost of a Distribute node is 1
////////////////////////////////////////////////////////////////////////////////
double estimateCost () {
return 1;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the database
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_t* vocbase () const {
return _vocbase;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection
////////////////////////////////////////////////////////////////////////////////
Collection const* collection () const {
return _collection;
}
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief the underlying database
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_t* _vocbase;
////////////////////////////////////////////////////////////////////////////////
/// @brief the underlying collection
////////////////////////////////////////////////////////////////////////////////
Collection const* _collection;
};
// -----------------------------------------------------------------------------
// --SECTION-- class GatherNode
// -----------------------------------------------------------------------------

View File

@ -731,6 +731,7 @@ class FilterToEnumCollFinder : public WalkerWorker<ExecutionNode> {
}
case EN::AGGREGATE:
case EN::SCATTER:
case EN::DISTRIBUTE:
case EN::GATHER:
case EN::REMOTE:
// in these cases we simply ignore the intermediate nodes, note
@ -1408,6 +1409,7 @@ class SortToIndexNode : public WalkerWorker<ExecutionNode> {
case EN::RETURN:
case EN::NORESULTS:
case EN::SCATTER:
case EN::DISTRIBUTE:
case EN::GATHER:
case EN::REMOTE:
case EN::ILLEGAL:
@ -1750,6 +1752,7 @@ int triagens::aql::distributeFilternCalcToCluster (Optimizer* opt,
case EN::RETURN:
case EN::NORESULTS:
case EN::SCATTER:
case EN::DISTRIBUTE:
case EN::GATHER:
case EN::ILLEGAL:
//do break
@ -1836,6 +1839,7 @@ int triagens::aql::distributeSortToCluster (Optimizer* opt,
case EN::RETURN:
case EN::NORESULTS:
case EN::SCATTER:
case EN::DISTRIBUTE:
case EN::GATHER:
case EN::ILLEGAL:
//do break
@ -1921,6 +1925,7 @@ class RemoteToSingletonViaCalcOnlyFinder: public WalkerWorker<ExecutionNode> {
case EN::SUBQUERY:
case EN::FILTER:
case EN::AGGREGATE:
case EN::DISTRIBUTE:
case EN::GATHER:
case EN::INSERT:
case EN::REMOVE:
@ -2131,6 +2136,7 @@ class RemoveToEnumCollFinder: public WalkerWorker<ExecutionNode> {
case EN::SINGLETON:
case EN::ENUMERATE_LIST:
case EN::SUBQUERY:
case EN::DISTRIBUTE:
case EN::AGGREGATE:
case EN::INSERT:
case EN::REPLACE: