1
0
Fork 0

clone plan for DBServers

This commit is contained in:
Jan Steemann 2014-09-29 15:57:53 +02:00
parent 7c78bd1d61
commit ec1e6552ca
4 changed files with 167 additions and 70 deletions

View File

@ -123,8 +123,8 @@ static ExecutionBlock* createBlock (ExecutionEngine* engine,
case ExecutionNode::REMOTE: {
return new RemoteBlock(engine,
static_cast<RemoteNode const*>(en),
"", // TODO: server
"", // TODO: ownname
"", // TODO: server
"", // TODO: ownname
""); // TODO: queryId
}
case ExecutionNode::ILLEGAL: {
@ -340,24 +340,28 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
void buildEngineDBServer (EngineInfo& info,
void buildEngineDBServer (EngineInfo const& info,
QueryId connectedId) {
Collection* collection = nullptr;
for (auto en = info.nodes.rbegin(); en != info.nodes.rend(); ++en) {
if ((*en)->getType() == ExecutionNode::REMOTE) {
// remove the node's dependencies so it becomes a terminal node
collection = const_cast<Collection*>(static_cast<RemoteNode*>((*en))->collection());
(*en)->removeDependencies();
// we should only have one remote node
break;
// find the collection to be used
if ((*en)->getType() == ExecutionNode::ENUMERATE_COLLECTION) {
collection = const_cast<Collection*>(static_cast<EnumerateCollectionNode*>((*en))->collection());
}
else if ((*en)->getType() == ExecutionNode::INDEX_RANGE) {
collection = const_cast<Collection*>(static_cast<IndexRangeNode*>((*en))->collection());
}
else if ((*en)->getType() == ExecutionNode::INSERT ||
(*en)->getType() == ExecutionNode::UPDATE ||
(*en)->getType() == ExecutionNode::REPLACE ||
(*en)->getType() == ExecutionNode::REMOVE) {
collection = const_cast<Collection*>(static_cast<ModificationNode*>((*en))->collection());
}
}
info.nodes.front()->removeParents();
TRI_ASSERT(collection != nullptr);
// now send the plan to the remote servers
auto cc = triagens::arango::ClusterComm::instance();
TRI_ASSERT(cc != nullptr);
@ -368,12 +372,52 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// iterate over all shards of the collection
for (auto shardId : shardIds) {
// copy the relevant fragment of the plan for each shard
ExecutionPlan plan(query->ast());
ExecutionNode const* current = info.nodes.front();
ExecutionNode* previous = nullptr;
// clone nodes until we reach a remote node
while (current != nullptr) {
bool stop = false;
if (current->getType() == ExecutionNode::REMOTE) {
// TODO: inject connectedID and coordinator server name into clone of RemoteNode
// we'll stop after a remote
stop = true;
}
auto clone = current->clone(&plan, false);
plan.registerNode(clone);
if (previous == nullptr) {
plan.root(clone);
}
if (previous != nullptr) {
previous->addDependency(clone);
}
auto const& deps = current->getDependencies();
if (deps.size() != 1) {
stop = true;
}
if (stop) {
break;
}
previous = clone;
current = deps[0];
}
// inject the current shard id into the collection
collection->setCurrentShard(shardId);
// create a JSON representation of the plan
triagens::basics::Json result(triagens::basics::Json::Array);
triagens::basics::Json jsonNodesList(info.nodes.front()->toJson(TRI_UNKNOWN_MEM_ZONE, true));
triagens::basics::Json jsonNodesList(plan.root()->toJson(TRI_UNKNOWN_MEM_ZONE, true));
// add the collection
triagens::basics::Json jsonCollectionsList(triagens::basics::Json::List);
@ -387,7 +431,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
std::unique_ptr<std::string> body(new std::string(triagens::basics::JsonHelper::toString(result.json())));
// std::cout << "GENERATED A PLAN FOR THE REMOTE SERVERS: " << *(body.get()) << "\n";
std::cout << "GENERATED A PLAN FOR THE REMOTE SERVERS: " << *(body.get()) << "\n";
// TODO: pass connectedId to the shard so it can fetch data using the correct query id
auto headers = new std::map<std::string, std::string>;
@ -422,13 +466,13 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
nrok++;
}
else {
// std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n";
std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n";
}
}
delete res;
}
// std::cout << "GOT ALL RESPONSES FROM DB SERVERS: " << nrok << "\n";
std::cout << "GOT ALL RESPONSES FROM DB SERVERS: " << nrok << "\n";
if (nrok != (int) shardIds.size()) {
// TODO: provide sensible error message with more details

View File

@ -265,14 +265,6 @@ namespace triagens {
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief forcefully remove all parents of the node
////////////////////////////////////////////////////////////////////////////////
void removeParents () {
_parents.clear();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a dependency, returns true if the pointer was found and
/// removed, please note that this does not delete ep!
@ -341,7 +333,8 @@ namespace triagens {
/// @brief clone execution Node recursively, this makes the class abstract
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const = 0;
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const = 0;
// make class abstract
////////////////////////////////////////////////////////////////////////////////
@ -352,7 +345,7 @@ namespace triagens {
ExecutionNode* theClone) const {
auto it = _dependencies.begin();
while (it != _dependencies.end()) {
auto c = (*it)->clone(plan);
auto c = (*it)->clone(plan, true);
try {
c->_parents.push_back(theClone);
theClone->_dependencies.push_back(c);
@ -641,9 +634,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new SingletonNode(plan, _id);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -713,9 +709,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new EnumerateCollectionNode(plan, _id, _vocbase, _collection, _outVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -866,9 +865,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new EnumerateListNode(plan, _id, _inVariable, _outVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -1001,7 +1003,8 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
std::vector<std::vector<RangeInfo>> ranges;
for (size_t i = 0; i < _ranges.size(); i++){
ranges.push_back(std::vector<RangeInfo>());
@ -1012,7 +1015,9 @@ namespace triagens {
}
auto c = new IndexRangeNode(plan, _id, _vocbase, _collection,
_outVariable, _index, ranges, _reverse);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -1153,9 +1158,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new LimitNode(plan, _id, _offset, _limit);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -1244,10 +1252,13 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new CalculationNode(plan, _id, _expression->clone(),
_outVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -1382,10 +1393,13 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
auto c = new SubqueryNode(plan, _id, _subquery->clone(plan),
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new SubqueryNode(plan, _id, _subquery->clone(plan, true),
_outVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -1512,9 +1526,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new FilterNode(plan, _id, _inVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -1669,9 +1686,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new SortNode(plan, _id, _elements, _stable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -1797,9 +1817,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new AggregateNode(plan, _id, _aggregateVariables, _outVariable, _variableMap);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -1919,9 +1942,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new ReturnNode(plan, _id, _inVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -2107,9 +2133,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new RemoveNode(plan, _id, _vocbase, _collection, _options, _inVariable, _outVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -2221,10 +2250,13 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new InsertNode(plan, _id, _vocbase, _collection,
_options, _inVariable, _outVariable);
cloneDependencies(plan,c);
if (withDependencies) {
cloneDependencies(plan,c);
}
return static_cast<ExecutionNode*>(c);
}
@ -2338,9 +2370,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new UpdateNode(plan, _id, _vocbase, _collection, _options, _inDocVariable, _inKeyVariable, _outVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -2464,11 +2499,14 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new ReplaceNode(plan, _id, _vocbase, _collection,
_options, _inDocVariable, _inKeyVariable,
_outVariable);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -2580,9 +2618,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new NoResultsNode(plan, _id);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -2646,9 +2687,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new RemoteNode(plan, _id, _vocbase, _collection);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -2748,9 +2792,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new ScatterNode(plan, _id, _vocbase, _collection);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}
@ -2846,9 +2893,12 @@ namespace triagens {
/// @brief clone ExecutionNode recursively
////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const {
virtual ExecutionNode* clone (ExecutionPlan* plan,
bool withDependencies) const {
auto c = new GatherNode(plan, _id, _vocbase, _collection);
cloneDependencies(plan, c);
if (withDependencies) {
cloneDependencies(plan, c);
}
return static_cast<ExecutionNode*>(c);
}

View File

@ -1160,10 +1160,15 @@ class CloneNodeAdder : public WalkerWorker<ExecutionNode> {
}
};
////////////////////////////////////////////////////////////////////////////////
/// @brief clone an existing execution plan
////////////////////////////////////////////////////////////////////////////////
ExecutionPlan* ExecutionPlan::clone () {
auto plan = new ExecutionPlan(_ast);
try {
plan->_root = _root->clone(plan);
plan->_root = _root->clone(plan, true);
plan->_nextId = _nextId;
plan->_appliedRules = _appliedRules;
CloneNodeAdder adder(plan);

View File

@ -54,7 +54,7 @@ namespace triagens {
// --SECTION-- constructors / destructors
// -----------------------------------------------------------------------------
protected:
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief create the plan
@ -62,8 +62,6 @@ namespace triagens {
ExecutionPlan (Ast*);
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the plan, frees all assigned nodes
////////////////////////////////////////////////////////////////////////////////