1
0
Fork 0

add database and collection info to gather nodes

This commit is contained in:
Jan Steemann 2014-09-26 13:58:02 +02:00
parent cb3d138507
commit f462c5f620
5 changed files with 122 additions and 24 deletions

View File

@ -31,6 +31,8 @@
#define ARANGODB_AQL_COLLECTION_H 1 #define ARANGODB_AQL_COLLECTION_H 1
#include "Basics/Common.h" #include "Basics/Common.h"
#include "Cluster/ClusterInfo.h"
#include "Utils/Exception.h"
#include "VocBase/document-collection.h" #include "VocBase/document-collection.h"
#include "VocBase/transaction.h" #include "VocBase/transaction.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"
@ -124,6 +126,20 @@ namespace triagens {
return static_cast<size_t>(numDocuments); return static_cast<size_t>(numDocuments);
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the shard information for a collection
////////////////////////////////////////////////////////////////////////////////
std::map<std::string, std::string> shardIds () const {
auto clusterInfo = triagens::arango::ClusterInfo::instance();
auto collectionInfo = clusterInfo->getCollection(std::string(vocbase->_name), name);
if (collectionInfo.get() == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "collection not found");
}
return collectionInfo.get()->shardIds();
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- public variables // --SECTION-- public variables
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -216,6 +216,16 @@ struct Instanciator : public WalkerWorker<ExecutionNode> {
// --SECTION-- public functions // --SECTION-- public functions
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief add a block to the engine
////////////////////////////////////////////////////////////////////////////////
void ExecutionEngine::addBlock (ExecutionBlock* block) {
TRI_ASSERT(block != nullptr);
_blocks.push_back(block);
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief create an execution engine from a plan /// @brief create an execution engine from a plan
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -229,10 +239,9 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (AQL_TRANSACTION_V8* trx,
if (! plan->varUsageComputed()) { if (! plan->varUsageComputed()) {
plan->findVarUsage(); plan->findVarUsage();
} }
auto inst = new Instanciator(engine); std::unique_ptr<Instanciator> inst(new Instanciator(engine));
plan->root()->walk(inst); plan->root()->walk(inst.get());
auto root = inst->root; auto root = inst.get()->root;
delete inst;
root->staticAnalysis(); root->staticAnalysis();
root->initialize(); root->initialize();
@ -248,20 +257,6 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (AQL_TRANSACTION_V8* trx,
} }
} }
// -----------------------------------------------------------------------------
// --SECTION-- private functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief add a block to the engine
////////////////////////////////////////////////////////////////////////////////
void ExecutionEngine::addBlock (ExecutionBlock* block) {
TRI_ASSERT(block != nullptr);
_blocks.push_back(block);
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE // --SECTION-- END-OF-FILE
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -472,7 +472,7 @@ void EnumerateCollectionNode::toJsonHelper (triagens::basics::Json& nodes,
// Now put info about vocbase and cid in there // Now put info about vocbase and cid in there
json("database", triagens::basics::Json(_vocbase->_name)) json("database", triagens::basics::Json(_vocbase->_name))
("collection", triagens::basics::Json(_collection->name)) ("collection", triagens::basics::Json(_collection->name))
("outVariable", _outVariable->toJson()); ("outVariable", _outVariable->toJson());
// And add it: // And add it:
@ -1564,7 +1564,8 @@ void RemoteNode::toJsonHelper (triagens::basics::Json& nodes,
ScatterNode::ScatterNode (ExecutionPlan* plan, ScatterNode::ScatterNode (ExecutionPlan* plan,
triagens::basics::Json const& base) triagens::basics::Json const& base)
: ExecutionNode(plan, base) { : ExecutionNode(plan, base),
_collection(plan->getAst()->query()->collections()->get(JsonHelper::checkAndGetStringValue(base.json(), "collection"))) {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1579,6 +1580,9 @@ void ScatterNode::toJsonHelper (triagens::basics::Json& nodes,
return; return;
} }
json("database", triagens::basics::Json(_vocbase->_name))
("collection", triagens::basics::Json(_collection->name));
// And add it: // And add it:
nodes(json); nodes(json);
} }

View File

@ -938,6 +938,22 @@ namespace triagens {
return INDEX_RANGE; return INDEX_RANGE;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief return the database
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_t* vocbase () const {
return _vocbase;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection
////////////////////////////////////////////////////////////////////////////////
Collection* collection () const {
return _collection;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief export to JSON /// @brief export to JSON
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1950,6 +1966,29 @@ namespace triagens {
virtual void toJsonHelper (triagens::basics::Json& json, virtual void toJsonHelper (triagens::basics::Json& json,
TRI_memory_zone_t* zone, TRI_memory_zone_t* zone,
bool) const override; bool) const override;
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief return the database
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_t* vocbase () const {
return _vocbase;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection
////////////////////////////////////////////////////////////////////////////////
Collection* collection () const {
return _collection;
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- protected variables // --SECTION-- protected variables
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -2603,8 +2642,13 @@ namespace triagens {
public: public:
ScatterNode (ExecutionPlan* plan, size_t id) ScatterNode (ExecutionPlan* plan,
: ExecutionNode(plan, id) { size_t id,
TRI_vocbase_t* vocbase,
Collection const* collection)
: ExecutionNode(plan, id),
_vocbase(vocbase),
_collection(collection) {
} }
ScatterNode (ExecutionPlan*, triagens::basics::Json const& base); ScatterNode (ExecutionPlan*, triagens::basics::Json const& base);
@ -2630,7 +2674,7 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
virtual ExecutionNode* clone (ExecutionPlan* plan) const { virtual ExecutionNode* clone (ExecutionPlan* plan) const {
auto c = new ScatterNode(plan, _id); auto c = new ScatterNode(plan, _id, _vocbase, _collection);
cloneDependencies(plan, c); cloneDependencies(plan, c);
return static_cast<ExecutionNode*>(c); return static_cast<ExecutionNode*>(c);
} }
@ -2643,6 +2687,20 @@ namespace triagens {
return 1; return 1;
} }
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief the underlying database
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_t* _vocbase;
////////////////////////////////////////////////////////////////////////////////
/// @brief the underlying collection
////////////////////////////////////////////////////////////////////////////////
Collection const* _collection;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -1640,8 +1640,33 @@ int triagens::aql::distributeInCluster (Optimizer* opt,
bool const isRootNode = plan->isRoot(node); bool const isRootNode = plan->isRoot(node);
plan->unlinkNode(node, isRootNode); plan->unlinkNode(node, isRootNode);
auto const nodeType = node->getType();
// extract database and collection from plan node
TRI_vocbase_t* vocbase = nullptr;
Collection const* collection = nullptr;
if (nodeType == ExecutionNode::ENUMERATE_COLLECTION) {
vocbase = static_cast<EnumerateCollectionNode*>(node)->vocbase();
collection = static_cast<EnumerateCollectionNode*>(node)->collection();
}
else if (nodeType == ExecutionNode::INDEX_RANGE) {
vocbase = static_cast<IndexRangeNode*>(node)->vocbase();
collection = static_cast<IndexRangeNode*>(node)->collection();
}
else if (nodeType == ExecutionNode::INSERT ||
nodeType == ExecutionNode::UPDATE ||
nodeType == ExecutionNode::REPLACE ||
nodeType == ExecutionNode::REMOVE) {
vocbase = static_cast<ModificationNode*>(node)->vocbase();
collection = static_cast<ModificationNode*>(node)->collection();
}
else {
TRI_ASSERT(false);
}
// insert a scatter node // insert a scatter node
ExecutionNode* scatterNode = new ScatterNode(plan, plan->nextId()); ExecutionNode* scatterNode = new ScatterNode(plan, plan->nextId(), vocbase, collection);
plan->registerNode(scatterNode); plan->registerNode(scatterNode);
scatterNode->addDependency(deps[0]); scatterNode->addDependency(deps[0]);