From 9c53d045bebb803132bec14ece701f66f2337bec Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 3 Nov 2018 20:17:52 +0100 Subject: [PATCH] Server stream cursor (#7186) (#7210) --- arangod/Aql/QueryCursor.cpp | 83 +++- arangod/Aql/QueryCursor.h | 19 +- arangod/MMFiles/MMFilesExportCursor.cpp | 2 +- arangod/Replication/InitialSyncer.cpp | 7 +- arangod/RestHandler/RestCursorHandler.cpp | 5 +- arangod/Transaction/Methods.cpp | 25 +- arangod/Transaction/Methods.h | 1 + arangod/Transaction/V8Context.cpp | 2 +- arangod/Utils/Cursor.h | 2 +- arangod/Utils/CursorRepository.cpp | 17 +- arangod/Utils/CursorRepository.h | 4 +- arangod/V8Server/V8DealerFeature.cpp | 2 +- arangod/V8Server/v8-users.cpp | 9 +- arangod/V8Server/v8-voccursor.cpp | 439 +++++++++++++++++- arangod/VocBase/Methods/Transactions.cpp | 3 + js/server/bootstrap/modules/internal.js | 6 +- .../modules/@arangodb/arango-statement.js | 5 +- lib/V8/v8-globals.cpp | 2 +- lib/V8/v8-globals.h | 3 + .../shell-query-stream-timecritical-spec.js | 2 +- .../shell/shell-query-stream.js | 20 +- tests/js/common/shell/shell-transactions.js | 29 ++ 22 files changed, 615 insertions(+), 72 deletions(-) rename tests/js/{client => common}/shell/shell-query-stream-timecritical-spec.js (99%) rename tests/js/{client => common}/shell/shell-query-stream.js (88%) diff --git a/arangod/Aql/QueryCursor.cpp b/arangod/Aql/QueryCursor.cpp index 9add6ad30d..3abbbd46d7 100644 --- a/arangod/Aql/QueryCursor.cpp +++ b/arangod/Aql/QueryCursor.cpp @@ -134,7 +134,7 @@ Result QueryResultCursor::dumpSync(VPackBuilder& builder) { if (!hasNext()) { // mark the cursor as deleted - this->deleted(); + this->setDeleted(); } } catch (arangodb::basics::Exception const& ex) { return Result(ex.code(), ex.what()); @@ -147,6 +147,12 @@ Result QueryResultCursor::dumpSync(VPackBuilder& builder) { return {TRI_ERROR_NO_ERROR}; } + +// ............................................................................. 
+// QueryStreamCursor class +// ............................................................................. + + QueryStreamCursor::QueryStreamCursor( TRI_vocbase_t& vocbase, CursorId id, @@ -154,7 +160,8 @@ QueryStreamCursor::QueryStreamCursor( std::shared_ptr bindVars, std::shared_ptr opts, size_t batchSize, - double ttl) + double ttl, + bool contextOwnedByExterior) : Cursor(id, batchSize, ttl, /*hasCount*/ false), _guard(vocbase), _exportCount(-1), @@ -162,7 +169,7 @@ QueryStreamCursor::QueryStreamCursor( TRI_ASSERT(QueryRegistryFeature::registry() != nullptr); _query = std::make_unique( - false, + contextOwnedByExterior, _guard.database(), aql::QueryString(query), std::move(bindVars), @@ -187,16 +194,32 @@ QueryStreamCursor::QueryStreamCursor( _exportCount = (std::min)(limit.getInt(), _exportCount); } } + + if (contextOwnedByExterior) { + // things break if the Query outlives a V8 transaction + _stateChangeCb = [this](transaction::Methods& trx, + transaction::Status status) { + if ((status == transaction::Status::COMMITTED || + status == transaction::Status::ABORTED) && + !this->isUsed()) { + this->setDeleted(); + } + }; + if (!_query->trx()->addStatusChangeCallback(&_stateChangeCb)) { + _stateChangeCb = nullptr; + } + } } QueryStreamCursor::~QueryStreamCursor() { - while (!_queryResults.empty()) { - _query->engine()->_itemBlockManager.returnBlock( - std::move(_queryResults.front())); - _queryResults.pop_front(); - } - if (_query) { // cursor is canceled or timed-out + cleanupStateCallback(); + + while (!_queryResults.empty()) { + _query->engine()->_itemBlockManager.returnBlock(std::move(_queryResults.front())); + _queryResults.pop_front(); + } + // now remove the continue handler we may have registered in the query _query->sharedState()->setContinueCallback(); // Query destructor will cleanup plan and abort transaction @@ -233,22 +256,22 @@ std::pair QueryStreamCursor::dump(VPackBuilder& builder, } return {state, res}; } catch (arangodb::basics::Exception 
const& ex) { - this->deleted(); + this->setDeleted(); return {ExecutionState::DONE, Result(ex.code(), "AQL: " + ex.message() + QueryExecutionState::toStringWithPrefix(_query->state()))}; } catch (std::bad_alloc const&) { - this->deleted(); + this->setDeleted(); return {ExecutionState::DONE, Result(TRI_ERROR_OUT_OF_MEMORY, TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY) + QueryExecutionState::toStringWithPrefix(_query->state()))}; } catch (std::exception const& ex) { - this->deleted(); + this->setDeleted(); return {ExecutionState::DONE, Result( TRI_ERROR_INTERNAL, ex.what() + QueryExecutionState::toStringWithPrefix(_query->state()))}; } catch (...) { - this->deleted(); + this->setDeleted(); return {ExecutionState::DONE, Result(TRI_ERROR_INTERNAL, TRI_errno_string(TRI_ERROR_INTERNAL) + QueryExecutionState::toStringWithPrefix(_query->state()))}; @@ -281,22 +304,22 @@ Result QueryStreamCursor::dumpSync(VPackBuilder& builder) { return writeResult(builder); } catch (arangodb::basics::Exception const& ex) { - this->deleted(); + this->setDeleted(); return Result(ex.code(), "AQL: " + ex.message() + QueryExecutionState::toStringWithPrefix(_query->state())); } catch (std::bad_alloc const&) { - this->deleted(); + this->setDeleted(); return Result(TRI_ERROR_OUT_OF_MEMORY, TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY) + QueryExecutionState::toStringWithPrefix(_query->state())); } catch (std::exception const& ex) { - this->deleted(); + this->setDeleted(); return Result( TRI_ERROR_INTERNAL, ex.what() + QueryExecutionState::toStringWithPrefix(_query->state())); } catch (...) 
{ - this->deleted(); + this->setDeleted(); return Result(TRI_ERROR_INTERNAL, TRI_errno_string(TRI_ERROR_INTERNAL) + QueryExecutionState::toStringWithPrefix(_query->state())); @@ -328,7 +351,7 @@ Result QueryStreamCursor::writeResult(VPackBuilder &builder) { while (rowsWritten < batchSize() && _queryResultPos < block->size()) { AqlValue const& value = block->getValueReference(_queryResultPos, resultRegister); - if (!value.isEmpty()) { + if (!value.isEmpty()) { // ignore empty blocks (e.g. from UpdateBlock) value.toVelocyPack(_query->trx(), builder, false); ++rowsWritten; } @@ -365,7 +388,10 @@ Result QueryStreamCursor::writeResult(VPackBuilder &builder) { if (!hasMore) { std::shared_ptr ss = _query->sharedState(); - ss->setContinueCallback(); + ss->setContinueCallback(); + + // cleanup before transaction is committed + cleanupStateCallback(); QueryResult result; ExecutionState state = _query->finalize(result); // will commit transaction @@ -377,25 +403,26 @@ Result QueryStreamCursor::writeResult(VPackBuilder &builder) { builder.add("extra", result.extra->slice()); } _query.reset(); - this->deleted(); + TRI_ASSERT(_queryResults.empty()); + this->setDeleted(); } } catch (arangodb::basics::Exception const& ex) { - this->deleted(); + this->setDeleted(); return Result(ex.code(), "AQL: " + ex.message() + QueryExecutionState::toStringWithPrefix(_query->state())); } catch (std::bad_alloc const&) { - this->deleted(); + this->setDeleted(); return Result(TRI_ERROR_OUT_OF_MEMORY, TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY) + QueryExecutionState::toStringWithPrefix(_query->state())); } catch (std::exception const& ex) { - this->deleted(); + this->setDeleted(); return Result( TRI_ERROR_INTERNAL, ex.what() + QueryExecutionState::toStringWithPrefix(_query->state())); } catch (...) 
{ - this->deleted(); + this->setDeleted(); return Result(TRI_ERROR_INTERNAL, TRI_errno_string(TRI_ERROR_INTERNAL) + QueryExecutionState::toStringWithPrefix(_query->state())); @@ -443,3 +470,11 @@ ExecutionState QueryStreamCursor::prepareDump() { return state; } + +void QueryStreamCursor::cleanupStateCallback() { + TRI_ASSERT(_query); + transaction::Methods* trx = _query->trx(); + if (trx && _stateChangeCb) { + trx->removeStatusChangeCallback(&_stateChangeCb); + } +} diff --git a/arangod/Aql/QueryCursor.h b/arangod/Aql/QueryCursor.h index e0206a1b7b..4965506b83 100644 --- a/arangod/Aql/QueryCursor.h +++ b/arangod/Aql/QueryCursor.h @@ -27,6 +27,7 @@ #include "Aql/QueryResult.h" #include "Basics/Common.h" +#include "Transaction/Methods.h" #include "Utils/Cursor.h" #include "VocBase/vocbase.h" @@ -89,7 +90,8 @@ class QueryStreamCursor final : public arangodb::Cursor { std::shared_ptr bindVars, std::shared_ptr opts, size_t batchSize, - double ttl); + double ttl, + bool contextOwnedByExterior); ~QueryStreamCursor(); @@ -99,9 +101,6 @@ class QueryStreamCursor final : public arangodb::Cursor { size_t count() const override final { return 0; } - // TODO add this to Cursor and make it virtual / override final. - ExecutionState prepareDump(); - std::pair dump( velocypack::Builder& result, std::function const& continueHandler) override final; @@ -117,15 +116,23 @@ class QueryStreamCursor final : public arangodb::Cursor { // Relies on the caller to have fetched more than batchSize() result rows // (if possible) in order to set hasMore reliably. 
Result writeResult(velocypack::Builder& builder); + + ExecutionState prepareDump(); + + void cleanupStateCallback(); + void cleanupResources(); private: DatabaseGuard _guard; int64_t _exportCount; // used by RocksDBRestExportHandler + /// current query std::unique_ptr _query; + /// buffered results std::deque> _queryResults; - - // index of the next to-be-returned row in _queryResults.front() + /// index of the next to-be-returned row in _queryResults.front() size_t _queryResultPos; + /// used when cursor is owned by V8 transaction + transaction::Methods::StatusChangeCallback _stateChangeCb; }; } // aql diff --git a/arangod/MMFiles/MMFilesExportCursor.cpp b/arangod/MMFiles/MMFilesExportCursor.cpp index 078aaebc97..90fcd01c16 100644 --- a/arangod/MMFiles/MMFilesExportCursor.cpp +++ b/arangod/MMFiles/MMFilesExportCursor.cpp @@ -144,7 +144,7 @@ Result MMFilesExportCursor::dumpSync(VPackBuilder& builder) { // mark the cursor as deleted delete _ex; _ex = nullptr; - this->deleted(); + this->setDeleted(); } } catch (arangodb::basics::Exception const& ex) { return Result(ex.code(), ex.what()); diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 605398f899..c50418c484 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -44,10 +44,9 @@ InitialSyncer::~InitialSyncer() { try { if (!_state.isChildSyncer) { - _batch.finish(_state.connection, _progress); - } - } catch (...) { - } + _batch.finish(_state.connection, _progress); + } + } catch (...) 
{} } /// @brief start a recurring task to extend the batch diff --git a/arangod/RestHandler/RestCursorHandler.cpp b/arangod/RestHandler/RestCursorHandler.cpp index 70b4dac5b9..9d8fce3e16 100644 --- a/arangod/RestHandler/RestCursorHandler.cpp +++ b/arangod/RestHandler/RestCursorHandler.cpp @@ -165,8 +165,9 @@ RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) { } else { CursorRepository* cursors = _vocbase.cursorRepository(); TRI_ASSERT(cursors != nullptr); - Cursor* cursor = cursors->createQueryStream( - querySlice.copyString(), bindVarsBuilder, _options, batchSize, ttl); + Cursor* cursor = cursors->createQueryStream(querySlice.copyString(), + bindVarsBuilder, _options, + batchSize, ttl, /*contextExt*/false); return generateCursorResult(rest::ResponseCode::CREATED, cursor); } diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index 9fcd3077d0..388a6816e0 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -254,9 +254,7 @@ bool transaction::Methods::addStatusChangeCallback( ) { if (!callback || !*callback) { return true; // nothing to call back - } - - if (!_state) { + } else if (!_state) { return false; // nothing to add to } @@ -270,6 +268,27 @@ bool transaction::Methods::addStatusChangeCallback( return true; } +bool transaction::Methods::removeStatusChangeCallback( + StatusChangeCallback const* callback +) { + if (!callback || !*callback) { + return true; // nothing to remove + } else if (!_state) { + return false; // nothing to remove from + } + + auto* statusChangeCallbacks = getStatusChangeCallbacks(*_state, false); + if (statusChangeCallbacks) { + auto it = std::find(statusChangeCallbacks->begin(), statusChangeCallbacks->end(), callback); + TRI_ASSERT(it != statusChangeCallbacks->end()); + if (ADB_LIKELY(it != statusChangeCallbacks->end())) { + statusChangeCallbacks->erase(it); + } + } + return true; +} + + /*static*/ void 
transaction::Methods::clearDataSourceRegistrationCallbacks() { getDataSourceRegistrationCallbacks().clear(); } diff --git a/arangod/Transaction/Methods.h b/arangod/Transaction/Methods.h index f606cb3fc7..3da1072b87 100644 --- a/arangod/Transaction/Methods.h +++ b/arangod/Transaction/Methods.h @@ -159,6 +159,7 @@ class Methods { /// @param callback nullptr and empty functers are ignored, treated as success /// @return success bool addStatusChangeCallback(StatusChangeCallback const* callback); + bool removeStatusChangeCallback(StatusChangeCallback const* callback); /// @brief clear all called for LogicalDataSource instance association events /// @note not thread-safe on the assumption of static factory registration diff --git a/arangod/Transaction/V8Context.cpp b/arangod/Transaction/V8Context.cpp index 7576c1b35f..259ebc8e2b 100644 --- a/arangod/Transaction/V8Context.cpp +++ b/arangod/Transaction/V8Context.cpp @@ -78,7 +78,7 @@ CollectionNameResolver const& transaction::V8Context::resolver() { _resolver = &(main->resolver()); } else { TRI_ASSERT(_resolver == nullptr); - _resolver = createResolver(); + createResolver(); // sets _resolver } } diff --git a/arangod/Utils/Cursor.h b/arangod/Utils/Cursor.h index db8638c68e..5bcd1d847c 100644 --- a/arangod/Utils/Cursor.h +++ b/arangod/Utils/Cursor.h @@ -77,7 +77,7 @@ class Cursor { inline bool isDeleted() const { return _isDeleted; } - void deleted() { _isDeleted = true; } + void setDeleted() { _isDeleted = true; } void use() { TRI_ASSERT(!_isDeleted); diff --git a/arangod/Utils/CursorRepository.cpp b/arangod/Utils/CursorRepository.cpp index fb78892907..5736755f60 100644 --- a/arangod/Utils/CursorRepository.cpp +++ b/arangod/Utils/CursorRepository.cpp @@ -85,7 +85,7 @@ CursorRepository::~CursorRepository() { LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "giving up waiting for unused cursors"; } - std::this_thread::sleep_for(std::chrono::microseconds(500000)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); 
++tries; } @@ -133,7 +133,7 @@ Cursor* CursorRepository::createFromQueryResult( TRI_ASSERT(result.result != nullptr); CursorId const id = TRI_NewServerSpecificTick(); // embedded server id - + TRI_ASSERT(id != 0); std::unique_ptr cursor(new aql::QueryResultCursor( _vocbase, id, std::move(result), batchSize, ttl, hasCount)); cursor->use(); @@ -151,12 +151,15 @@ Cursor* CursorRepository::createFromQueryResult( Cursor* CursorRepository::createQueryStream(std::string const& query, std::shared_ptr const& binds, std::shared_ptr const& opts, - size_t batchSize, double ttl) { + size_t batchSize, double ttl, + bool contextOwnedByExterior) { TRI_ASSERT(!query.empty()); CursorId const id = TRI_NewServerSpecificTick(); // embedded server id - std::unique_ptr cursor(new aql::QueryStreamCursor( - _vocbase, id, query, binds, opts, batchSize, ttl)); + TRI_ASSERT(id != 0); + auto cursor = std::make_unique(_vocbase, id, query, binds, + opts, batchSize, + ttl, contextOwnedByExterior); cursor->use(); return addCursor(std::move(cursor)); @@ -192,7 +195,7 @@ bool CursorRepository::remove(CursorId id, Cursor::CursorType type) { if (cursor->isUsed()) { // cursor is in use by someone else. 
now mark as deleted - cursor->deleted(); + cursor->setDeleted(); return true; } @@ -311,7 +314,7 @@ bool CursorRepository::garbageCollect(bool force) { if (force || cursor->expires() < now) { cursor->kill(); - cursor->deleted(); + cursor->setDeleted(); } if (cursor->isDeleted()) { diff --git a/arangod/Utils/CursorRepository.h b/arangod/Utils/CursorRepository.h index b6ff52b734..e889a2f89d 100644 --- a/arangod/Utils/CursorRepository.h +++ b/arangod/Utils/CursorRepository.h @@ -81,7 +81,8 @@ class CursorRepository { Cursor* createQueryStream(std::string const& query, std::shared_ptr const& binds, std::shared_ptr const& opts, - size_t batchSize, double ttl); + size_t batchSize, double ttl, + bool contextOwnedByExterior); ////////////////////////////////////////////////////////////////////////////// /// @brief remove a cursor by id @@ -111,6 +112,7 @@ class CursorRepository { ////////////////////////////////////////////////////////////////////////////// /// @brief run a garbage collection on the cursors + /// @return ////////////////////////////////////////////////////////////////////////////// bool garbageCollect(bool); diff --git a/arangod/V8Server/V8DealerFeature.cpp b/arangod/V8Server/V8DealerFeature.cpp index 2036748da2..1bb5d3dd2f 100644 --- a/arangod/V8Server/V8DealerFeature.cpp +++ b/arangod/V8Server/V8DealerFeature.cpp @@ -458,7 +458,7 @@ void V8DealerFeature::copyInstallationFiles() { std::string const versionAppendix = std::regex_replace(rest::Version::getServerVersion(), std::regex("-.*$"), ""); std::string const nodeModulesPath = FileUtils::buildFilename("js", "node", "node_modules"); std::string const nodeModulesPathVersioned = basics::FileUtils::buildFilename("js", versionAppendix, "node", "node_modules"); - auto filter = [&nodeModulesPath, &nodeModulesPathVersioned, this](std::string const& filename) -> bool{ + auto filter = [&nodeModulesPath, &nodeModulesPathVersioned](std::string const& filename) -> bool{ if (filename.size() >= nodeModulesPath.size()) 
{ std::string normalized = filename; FileUtils::normalizePath(normalized); diff --git a/arangod/V8Server/v8-users.cpp b/arangod/V8Server/v8-users.cpp index 87455ea321..8e3a40c0c3 100644 --- a/arangod/V8Server/v8-users.cpp +++ b/arangod/V8Server/v8-users.cpp @@ -610,10 +610,10 @@ void TRI_InitV8Users(v8::Handle context, TRI_vocbase_t* vocbase, v8::Handle ft; ft = v8::FunctionTemplate::New(isolate); - ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoUsers")); + ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoUsersCtor")); rt = ft->InstanceTemplate(); - rt->SetInternalFieldCount(2); + rt->SetInternalFieldCount(0); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "save"), JS_SaveUser); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "replace"), @@ -646,14 +646,13 @@ void TRI_InitV8Users(v8::Handle context, TRI_vocbase_t* vocbase, JS_AuthIsActive); v8g->UsersTempl.Reset(isolate, rt); - ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoUsersCtor")); + //ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoUsersCtor")); TRI_AddGlobalFunctionVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "ArangoUsersCtor"), ft->GetFunction(), true); // register the global object v8::Handle aa = rt->NewInstance(); if (!aa.IsEmpty()) { - TRI_AddGlobalVariableVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "ArangoUsers"), - aa); + TRI_AddGlobalVariableVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "ArangoUsers"), aa); } } diff --git a/arangod/V8Server/v8-voccursor.cpp b/arangod/V8Server/v8-voccursor.cpp index 06784cd070..533d713411 100644 --- a/arangod/V8Server/v8-voccursor.cpp +++ b/arangod/V8Server/v8-voccursor.cpp @@ -22,16 +22,26 @@ //////////////////////////////////////////////////////////////////////////////// #include "v8-vocbaseprivate.h" +#include "Aql/QueryCursor.h" #include "Aql/QueryResult.h" #include "Basics/conversions.h" +#include "Basics/VelocyPackHelper.h" +#include "Utils/CollectionNameResolver.h" #include "Utils/Cursor.h" #include 
"Utils/CursorRepository.h" #include "Transaction/Context.h" #include "Transaction/V8Context.h" #include "V8/v8-conv.h" +#include "V8/v8-globals.h" +#include "V8/v8-utils.h" #include "V8/v8-vpack.h" #include "V8Server/v8-voccursor.h" +#include +#include + +#include "Logger/Logger.h" + using namespace arangodb; using namespace arangodb::basics; @@ -140,11 +150,8 @@ static void JS_JsonCursor(v8::FunctionCallbackInfo const& args) { TRI_V8_THROW_EXCEPTION(TRI_ERROR_CURSOR_NOT_FOUND); } - auto cth = cursor->context()->orderCustomTypeHandler(); - VPackOptions opts = VPackOptions::Defaults; - opts.customTypeHandler = cth.get(); - - VPackBuilder builder(&opts); + VPackOptions* opts = cursor->context()->getVPackOptions(); + VPackBuilder builder(opts); builder.openObject(true); // conversion uses sequential iterator, no indexing Result r = cursor->dumpSync(builder); if (r.fail()) { @@ -157,6 +164,390 @@ static void JS_JsonCursor(v8::FunctionCallbackInfo const& args) { TRI_V8_TRY_CATCH_END } +// ............................................................................. +// cursor v8 class +// ............................................................................. 
+ +struct V8Cursor final { + static constexpr uint16_t CID = 4956; + + V8Cursor(v8::Isolate* isolate, + v8::Handle holder, + TRI_vocbase_t& vocbase, + CursorId cursorId) + : _isolate(isolate), + _cursorId(cursorId), + _resolver(vocbase), + _cte(transaction::Context::createCustomTypeHandler(vocbase, _resolver)) { + // sanity checks + TRI_ASSERT(_handle.IsEmpty()); + TRI_ASSERT(holder->InternalFieldCount() > 0); + + // create a new persistent handle + holder->SetAlignedPointerInInternalField(0, this); + _handle.Reset(_isolate, holder); + _handle.SetWrapperClassId(CID); + + // and make it weak, so that we can garbage collect + _handle.SetWeak(&_handle, weakCallback, v8::WeakCallbackType::kFinalizer); + _options.customTypeHandler = _cte.get(); + } + + ~V8Cursor() { + if (!_handle.IsEmpty()) { + TRI_ASSERT(_handle.IsNearDeath()); + + _handle.ClearWeak(); + v8::Local data = + v8::Local::New(_isolate, _handle); + data->SetInternalField(0, v8::Undefined(_isolate)); + _handle.Reset(); + } + if (_isolate) { + TRI_GET_GLOBALS2(_isolate); + TRI_vocbase_t* vocbase = v8g->_vocbase; + if (vocbase) { + CursorRepository* cursors = vocbase->cursorRepository(); + cursors->remove(_cursorId, Cursor::CURSOR_VPACK); + } + } + } + + ////////////////////////////////////////////////////////////////////////////// + /// @brief unwraps a structure + ////////////////////////////////////////////////////////////////////////////// + + static V8Cursor* unwrap(v8::Local handle) { + TRI_ASSERT(handle->InternalFieldCount() > 0); + return static_cast(handle->GetAlignedPointerFromInternalField(0)); + } + + /// @brief return false on error + bool maybeFetchBatch(v8::Isolate* isolate) { + if (_dataIterator == nullptr && _hasMore) { // fetch more data + TRI_GET_GLOBALS(); + CursorRepository* cursors = v8g->_vocbase->cursorRepository(); + bool busy; + Cursor* cc = cursors->find(_cursorId, Cursor::CURSOR_VPACK, busy); + if (busy || cc == nullptr) { + TRI_V8_SET_ERROR("cursor is busy"); + return false; // 
someone else is using it + } + TRI_DEFER(cc->release()); + + Result r = fetchData(cc); + if (r.fail()) { + TRI_CreateErrorObject(isolate, r); + return false; + } + } + return true; + } + + /// @brief fetch the next batch + Result fetchData(Cursor* cursor) { + TRI_ASSERT(cursor != nullptr && cursor->isUsed()); + + TRI_ASSERT(_hasMore); + TRI_ASSERT(_dataIterator == nullptr); + _dataSlice = VPackSlice::noneSlice(); + _extraSlice = VPackSlice::noneSlice(); + + _tmpResult.clear(); + _tmpResult.openObject(); + Result r = cursor->dumpSync(_tmpResult); + if (r.fail()) { + return r; + } + _tmpResult.close(); + + TRI_ASSERT(_tmpResult.slice().isObject()); + // TODO as an optimization + for(auto pair : VPackObjectIterator(_tmpResult.slice(), true)) { + if (pair.key.isEqualString("result")) { + _dataSlice = pair.value; + TRI_ASSERT(_dataSlice.isArray()); + if (!_dataSlice.isEmptyArray()) { + _dataIterator = std::make_unique(_dataSlice); + } + } else if (pair.key.isEqualString("hasMore")) { + _hasMore = pair.value.getBool(); + } else if (pair.key.isEqualString("extra")) { + _extraSlice = pair.value; + } + } + // cursor should delete itself + TRI_ASSERT(_hasMore || cursor->isDeleted()); + return Result{}; + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief constructs a new streaming cursor from arguments + //////////////////////////////////////////////////////////////////////////////// + + static void New(v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + TRI_V8_CURRENT_GLOBALS_AND_SCOPE; + + if (!args.IsConstructCall()) { // reject calls that are not constructor calls + TRI_V8_THROW_EXCEPTION_USAGE("only instance-able by constructor"); + } + + if (args.Length() < 1 || args.Length() > 3) { + TRI_V8_THROW_EXCEPTION_USAGE("ArangoQueryStreamCursor(, , )"); + } + + // get the query string + if (!args[0]->IsString()) { + TRI_V8_THROW_TYPE_ERROR("expecting string for "); + } + std::string const 
queryString(TRI_ObjectToString(args[0])); + + // bind parameters + std::shared_ptr bindVars; + + if (args.Length() > 1) { + if (!args[1]->IsUndefined() && !args[1]->IsNull() && !args[1]->IsObject()) { + TRI_V8_THROW_TYPE_ERROR("expecting object for "); + } + if (args[1]->IsObject()) { + bindVars.reset(new VPackBuilder); + int res = TRI_V8ToVPack(isolate, *(bindVars.get()), args[1], false); + + if (res != TRI_ERROR_NO_ERROR) { + TRI_V8_THROW_EXCEPTION(res); + } + } + } + + // options + auto options = std::make_shared(); + if (args.Length() > 2) { + // we have options! yikes! + if (!args[2]->IsObject()) { + TRI_V8_THROW_TYPE_ERROR("expecting object for "); + } + + int res = TRI_V8ToVPack(isolate, *options, args[2], false); + if (res != TRI_ERROR_NO_ERROR) { + TRI_V8_THROW_EXCEPTION(res); + } + } else { + VPackObjectBuilder guard(options.get()); + } + size_t batchSize = VelocyPackHelper::getNumericValue(options->slice(), + "batchSize", 1000); + + const bool contextOwnedByExterior = transaction::V8Context::isEmbedded(); + TRI_vocbase_t* vocbase = v8g->_vocbase; + TRI_ASSERT(vocbase != nullptr); + auto* cursors = vocbase->cursorRepository(); // create a cursor + double ttl = std::numeric_limits::max(); + // specify ID 0 so it uses the external V8 context + auto cc = cursors->createQueryStream(queryString, + std::move(bindVars), + std::move(options), + batchSize, ttl, + contextOwnedByExterior); + TRI_DEFER(cc->release()); + // args.Holder() is supposedly better than args.This() + auto self = new V8Cursor(isolate, args.Holder(), *vocbase, cc->id()); + Result r = self->fetchData(cc); + if (r.fail()) { + TRI_V8_THROW_EXCEPTION(r); + } + // do not delete self, its owned by V8 now + + TRI_V8_RETURN(args.This()); + TRI_V8_TRY_CATCH_END + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief ArangoQueryStreamCursor.prototype.toArray = ... 
+ //////////////////////////////////////////////////////////////////////////////// + + static void toArray(v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + v8::Isolate* isolate = args.GetIsolate(); + v8::HandleScope scope(isolate); + TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_NOT_IMPLEMENTED, + "toArray() is not supported on ArangoQueryStreamCursor"); + TRI_V8_TRY_CATCH_END + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief ArangoQueryStreamCursor.prototype.getExtra = ... + //////////////////////////////////////////////////////////////////////////////// + + static void getExtra(v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + v8::HandleScope scope(isolate); + + V8Cursor* self = V8Cursor::unwrap(args.Holder()); + if (self == nullptr) { + TRI_V8_RETURN(v8::Undefined(isolate)); + } + + // we always need to fetch + if (!self->maybeFetchBatch(isolate)) { // sets exception + return; + } + + if (self->_extraSlice.isObject()) { + TRI_V8_RETURN(TRI_VPackToV8(isolate, self->_extraSlice)); + } + + TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, + "getExtra() is only valid after all data has been fetched"); + TRI_V8_TRY_CATCH_END + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief ArangoQueryStreamCursor.prototype.hasNext = ... 
+ //////////////////////////////////////////////////////////////////////////////// + + static void hasNext(v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + v8::HandleScope scope(isolate); + + V8Cursor* self = V8Cursor::unwrap(args.Holder()); + if (self == nullptr) { + TRI_V8_RETURN_UNDEFINED(); + } + + // we always need to fetch + if (!self->maybeFetchBatch(isolate)) { // sets exception + return; + } + + if (self->_dataIterator != nullptr) { + TRI_V8_RETURN_TRUE(); + } + TRI_V8_RETURN_FALSE(); + TRI_V8_TRY_CATCH_END + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief ArangoQueryStreamCursor.prototype.next = ... + //////////////////////////////////////////////////////////////////////////////// + + static void next(v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + v8::HandleScope scope(isolate); + + V8Cursor* self = V8Cursor::unwrap(args.Holder()); + if (self == nullptr) { + TRI_V8_RETURN_UNDEFINED(); + } + + // we always need to fetch + if (!self->maybeFetchBatch(isolate)) { // sets exception + return; + } + + if (self->_dataIterator) { // got a current batch + TRI_ASSERT(self->_dataIterator->valid()); + + VPackSlice s = self->_dataIterator->value(); + v8::Local val = TRI_VPackToV8(isolate, s, &self->_options); + + ++(*self->_dataIterator); + // reset so that the next one can fetch again + if (!self->_dataIterator->valid()) { + self->_dataIterator.reset(); + } + TRI_V8_RETURN(val); + } + + TRI_V8_RETURN_UNDEFINED(); + TRI_V8_TRY_CATCH_END + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief ArangoQueryStreamCursor.prototype.count = ... 
+ //////////////////////////////////////////////////////////////////////////////// + + static void count(v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + v8::HandleScope scope(isolate); + TRI_V8_RETURN_UNDEFINED(); // always undefined + TRI_V8_TRY_CATCH_END + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief explicitly discard cursor, mostly relevant for testing + //////////////////////////////////////////////////////////////////////////////// + + static void dispose(v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + V8Cursor* self = V8Cursor::unwrap(args.Holder()); + if (self != nullptr) { + TRI_GET_GLOBALS(); + CursorRepository* cursors = v8g->_vocbase->cursorRepository(); + cursors->remove(self->_cursorId, Cursor::CURSOR_VPACK); + self->_hasMore = false; + self->_dataSlice = VPackSlice::noneSlice(); + self->_extraSlice = VPackSlice::noneSlice(); + self->_dataIterator.reset(); + self->_tmpResult.clear(); + } + TRI_V8_TRY_CATCH_END + } + + //////////////////////////////////////////////////////////////////////////////// + /// @brief ArangoQueryStreamCursor.prototype.id = ... 
+ //////////////////////////////////////////////////////////////////////////////// + + static void id(v8::FunctionCallbackInfo const& args) { + v8::Isolate* isolate = args.GetIsolate(); + v8::HandleScope scope(isolate); + V8Cursor* self = V8Cursor::unwrap(args.Holder()); + if (self == nullptr) { + TRI_V8_RETURN(v8::Undefined(isolate)); + } + TRI_V8_RETURN(v8::Integer::New(isolate, self->_cursorId)); + } + +private: + /// called when GC deletes the value + static void weakCallback(const v8::WeakCallbackInfo>& data) { + auto isolate = data.GetIsolate(); + auto persistent = data.GetParameter(); + auto myPointer = v8::Local::New(isolate, *persistent); + + TRI_ASSERT(myPointer->InternalFieldCount() > 0); + V8Cursor* obj = V8Cursor::unwrap(myPointer); + TRI_ASSERT(obj); + + TRI_ASSERT(persistent == &obj->_handle); + TRI_ASSERT(persistent->IsNearDeath()); + delete obj; + } + +private: + + /// @brief persistent handle for V8 object + v8::Persistent _handle; + /// @brief isolate + v8::Isolate* _isolate; + /// @brief temporary result buffer + VPackBuilder _tmpResult; + /// @brief id of cursor + CursorId _cursorId; + + /// cache has more variable + bool _hasMore = true; + VPackSlice _dataSlice = VPackSlice::noneSlice(); + /// cached extras which might be attached to the stream + VPackSlice _extraSlice = VPackSlice::noneSlice(); + /// @brief pointer to the current result + std::unique_ptr _dataIterator; + + CollectionNameResolver _resolver; + std::shared_ptr _cte; + VPackOptions _options = VPackOptions::Defaults; +}; + // ............................................................................. // generate the general cursor template // ............................................................................. @@ -165,10 +556,48 @@ void TRI_InitV8cursor(v8::Handle context, TRI_v8_global_t* v8g) { v8::Isolate* isolate = v8::Isolate::GetCurrent(); // cursor functions. 
not intended to be used by end users + // these cursor functions are the APIs implemented in js/actions/api-simple.js TRI_AddGlobalFunctionVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "CREATE_CURSOR"), JS_CreateCursor, true); TRI_AddGlobalFunctionVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "JSON_CURSOR"), JS_JsonCursor, true); + + // streaming query cursor class, intended to be used via ArangoStatement.execute + v8::Handle rt; + v8::Handle ft; + + ft = v8::FunctionTemplate::New(isolate, V8Cursor::New); + ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoQueryStreamCursor")); + + rt = ft->InstanceTemplate(); + rt->SetInternalFieldCount(1); + + ft->PrototypeTemplate()->Set(TRI_V8_ASCII_STRING(isolate, "isArangoResultSet"), + v8::True(isolate)); + TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "toArray"), + V8Cursor::toArray); + TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "getExtra"), + V8Cursor::getExtra); + TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "hasNext"), + V8Cursor::hasNext); + TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "next"), + V8Cursor::next); + TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "count"), + V8Cursor::count); + TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "dispose"), + V8Cursor::dispose); + TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "id"), + V8Cursor::id); + + v8g->StreamQueryCursorTempl.Reset(isolate, ft); + v8::MaybeLocal ctor = ft->GetFunction(context); + if (ctor.IsEmpty()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "error creating v8 stream cursor"); + } + //ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoStreamQueryCursorCtor")); + TRI_AddGlobalFunctionVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "ArangoQueryStreamCursor"), + ctor.ToLocalChecked(), true); + } diff --git a/arangod/VocBase/Methods/Transactions.cpp b/arangod/VocBase/Methods/Transactions.cpp index 
44724c807c..dbfb0f1053 100644 --- a/arangod/VocBase/Methods/Transactions.cpp +++ b/arangod/VocBase/Methods/Transactions.cpp @@ -7,6 +7,7 @@ #include "Transaction/Methods.h" #include "Transaction/Options.h" #include "Transaction/V8Context.h" +#include "Utils/CursorRepository.h" #include "V8/v8-conv.h" #include "V8/v8-vpack.h" #include "V8/v8-helper.h" @@ -361,6 +362,8 @@ Result executeTransactionJS( if (!rv.fail()) { rv = trx->commit(); } + // if we do not remove unused V8Cursors, V8Context might not reset global state + vocbase.cursorRepository()->garbageCollect(/*force*/false); return rv; } diff --git a/js/server/bootstrap/modules/internal.js b/js/server/bootstrap/modules/internal.js index 52a06cc547..ae848cd809 100644 --- a/js/server/bootstrap/modules/internal.js +++ b/js/server/bootstrap/modules/internal.js @@ -92,11 +92,11 @@ delete global.ArangoDatabase; // ////////////////////////////////////////////////////////////////////////////// - // / @brief ShapedJson stub object - only here for compatibility with 2.8 + // / @brief ArangoQueryStreamCursor // ////////////////////////////////////////////////////////////////////////////// - exports.ShapedJson = function () {}; - delete global.ShapedJson; + exports.ArangoQueryStreamCursor = global.ArangoQueryStreamCursor; + delete global.ArangoQueryStreamCursor; // ////////////////////////////////////////////////////////////////////////////// // / @brief dispatcherThreads diff --git a/js/server/modules/@arangodb/arango-statement.js b/js/server/modules/@arangodb/arango-statement.js index c7a37332e6..932d0bbb54 100644 --- a/js/server/modules/@arangodb/arango-statement.js +++ b/js/server/modules/@arangodb/arango-statement.js @@ -32,6 +32,7 @@ module.isSystem = true; var ArangoStatement = require('@arangodb/arango-statement-common').ArangoStatement; var GeneralArrayCursor = require('@arangodb/arango-cursor').GeneralArrayCursor; +const ArangoQueryStreamCursor = require('internal').ArangoQueryStreamCursor; // 
////////////////////////////////////////////////////////////////////////////// // / @brief parse a query and return the results @@ -75,8 +76,10 @@ ArangoStatement.prototype.execute = function () { if (this._cache !== undefined) { opts.cache = this._cache; } + if (opts.stream) { + return new ArangoQueryStreamCursor(this._query, this._bindVars, opts); + } } - // {json:[docs], stats:{}, profile:{}, warnings:{}, cached:true} var result = AQL_EXECUTE(this._query, this._bindVars, opts); return new GeneralArrayCursor(result.json, 0, null, result); diff --git a/lib/V8/v8-globals.cpp b/lib/V8/v8-globals.cpp index f027396c42..937041c536 100644 --- a/lib/V8/v8-globals.cpp +++ b/lib/V8/v8-globals.cpp @@ -41,8 +41,8 @@ TRI_v8_global_t::TRI_v8_global_t(v8::Isolate* isolate) #ifdef USE_ENTERPRISE SmartGraphTempl(), #endif - BufferTempl(), + StreamQueryCursorTempl(), BufferConstant(), DeleteConstant(), diff --git a/lib/V8/v8-globals.h b/lib/V8/v8-globals.h index a4440e59aa..afc8edcb57 100644 --- a/lib/V8/v8-globals.h +++ b/lib/V8/v8-globals.h @@ -411,6 +411,9 @@ struct TRI_v8_global_t { /// @brief Buffer template v8::Persistent BufferTempl; + + /// @brief stream query cursor template + v8::Persistent StreamQueryCursorTempl; /// @brief "Buffer" constant v8::Persistent BufferConstant; diff --git a/tests/js/client/shell/shell-query-stream-timecritical-spec.js b/tests/js/common/shell/shell-query-stream-timecritical-spec.js similarity index 99% rename from tests/js/client/shell/shell-query-stream-timecritical-spec.js rename to tests/js/common/shell/shell-query-stream-timecritical-spec.js index f2ef773018..03642940a3 100644 --- a/tests/js/client/shell/shell-query-stream-timecritical-spec.js +++ b/tests/js/common/shell/shell-query-stream-timecritical-spec.js @@ -20,7 +20,7 @@ let cursors = []; function sendQuery (count, async) { count = count || 1; for (let i = 0; i < count; ++i) { - let opts = { stream:true }; + let opts = { stream: true }; if (async === true) { opts.batchSize = 1; // 
runs SLEEP once } diff --git a/tests/js/client/shell/shell-query-stream.js b/tests/js/common/shell/shell-query-stream.js similarity index 88% rename from tests/js/client/shell/shell-query-stream.js rename to tests/js/common/shell/shell-query-stream.js index 68ff72a401..68c363f0b8 100644 --- a/tests/js/client/shell/shell-query-stream.js +++ b/tests/js/common/shell/shell-query-stream.js @@ -1,5 +1,5 @@ /*jshint globalstrict:false, strict:false, maxlen:1000*/ -/*global assertEqual, assertTrue, assertFalse, fail, more */ +/*global assertEqual, assertTrue, assertUndefined, fail, more */ //////////////////////////////////////////////////////////////////////////////// /// @brief test the statement class @@ -29,9 +29,7 @@ //////////////////////////////////////////////////////////////////////////////// const jsunity = require("jsunity"); - -const arangodb = require("@arangodb"); -const db = arangodb.db; +const db = require("internal").db; //////////////////////////////////////////////////////////////////////////////// /// @brief test suite: stream cursors @@ -92,8 +90,20 @@ function StreamCursorSuite () { cursor.next(); } }); - } + }, + testInfiniteAQL : function() { + var stmt = db._createStatement({ query: "FOR i IN 1..100000000000 RETURN i", + options: { stream: true }, + batchSize: 1000}); + var cursor = stmt.execute(); + + assertUndefined(cursor.count()); + let i = 10; + while (cursor.hasNext() && i-- > 0) { + cursor.next(); + } + } }; } diff --git a/tests/js/common/shell/shell-transactions.js b/tests/js/common/shell/shell-transactions.js index b9de760feb..7bf78b36ec 100644 --- a/tests/js/common/shell/shell-transactions.js +++ b/tests/js/common/shell/shell-transactions.js @@ -576,6 +576,35 @@ function TransactionsImplicitCollectionsSuite () { } }, + //////////////////////////////////////////////////////////////////////////////// + /// @brief use stream cursors inside a transaction, leaving one unconsumed + //////////////////////////////////////////////////////////////////////////////// + + 
testUseQueryStreamCursorInAql : function () { + let docs = []; + for(let i = 0; i < 100000; i++) { + docs.push({value: i}); + if (i % 5000 === 0) { + c1.save(docs); + docs = []; + } + } + c1.save(docs); + + var result = db._executeTransaction({ + collections: { allowImplicit: false, read: cn1 }, + action: `function (params) { + const db = require('internal').db; + let cc = db._query('FOR i IN @@cn1 RETURN i', { '@cn1' : params.cn1 }, {stream: true}); + let xx = 0; + while (cc.hasNext()) {cc.next(); xx++;} + let cc2 = db._query('FOR i IN 1..1000000000 RETURN i', {}, {stream: true}); + return xx; }`, + params: { cn1: cn1 } + }); + assertEqual(100000, result); + }, + //////////////////////////////////////////////////////////////////////////////// /// @brief uses an explicitly declared collection for reading ////////////////////////////////////////////////////////////////////////////////