//////////////////////////////////////////////////////////////////////////////// /// @brief Infrastructure for ExecutionBlocks (the execution engine) /// /// @file arangod/Aql/ExecutionBlock.h /// /// DISCLAIMER /// /// Copyright 2010-2014 triagens GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is triAGENS GmbH, Cologne, Germany /// /// @author Max Neunhoeffer /// @author Copyright 2014, triagens GmbH, Cologne, Germany //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGODB_AQL_EXECUTION_BLOCK_H #define ARANGODB_AQL_EXECUTION_BLOCK_H 1 #include #include #include "Aql/Types.h" #include "Aql/ExecutionPlan.h" #include "Utils/transactions.h" using namespace triagens::basics; struct TRI_json_s; namespace triagens { namespace aql { class ExecutionBlock { public: ExecutionBlock (ExecutionPlan const* ep) : _exePlan(ep), _done(false), _nrVars(10), _depth(0) { } virtual ~ExecutionBlock (); //////////////////////////////////////////////////////////////////////////////// /// @brief add a dependency //////////////////////////////////////////////////////////////////////////////// void addDependency (ExecutionBlock* ep) { _dependencies.push_back(ep); } //////////////////////////////////////////////////////////////////////////////// /// @brief get all dependencies //////////////////////////////////////////////////////////////////////////////// vector getDependencies () { return _dependencies; } //////////////////////////////////////////////////////////////////////////////// /// @brief remove a dependency, returns true if the pointer was found and /// removed, please note that this does not delete ep! //////////////////////////////////////////////////////////////////////////////// bool removeDependency (ExecutionBlock* ep) { auto it = _dependencies.begin(); while (it != _dependencies.end()) { if (*it == ep) { _dependencies.erase(it); return true; } ++it; } return false; } //////////////////////////////////////////////////////////////////////////////// /// @brief access the pos-th dependency //////////////////////////////////////////////////////////////////////////////// ExecutionBlock* operator[] (size_t pos) { if (pos > _dependencies.size()) { return nullptr; } else { return _dependencies.at(pos); } } //////////////////////////////////////////////////////////////////////////////// /// @brief functionality to walk an execution plan recursively //////////////////////////////////////////////////////////////////////////////// class WalkerWorker { public: WalkerWorker () {}; virtual ~WalkerWorker () {}; virtual void before (ExecutionBlock* eb) {}; virtual void after (ExecutionBlock* eb) {}; virtual void enterSubquery (ExecutionBlock* super, ExecutionBlock* sub) {}; virtual void leaveSubquery (ExecutionBlock* super, ExecutionBlock* sub) {}; }; void walk (WalkerWorker& worker); struct VarDefPlace { int depth; int index; VarDefPlace(int depth, int index) : depth(depth), index(index) {} }; virtual int staticAnalysisRecursion ( std::unordered_map& varTab, int& curVar); void staticAnalysis (); // Methods for execution: virtual int initialize () { for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) { (*it)->initialize(); } // FIXME: report errors from above return TRI_ERROR_NO_ERROR; } virtual int bind (std::map* params); std::map* getParameters (); virtual int execute () { for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) { (*it)->execute(); } // FIXME: report errors from above _done = false; return TRI_ERROR_NO_ERROR; } virtual int shutdown () { for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) { (*it)->shutdown(); } // FIXME: report errors from above return TRI_ERROR_NO_ERROR; } protected: bool bufferMore (size_t atLeast, size_t atMost) { std::vector > docs = _dependencies[0]->getSome(atLeast, atMost); if (docs.size() == 0) { return false; } for (auto it = docs.begin(); it != docs.end(); ++it) { _buffer.push_back(*it); } return true; } public: virtual shared_ptr getOne () { shared_ptr res; if (_buffer.size() != 0) { res = _buffer.front(); _buffer.pop_front(); return res; } return _dependencies[0]->getOne(); } virtual std::vector > getSome (size_t atLeast, size_t atMost) { std::vector > res; if (_done) { return res; } if (_buffer.size() < atLeast) { if (! bufferMore(atLeast-_buffer.size(), atMost-_buffer.size()) && _buffer.size() == 0) { _done = true; return res; } } size_t nr = _buffer.size(); if (nr > atMost) { nr = atMost; } while (nr-- > 0) { res.push_back(_buffer.front()); _buffer.pop_front(); } return res; } bool skip (int number); virtual int64_t count () { return _dependencies[0]->count(); } virtual int64_t remaining () { return _dependencies[0]->remaining() + _buffer.size(); } ExecutionPlan const* getPlan () { return _exePlan; } protected: ExecutionPlan const* _exePlan; std::vector _dependencies; std::deque > _buffer; bool _done; VariableId _nrVars; // will be filled in by staticAnalysis int _depth; // nesting depth of contexts, i.e. AqlItems, // coming out of this node public: static ExecutionBlock* instanciatePlan (ExecutionPlan const* ep); }; class SingletonBlock : public ExecutionBlock { public: SingletonBlock (SingletonPlan const* ep) : ExecutionBlock(ep) { } ~SingletonBlock () { } int initialize () { ExecutionBlock::initialize(); return TRI_ERROR_NO_ERROR; } int execute () { ExecutionBlock::execute(); return TRI_ERROR_NO_ERROR; } int shutdown () { return TRI_ERROR_NO_ERROR; } shared_ptr getOne () { if (_done) { return nullptr; } shared_ptr res(new AqlItem(_nrVars)); _done = true; return res; } vector > getSome (size_t atLeast, size_t atMost) { vector > res; if (_done) { return res; } res.emplace_back(new AqlItem(_nrVars)); _done = true; return res; } }; class EnumerateCollectionBlock : public ExecutionBlock { public: EnumerateCollectionBlock (EnumerateCollectionPlan const* ep) : ExecutionBlock(ep) { } ~EnumerateCollectionBlock () { } int initialize () { // TODO: this is very very inefficient // it must be implemented properly for production int res = ExecutionBlock::initialize(); if (res != TRI_ERROR_NO_ERROR) { return res; } auto p = reinterpret_cast(_exePlan); V8ReadTransaction trx(p->_vocbase, p->_collname); res = trx.begin(); vector docs; res = trx.read(docs); size_t const n = docs.size(); auto shaper = trx.documentCollection()->getShaper(); _allDocs.clear(); for (size_t i = 0; i < n; ++i) { TRI_shaped_json_t shaped; TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, docs[i]->getDataPtr()); _allDocs.push_back(new Json(shaper->_memoryZone, TRI_JsonShapedJson(shaper, &shaped))); } res = trx.finish(res); if (_allDocs.size() == 0) { _done = true; } _buffer.clear(); return res; } int shutdown () { int res = ExecutionBlock::shutdown(); // Tell all dependencies _allDocs.clear(); return res; } shared_ptr getOne () { if (_done) { return nullptr; } if (_buffer.size() == 0) { if (! bufferMore(1,1)) { _done = true; return nullptr; } _pos = 0; if (_allDocs.size() == 0) { // just in case _done = true; return nullptr; } } shared_ptr res(new AqlItem(_buffer.front(), 1)); res->setValue(0,0,new AqlValue( new Json(_allDocs[_pos]->copy()))); if (++_pos >= _allDocs.size()) { _buffer.pop_front(); } return res; } private: vector _allDocs; size_t _pos; }; class RootBlock : public ExecutionBlock { public: RootBlock (RootPlan const* ep) : ExecutionBlock(ep) { } ~RootBlock () { } virtual shared_ptr getOne () { // Fetch one from above, possibly using our _buffer: shared_ptr res = ExecutionBlock::getOne(); if (res == nullptr) { return res; } // Let's steal the actual result and throw away the vars: shared_ptr stripped(new AqlItem(1)); stripped->setValue(0, 0, res->getValue(0, 0)); res->setValue(0, 0, nullptr); return stripped; } virtual std::vector > getSome (size_t atLeast, size_t atMost) { std::vector > res = ExecutionBlock::getSome(atLeast, atMost); std::vector > stripped; if (res.size() > 0) { stripped.reserve(res.size()); for (auto it = res.begin(); it != res.end(); ++it) { auto a = new AqlItem(1); a->setValue(0, 0, (*it)->getValue(0, 0)); (*it)->setValue(0, 0, nullptr); stripped.emplace_back(a); } } return stripped; } }; } // namespace triagens::aql } // namespace triagens #endif // Local Variables: // mode: outline-minor // outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)" // End: