//////////////////////////////////////////////////////////////////////////////// /// @brief Implementation of the ExecutionBlock for Traversals /// /// @file arangod/Aql/TraversalBlock.cpp /// /// DISCLAIMER /// /// Copyright 2010-2014 triagens GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is triAGENS GmbH, Cologne, Germany /// /// @author Michael Hackstein /// @author Copyright 2014, triagens GmbH, Cologne, Germany /// @author Copyright 2014-2015, ArangoDB GmbH, Cologne, Germany //////////////////////////////////////////////////////////////////////////////// #include "Aql/Ast.h" #include "Aql/ExecutionEngine.h" #include "Aql/ExecutionPlan.h" #include "Aql/Functions.h" #include "Aql/TraversalBlock.h" #include "Aql/ExecutionNode.h" #include "Basics/ScopeGuard.h" #include "V8/v8-globals.h" #include "V8Server/V8Traverser.h" #include "Cluster/ClusterTraverser.h" #include /// TODO: remove me. using namespace std; using namespace triagens::basics; using namespace triagens::arango; using namespace triagens::aql; using VertexId = triagens::arango::traverser::VertexId; // ----------------------------------------------------------------------------- // --SECTION-- class TraversalBlock // ----------------------------------------------------------------------------- TraversalBlock::TraversalBlock (ExecutionEngine* engine, TraversalNode const* ep) : ExecutionBlock(engine, ep), _posInPaths(0), _useRegister(false), _usedConstant(false), _vertexReg(0), _edgeReg(0), _pathReg(0), _condition((ep->condition()) ? ep->condition()->root(): nullptr), _expressions(ep->expressions()), _hasV8Expression(false) { triagens::arango::traverser::TraverserOptions opts; ep->fillTraversalOptions(opts); auto edgeColls = ep->edgeColls(); auto ast = ep->_plan->getAst(); for (auto& map : *_expressions) { for (size_t i = 0; i < map.second.size(); ++i) { SimpleTraverserExpression* it = dynamic_cast(map.second.at(i)); std::unique_ptr e(new Expression(ast, it->toEvaluate)); _hasV8Expression |= e->isV8(); std::unordered_set inVars; e->variables(inVars); it->expression = e.release(); // Prepare _inVars and _inRegs: _inVars.emplace_back(); std::vector& inVarsCur = _inVars.back(); _inRegs.emplace_back(); std::vector& inRegsCur = _inRegs.back(); for (auto const& v : inVars) { inVarsCur.emplace_back(v); auto it = ep->getRegisterPlan()->varInfo.find(v->id); TRI_ASSERT(it != ep->getRegisterPlan()->varInfo.end()); TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId); inRegsCur.emplace_back(it->second.registerId); } } } _resolver = new CollectionNameResolver(_trx->vocbase()); if (triagens::arango::ServerState::instance()->isCoordinator()) { _traverser.reset(new triagens::arango::traverser::ClusterTraverser( edgeColls, opts, std::string(_trx->vocbase()->_name, strlen(_trx->vocbase()->_name)), _resolver, _expressions )); } else { std::vector edgeCollections; for (auto const& coll : edgeColls) { TRI_voc_cid_t cid = _resolver->getCollectionId(coll); edgeCollections.push_back(_trx->documentCollection(cid)); } _traverser.reset(new triagens::arango::traverser::DepthFirstTraverser(edgeCollections, opts, _expressions)); } if (!ep->usesInVariable()) { _vertexId = ep->getStartVertex(); auto pos = _vertexId.find("/"); _startId = VertexId( _resolver->getCollectionIdCluster(_vertexId.substr(0, pos).c_str()), _vertexId.c_str() + pos + 1 ); } else { auto it = ep->getRegisterPlan()->varInfo.find(ep->inVariable()->id); TRI_ASSERT(it != ep->getRegisterPlan()->varInfo.end()); _reg = it->second.registerId; _useRegister = true; } _vertexVar = nullptr; _edgeVar = nullptr; _pathVar = nullptr; if (ep->usesVertexOutVariable()) { _vertexVar = ep->vertexOutVariable(); } if (ep->usesEdgeOutVariable()) { _edgeVar = ep->edgeOutVariable(); } if (ep->usesPathOutVariable()) { _pathVar = ep->pathOutVariable(); } _CalculationNodeId = ep->getCalculationNodeId(); /* auto pruner = [] (const TraversalPath& path) -> bool { if (strcmp(path.vertices.back().key, "1") == 0) { return true; } if (strcmp(path.vertices.back().key, "31") == 0) { return true; } return false; }; opts.setPruningFunction(pruner); */ } TraversalBlock::~TraversalBlock () { delete _resolver; freeCaches(); } void TraversalBlock::freeCaches () { for (auto& v : _vertices) { v.destroy(); } _vertices.clear(); for (auto& e : _edges) { e.destroy(); } _edges.clear(); for (auto& p : _paths) { p.destroy(); } _paths.clear(); } int TraversalBlock::initialize () { int res = ExecutionBlock::initialize(); auto en = static_cast(getPlanNode()); auto ast = en->_plan->getAst(); auto varInfo = getPlanNode()->getRegisterPlan()->varInfo; if (usesVertexOutput()) { auto it = varInfo.find(_vertexVar->id); TRI_ASSERT(it != varInfo.end()); TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId); _vertexReg = it->second.registerId; } if (usesEdgeOutput()) { auto it = varInfo.find(_edgeVar->id); TRI_ASSERT(it != varInfo.end()); TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId); _edgeReg = it->second.registerId; } if (usesPathOutput()) { auto it = varInfo.find(_pathVar->id); TRI_ASSERT(it != varInfo.end()); TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId); _pathReg = it->second.registerId; } // instantiate expressions: auto instantiateExpression = [&] (AstNode const* a) -> void { // all new AstNodes are registered with the Ast in the Query std::unique_ptr e(new Expression(ast, a)); TRI_IF_FAILURE("TraversalBlock::initialize") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } _hasV8Expression |= e->isV8(); std::unordered_set inVars; e->variables(inVars); std::unique_ptr nce(new NonConstExpression(0, 0, 0, e.get())); e.release(); _nonConstExpressions.push_back(nce.get()); nce.release(); // Prepare _inVars and _inRegs: _inVars.emplace_back(); auto& inVarsCur = _inVars.back(); _inRegs.emplace_back(); auto& inRegsCur = _inRegs.back(); for (auto const& v : inVars) { inVarsCur.emplace_back(v); auto it = getPlanNode()->getRegisterPlan()->varInfo.find(v->id); TRI_ASSERT(it != getPlanNode()->getRegisterPlan()->varInfo.end()); TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId); inRegsCur.emplace_back(it->second.registerId); } }; if (_condition) { instantiateExpression(_condition); } return res; } void TraversalBlock::executeExpressions () { AqlItemBlock* cur = _buffer.front(); for (auto& map : *_expressions) { for (size_t i = 0; i < map.second.size(); ++i) { // Right now no inVars are allowed. SimpleTraverserExpression* it = dynamic_cast(map.second.at(i)); TRI_document_collection_t const* myCollection = nullptr; if (it->expression != nullptr) { if (it->compareTo != nullptr) { delete it->compareTo; } // inVars and inRegs needs fixx AqlValue a = it->expression->execute(_trx, cur, _pos, _inVars[i], _inRegs[i], &myCollection); it->compareTo = new Json(a.toJson(_trx, myCollection, true)); a.destroy(); } } } } void TraversalBlock::executeFilterExpressions () { if (! _expressions->empty()) { if (_hasV8Expression) { bool const isRunningInCluster = triagens::arango::ServerState::instance()->isRunningInCluster(); // must have a V8 context here to protect Expression::execute() auto engine = _engine; triagens::basics::ScopeGuard guard{ [&engine]() -> void { engine->getQuery()->enterContext(); }, [&]() -> void { if (isRunningInCluster) { // must invalidate the expression now as we might be called from // different threads for (auto const& map : *_expressions) { for (auto const& e : map.second) { auto casted = dynamic_cast(e); casted->expression->invalidate(); } } engine->getQuery()->exitContext(); } } }; ISOLATE; v8::HandleScope scope(isolate); // do not delete this! executeExpressions(); TRI_IF_FAILURE("TraversalBlock::executeV8") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } } else { // no V8 context required! Functions::InitializeThreadContext(); try { executeExpressions(); TRI_IF_FAILURE("TraversalBlock::executeExpression") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } Functions::DestroyThreadContext(); } catch (...) { Functions::DestroyThreadContext(); throw; } } } } int TraversalBlock::initializeCursor (AqlItemBlock* items, size_t pos) { return ExecutionBlock::initializeCursor(items, pos); } bool TraversalBlock::executeExpressions (AqlValue& pathValue) { return true; TRI_ASSERT(_condition != nullptr); auto& toReplace = _nonConstExpressions[0]; auto exp = toReplace->expression; TRI_document_collection_t const* myCollection = nullptr; AqlItemBlock b(1, 3); b.setValue(0, 2, pathValue); std::vector outVars({_vertexVar, _edgeVar, _pathVar}); AqlValue a = exp->execute(_trx, &b, 0, outVars, _inRegs[0], &myCollection); b.eraseValue(0, 2); bool rc = a.isTrue(); a.destroy(); return rc; } //////////////////////////////////////////////////////////////////////////////// /// @brief read more paths //////////////////////////////////////////////////////////////////////////////// bool TraversalBlock::morePaths (size_t hint) { freeCaches(); _posInPaths = 0; if (!_traverser->hasMore()) { return false; } auto en = static_cast(getPlanNode()); for (size_t j = 0; j < hint; ++j) { auto p = _traverser->next(); if (p == nullptr) { // There are no further paths available. break; } AqlValue pathValue; if (usesPathOutput() || (en->condition() != nullptr)) { pathValue = AqlValue(p->pathToJson(_trx, _resolver)); } if ((en->condition() != nullptr) && ! executeExpressions(pathValue)) { _traverser->prune(); continue; } if ( usesVertexOutput() ) { _vertices.emplace_back(p->lastVertexToJson(_trx, _resolver)); } if ( usesEdgeOutput() ) { _edges.emplace_back(p->lastEdgeToJson(_trx, _resolver)); } if ( usesPathOutput() ) { _paths.push_back(pathValue); } } // This is only save as long as _vertices is still build return _vertices.size() > 0; } //////////////////////////////////////////////////////////////////////////////// /// @brief skip the next paths //////////////////////////////////////////////////////////////////////////////// size_t TraversalBlock::skipPaths (size_t hint) { freeCaches(); _posInPaths = 0; if (!_traverser->hasMore()) { return 0; } return _traverser->skip(hint); } //////////////////////////////////////////////////////////////////////////////// /// @brief initialize the list of paths //////////////////////////////////////////////////////////////////////////////// void TraversalBlock::initializePaths (AqlItemBlock const* items) { if (_vertices.size() > 0) { // No Initialisation required. return; } if (!_useRegister) { if (!_usedConstant) { _usedConstant = true; _traverser->setStartVertex(_startId); } } else { auto in = items->getValueReference(_pos, _reg); if (in.isShaped()) { auto col = items->getDocumentCollection(_reg); VertexId v(col->_info._cid, TRI_EXTRACT_MARKER_KEY(in.getMarker())); _traverser->setStartVertex(v); } else if (in.isObject()) { Json input = in.toJson(_trx, nullptr, false); if (input.has("_id") ) { Json _idJson = input.get("_id"); if (_idJson.isString()) { _vertexId = JsonHelper::getStringValue(_idJson.json(), ""); VertexId v = triagens::arango::traverser::IdStringToVertexId ( _resolver, _vertexId ); _traverser->setStartVertex(v); } } else if (input.has("vertex")) { // This is used whenever the input is the result of another traversal. Json vertexJson = input.get("vertex"); if (vertexJson.has("_id") ) { Json _idJson = vertexJson.get("_id"); if (_idJson.isString()) { _vertexId = JsonHelper::getStringValue(_idJson.json(), ""); VertexId v = triagens::arango::traverser::IdStringToVertexId ( _resolver, _vertexId ); _traverser->setStartVertex(v); } } } } else if (in._type == AqlValue::DOCVEC) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUERY_PARSE, std::string("Only one start vertex allowed. Embed it in a FOR loop.")); } else { TRI_ASSERT(in.getTypeString() == ""); } } } //////////////////////////////////////////////////////////////////////////////// /// @brief getSome //////////////////////////////////////////////////////////////////////////////// AqlItemBlock* TraversalBlock::getSome (size_t, // atLeast, size_t atMost) { if (_done) { return nullptr; } if (_buffer.empty()) { size_t toFetch = (std::min)(DefaultBatchSize, atMost); if (! ExecutionBlock::getBlock(toFetch, toFetch)) { _done = true; return nullptr; } _pos = 0; // this is in the first block executeFilterExpressions(); } // If we get here, we do have _buffer.front() AqlItemBlock* cur = _buffer.front(); size_t const curRegs = cur->getNrRegs(); if (_pos == 0) { // Initial initialisation initializePaths(cur); } // Iterate more paths: if (_posInPaths >= _vertices.size()) { if (! morePaths(atMost)) { // This input does not have any more paths. maybe the next one has. // we can only return nullptr iff the buffer is empty. if (++_pos >= cur->size()) { _buffer.pop_front(); // does not throw // returnBlock(cur); delete cur; _pos = 0; } else { initializePaths(cur); } return getSome(atMost, atMost); } } size_t available = _vertices.size() - _posInPaths; size_t toSend = (std::min)(atMost, available); RegisterId nrRegs = getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()]; std::unique_ptr res(requestBlock(toSend, nrRegs)); // std::unique_ptr res(new AqlItemBlock(toSend, nrRegs)); // automatically freed if we throw TRI_ASSERT(curRegs <= res->getNrRegs()); // only copy 1st row of registers inherited from previous frame(s) inheritRegisters(cur, res.get(), _pos); for (size_t j = 0; j < toSend; j++) { if (j > 0) { // re-use already copied aqlvalues for (RegisterId i = 0; i < curRegs; i++) { res->setValue(j, i, res->getValueReference(0, i)); // Note: if this throws, then all values will be deleted // properly since the first one is. } } if ( usesVertexOutput() ) { res->setValue(j, _vertexReg, _vertices[_posInPaths].clone()); } if ( usesEdgeOutput() ) { res->setValue(j, _edgeReg, _edges[_posInPaths].clone()); } if ( usesPathOutput() ) { res->setValue(j, _pathReg, _paths[_posInPaths].clone() ); } ++_posInPaths; } // Advance read position: if (_posInPaths >= _vertices.size()) { // we have exhausted our local paths buffer // fetch more paths into our buffer if (! morePaths(atMost)) { // nothing more to read, re-initialize fetching of paths if (++_pos >= cur->size()) { _buffer.pop_front(); // does not throw // returnBlock(cur); delete cur; _pos = 0; } else { initializePaths(cur); } } } // Clear out registers no longer needed later: clearRegisters(res.get()); return res.release(); } size_t TraversalBlock::skipSome (size_t atLeast, size_t atMost) { size_t skipped = 0; if (_done) { return skipped; } if (_buffer.empty()) { size_t toFetch = (std::min)(DefaultBatchSize, atMost); if (! ExecutionBlock::getBlock(toFetch, toFetch)) { _done = true; return skipped; } _pos = 0; // this is in the first block executeFilterExpressions(); } // If we get here, we do have _buffer.front() AqlItemBlock* cur = _buffer.front(); if (_pos == 0) { // Initial initialisation initializePaths(cur); } size_t available = _vertices.size() - _posInPaths; // We have not yet fetched any paths. We can skip the next atMost many if (available == 0) { return skipPaths(atMost); } // We have fewer paths available in our list, so we clear the list and thereby skip these. if (available <= atMost) { freeCaches(); _posInPaths = 0; return available; } _posInPaths += atMost; // Skip the next atMost many paths. return atMost; } // Local Variables: // mode: outline-minor // outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)" // End: