1
0
Fork 0

Server stream cursor (#7186) (#7210)

This commit is contained in:
Simon 2018-11-03 20:17:52 +01:00 committed by Jan
parent 8f1ce5b396
commit 9c53d045be
22 changed files with 615 additions and 72 deletions

View File

@ -134,7 +134,7 @@ Result QueryResultCursor::dumpSync(VPackBuilder& builder) {
if (!hasNext()) {
// mark the cursor as deleted
this->deleted();
this->setDeleted();
}
} catch (arangodb::basics::Exception const& ex) {
return Result(ex.code(), ex.what());
@ -147,6 +147,12 @@ Result QueryResultCursor::dumpSync(VPackBuilder& builder) {
return {TRI_ERROR_NO_ERROR};
}
// .............................................................................
// QueryStreamCursor class
// .............................................................................
QueryStreamCursor::QueryStreamCursor(
TRI_vocbase_t& vocbase,
CursorId id,
@ -154,7 +160,8 @@ QueryStreamCursor::QueryStreamCursor(
std::shared_ptr<VPackBuilder> bindVars,
std::shared_ptr<VPackBuilder> opts,
size_t batchSize,
double ttl)
double ttl,
bool contextOwnedByExterior)
: Cursor(id, batchSize, ttl, /*hasCount*/ false),
_guard(vocbase),
_exportCount(-1),
@ -162,7 +169,7 @@ QueryStreamCursor::QueryStreamCursor(
TRI_ASSERT(QueryRegistryFeature::registry() != nullptr);
_query = std::make_unique<Query>(
false,
contextOwnedByExterior,
_guard.database(),
aql::QueryString(query),
std::move(bindVars),
@ -187,16 +194,32 @@ QueryStreamCursor::QueryStreamCursor(
_exportCount = (std::min)(limit.getInt(), _exportCount);
}
}
if (contextOwnedByExterior) {
// things break if the Query outlives a V8 transaction
_stateChangeCb = [this](transaction::Methods& trx,
transaction::Status status) {
if ((status == transaction::Status::COMMITTED ||
status == transaction::Status::ABORTED) &&
!this->isUsed()) {
this->setDeleted();
}
};
if (!_query->trx()->addStatusChangeCallback(&_stateChangeCb)) {
_stateChangeCb = nullptr;
}
}
}
QueryStreamCursor::~QueryStreamCursor() {
while (!_queryResults.empty()) {
_query->engine()->_itemBlockManager.returnBlock(
std::move(_queryResults.front()));
_queryResults.pop_front();
}
if (_query) { // cursor is canceled or timed-out
cleanupStateCallback();
while (!_queryResults.empty()) {
_query->engine()->_itemBlockManager.returnBlock(std::move(_queryResults.front()));
_queryResults.pop_front();
}
// now remove the continue handler we may have registered in the query
_query->sharedState()->setContinueCallback();
// Query destructor will cleanup plan and abort transaction
@ -233,22 +256,22 @@ std::pair<ExecutionState, Result> QueryStreamCursor::dump(VPackBuilder& builder,
}
return {state, res};
} catch (arangodb::basics::Exception const& ex) {
this->deleted();
this->setDeleted();
return {ExecutionState::DONE, Result(ex.code(),
"AQL: " + ex.message() +
QueryExecutionState::toStringWithPrefix(_query->state()))};
} catch (std::bad_alloc const&) {
this->deleted();
this->setDeleted();
return {ExecutionState::DONE, Result(TRI_ERROR_OUT_OF_MEMORY,
TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY) +
QueryExecutionState::toStringWithPrefix(_query->state()))};
} catch (std::exception const& ex) {
this->deleted();
this->setDeleted();
return {ExecutionState::DONE, Result(
TRI_ERROR_INTERNAL,
ex.what() + QueryExecutionState::toStringWithPrefix(_query->state()))};
} catch (...) {
this->deleted();
this->setDeleted();
return {ExecutionState::DONE, Result(TRI_ERROR_INTERNAL,
TRI_errno_string(TRI_ERROR_INTERNAL) +
QueryExecutionState::toStringWithPrefix(_query->state()))};
@ -281,22 +304,22 @@ Result QueryStreamCursor::dumpSync(VPackBuilder& builder) {
return writeResult(builder);
} catch (arangodb::basics::Exception const& ex) {
this->deleted();
this->setDeleted();
return Result(ex.code(),
"AQL: " + ex.message() +
QueryExecutionState::toStringWithPrefix(_query->state()));
} catch (std::bad_alloc const&) {
this->deleted();
this->setDeleted();
return Result(TRI_ERROR_OUT_OF_MEMORY,
TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY) +
QueryExecutionState::toStringWithPrefix(_query->state()));
} catch (std::exception const& ex) {
this->deleted();
this->setDeleted();
return Result(
TRI_ERROR_INTERNAL,
ex.what() + QueryExecutionState::toStringWithPrefix(_query->state()));
} catch (...) {
this->deleted();
this->setDeleted();
return Result(TRI_ERROR_INTERNAL,
TRI_errno_string(TRI_ERROR_INTERNAL) +
QueryExecutionState::toStringWithPrefix(_query->state()));
@ -328,7 +351,7 @@ Result QueryStreamCursor::writeResult(VPackBuilder &builder) {
while (rowsWritten < batchSize() && _queryResultPos < block->size()) {
AqlValue const& value = block->getValueReference(_queryResultPos, resultRegister);
if (!value.isEmpty()) {
if (!value.isEmpty()) { // ignore empty blocks (e.g. from UpdateBlock)
value.toVelocyPack(_query->trx(), builder, false);
++rowsWritten;
}
@ -365,7 +388,10 @@ Result QueryStreamCursor::writeResult(VPackBuilder &builder) {
if (!hasMore) {
std::shared_ptr<SharedQueryState> ss = _query->sharedState();
ss->setContinueCallback();
ss->setContinueCallback();
// cleanup before transaction is committet
cleanupStateCallback();
QueryResult result;
ExecutionState state = _query->finalize(result); // will commit transaction
@ -377,25 +403,26 @@ Result QueryStreamCursor::writeResult(VPackBuilder &builder) {
builder.add("extra", result.extra->slice());
}
_query.reset();
this->deleted();
TRI_ASSERT(_queryResults.empty());
this->setDeleted();
}
} catch (arangodb::basics::Exception const& ex) {
this->deleted();
this->setDeleted();
return Result(ex.code(),
"AQL: " + ex.message() +
QueryExecutionState::toStringWithPrefix(_query->state()));
} catch (std::bad_alloc const&) {
this->deleted();
this->setDeleted();
return Result(TRI_ERROR_OUT_OF_MEMORY,
TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY) +
QueryExecutionState::toStringWithPrefix(_query->state()));
} catch (std::exception const& ex) {
this->deleted();
this->setDeleted();
return Result(
TRI_ERROR_INTERNAL,
ex.what() + QueryExecutionState::toStringWithPrefix(_query->state()));
} catch (...) {
this->deleted();
this->setDeleted();
return Result(TRI_ERROR_INTERNAL,
TRI_errno_string(TRI_ERROR_INTERNAL) +
QueryExecutionState::toStringWithPrefix(_query->state()));
@ -443,3 +470,11 @@ ExecutionState QueryStreamCursor::prepareDump() {
return state;
}
void QueryStreamCursor::cleanupStateCallback() {
TRI_ASSERT(_query);
transaction::Methods* trx = _query->trx();
if (trx && _stateChangeCb) {
trx->removeStatusChangeCallback(&_stateChangeCb);
}
}

View File

@ -27,6 +27,7 @@
#include "Aql/QueryResult.h"
#include "Basics/Common.h"
#include "Transaction/Methods.h"
#include "Utils/Cursor.h"
#include "VocBase/vocbase.h"
@ -89,7 +90,8 @@ class QueryStreamCursor final : public arangodb::Cursor {
std::shared_ptr<velocypack::Builder> bindVars,
std::shared_ptr<velocypack::Builder> opts,
size_t batchSize,
double ttl);
double ttl,
bool contextOwnedByExterior);
~QueryStreamCursor();
@ -99,9 +101,6 @@ class QueryStreamCursor final : public arangodb::Cursor {
size_t count() const override final { return 0; }
// TODO add this to Cursor and make it virtual / override final.
ExecutionState prepareDump();
std::pair<ExecutionState, Result> dump(
velocypack::Builder& result,
std::function<void()> const& continueHandler) override final;
@ -117,15 +116,23 @@ class QueryStreamCursor final : public arangodb::Cursor {
// Relies on the caller to have fetched more than batchSize() result rows
// (if possible) in order to set hasMore reliably.
Result writeResult(velocypack::Builder& builder);
ExecutionState prepareDump();
void cleanupStateCallback();
void cleanupResources();
private:
DatabaseGuard _guard;
int64_t _exportCount; // used by RocksDBRestExportHandler
/// current query
std::unique_ptr<aql::Query> _query;
/// buffered results
std::deque<std::unique_ptr<AqlItemBlock>> _queryResults;
// index of the next to-be-returned row in _queryResults.front()
/// index of the next to-be-returned row in _queryResults.front()
size_t _queryResultPos;
/// used when cursor is owned by V8 transaction
transaction::Methods::StatusChangeCallback _stateChangeCb;
};
} // aql

View File

@ -144,7 +144,7 @@ Result MMFilesExportCursor::dumpSync(VPackBuilder& builder) {
// mark the cursor as deleted
delete _ex;
_ex = nullptr;
this->deleted();
this->setDeleted();
}
} catch (arangodb::basics::Exception const& ex) {
return Result(ex.code(), ex.what());

View File

@ -44,10 +44,9 @@ InitialSyncer::~InitialSyncer() {
try {
if (!_state.isChildSyncer) {
_batch.finish(_state.connection, _progress);
}
} catch (...) {
}
_batch.finish(_state.connection, _progress);
}
} catch (...) {}
}
/// @brief start a recurring task to extend the batch

View File

@ -165,8 +165,9 @@ RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
} else {
CursorRepository* cursors = _vocbase.cursorRepository();
TRI_ASSERT(cursors != nullptr);
Cursor* cursor = cursors->createQueryStream(
querySlice.copyString(), bindVarsBuilder, _options, batchSize, ttl);
Cursor* cursor = cursors->createQueryStream(querySlice.copyString(),
bindVarsBuilder, _options,
batchSize, ttl, /*contextExt*/false);
return generateCursorResult(rest::ResponseCode::CREATED, cursor);
}

View File

@ -254,9 +254,7 @@ bool transaction::Methods::addStatusChangeCallback(
) {
if (!callback || !*callback) {
return true; // nothing to call back
}
if (!_state) {
} else if (!_state) {
return false; // nothing to add to
}
@ -270,6 +268,27 @@ bool transaction::Methods::addStatusChangeCallback(
return true;
}
bool transaction::Methods::removeStatusChangeCallback(
StatusChangeCallback const* callback
) {
if (!callback || !*callback) {
return true; // nothing to call back
} else if (!_state) {
return false; // nothing to add to
}
auto* statusChangeCallbacks = getStatusChangeCallbacks(*_state, false);
if (statusChangeCallbacks) {
auto it = std::find(statusChangeCallbacks->begin(), statusChangeCallbacks->end(), callback);
TRI_ASSERT(it != statusChangeCallbacks->end());
if (ADB_LIKELY(it != statusChangeCallbacks->end())) {
statusChangeCallbacks->erase(it);
}
}
return true;
}
/*static*/ void transaction::Methods::clearDataSourceRegistrationCallbacks() {
getDataSourceRegistrationCallbacks().clear();
}

View File

@ -159,6 +159,7 @@ class Methods {
/// @param callback nullptr and empty functers are ignored, treated as success
/// @return success
bool addStatusChangeCallback(StatusChangeCallback const* callback);
bool removeStatusChangeCallback(StatusChangeCallback const* callback);
/// @brief clear all called for LogicalDataSource instance association events
/// @note not thread-safe on the assumption of static factory registration

View File

@ -78,7 +78,7 @@ CollectionNameResolver const& transaction::V8Context::resolver() {
_resolver = &(main->resolver());
} else {
TRI_ASSERT(_resolver == nullptr);
_resolver = createResolver();
createResolver(); // sets _resolver
}
}

View File

@ -77,7 +77,7 @@ class Cursor {
inline bool isDeleted() const { return _isDeleted; }
void deleted() { _isDeleted = true; }
void setDeleted() { _isDeleted = true; }
void use() {
TRI_ASSERT(!_isDeleted);

View File

@ -85,7 +85,7 @@ CursorRepository::~CursorRepository() {
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "giving up waiting for unused cursors";
}
std::this_thread::sleep_for(std::chrono::microseconds(500000));
std::this_thread::sleep_for(std::chrono::milliseconds(500));
++tries;
}
@ -133,7 +133,7 @@ Cursor* CursorRepository::createFromQueryResult(
TRI_ASSERT(result.result != nullptr);
CursorId const id = TRI_NewServerSpecificTick(); // embedded server id
TRI_ASSERT(id != 0);
std::unique_ptr<Cursor> cursor(new aql::QueryResultCursor(
_vocbase, id, std::move(result), batchSize, ttl, hasCount));
cursor->use();
@ -151,12 +151,15 @@ Cursor* CursorRepository::createFromQueryResult(
Cursor* CursorRepository::createQueryStream(std::string const& query,
std::shared_ptr<VPackBuilder> const& binds,
std::shared_ptr<VPackBuilder> const& opts,
size_t batchSize, double ttl) {
size_t batchSize, double ttl,
bool contextOwnedByExterior) {
TRI_ASSERT(!query.empty());
CursorId const id = TRI_NewServerSpecificTick(); // embedded server id
std::unique_ptr<Cursor> cursor(new aql::QueryStreamCursor(
_vocbase, id, query, binds, opts, batchSize, ttl));
TRI_ASSERT(id != 0);
auto cursor = std::make_unique<aql::QueryStreamCursor>(_vocbase, id, query, binds,
opts, batchSize,
ttl, contextOwnedByExterior);
cursor->use();
return addCursor(std::move(cursor));
@ -192,7 +195,7 @@ bool CursorRepository::remove(CursorId id, Cursor::CursorType type) {
if (cursor->isUsed()) {
// cursor is in use by someone else. now mark as deleted
cursor->deleted();
cursor->setDeleted();
return true;
}
@ -311,7 +314,7 @@ bool CursorRepository::garbageCollect(bool force) {
if (force || cursor->expires() < now) {
cursor->kill();
cursor->deleted();
cursor->setDeleted();
}
if (cursor->isDeleted()) {

View File

@ -81,7 +81,8 @@ class CursorRepository {
Cursor* createQueryStream(std::string const& query,
std::shared_ptr<velocypack::Builder> const& binds,
std::shared_ptr<velocypack::Builder> const& opts,
size_t batchSize, double ttl);
size_t batchSize, double ttl,
bool contextOwnedByExterior);
//////////////////////////////////////////////////////////////////////////////
/// @brief remove a cursor by id
@ -111,6 +112,7 @@ class CursorRepository {
//////////////////////////////////////////////////////////////////////////////
/// @brief run a garbage collection on the cursors
/// @return
//////////////////////////////////////////////////////////////////////////////
bool garbageCollect(bool);

View File

@ -458,7 +458,7 @@ void V8DealerFeature::copyInstallationFiles() {
std::string const versionAppendix = std::regex_replace(rest::Version::getServerVersion(), std::regex("-.*$"), "");
std::string const nodeModulesPath = FileUtils::buildFilename("js", "node", "node_modules");
std::string const nodeModulesPathVersioned = basics::FileUtils::buildFilename("js", versionAppendix, "node", "node_modules");
auto filter = [&nodeModulesPath, &nodeModulesPathVersioned, this](std::string const& filename) -> bool{
auto filter = [&nodeModulesPath, &nodeModulesPathVersioned](std::string const& filename) -> bool{
if (filename.size() >= nodeModulesPath.size()) {
std::string normalized = filename;
FileUtils::normalizePath(normalized);

View File

@ -610,10 +610,10 @@ void TRI_InitV8Users(v8::Handle<v8::Context> context, TRI_vocbase_t* vocbase,
v8::Handle<v8::FunctionTemplate> ft;
ft = v8::FunctionTemplate::New(isolate);
ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoUsers"));
ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoUsersCtor"));
rt = ft->InstanceTemplate();
rt->SetInternalFieldCount(2);
rt->SetInternalFieldCount(0);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "save"), JS_SaveUser);
TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING(isolate, "replace"),
@ -646,14 +646,13 @@ void TRI_InitV8Users(v8::Handle<v8::Context> context, TRI_vocbase_t* vocbase,
JS_AuthIsActive);
v8g->UsersTempl.Reset(isolate, rt);
ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoUsersCtor"));
//ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoUsersCtor"));
TRI_AddGlobalFunctionVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "ArangoUsersCtor"),
ft->GetFunction(), true);
// register the global object
v8::Handle<v8::Object> aa = rt->NewInstance();
if (!aa.IsEmpty()) {
TRI_AddGlobalVariableVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "ArangoUsers"),
aa);
TRI_AddGlobalVariableVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "ArangoUsers"), aa);
}
}

View File

@ -22,16 +22,26 @@
////////////////////////////////////////////////////////////////////////////////
#include "v8-vocbaseprivate.h"
#include "Aql/QueryCursor.h"
#include "Aql/QueryResult.h"
#include "Basics/conversions.h"
#include "Basics/VelocyPackHelper.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/Cursor.h"
#include "Utils/CursorRepository.h"
#include "Transaction/Context.h"
#include "Transaction/V8Context.h"
#include "V8/v8-conv.h"
#include "V8/v8-globals.h"
#include "V8/v8-utils.h"
#include "V8/v8-vpack.h"
#include "V8Server/v8-voccursor.h"
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
#include "Logger/Logger.h"
using namespace arangodb;
using namespace arangodb::basics;
@ -140,11 +150,8 @@ static void JS_JsonCursor(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_CURSOR_NOT_FOUND);
}
auto cth = cursor->context()->orderCustomTypeHandler();
VPackOptions opts = VPackOptions::Defaults;
opts.customTypeHandler = cth.get();
VPackBuilder builder(&opts);
VPackOptions* opts = cursor->context()->getVPackOptions();
VPackBuilder builder(opts);
builder.openObject(true); // conversion uses sequential iterator, no indexing
Result r = cursor->dumpSync(builder);
if (r.fail()) {
@ -157,6 +164,390 @@ static void JS_JsonCursor(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_END
}
// .............................................................................
// cursor v8 class
// .............................................................................
struct V8Cursor final {
static constexpr uint16_t CID = 4956;
V8Cursor(v8::Isolate* isolate,
v8::Handle<v8::Object> holder,
TRI_vocbase_t& vocbase,
CursorId cursorId)
: _isolate(isolate),
_cursorId(cursorId),
_resolver(vocbase),
_cte(transaction::Context::createCustomTypeHandler(vocbase, _resolver)) {
// sanity checks
TRI_ASSERT(_handle.IsEmpty());
TRI_ASSERT(holder->InternalFieldCount() > 0);
// create a new persistent handle
holder->SetAlignedPointerInInternalField(0, this);
_handle.Reset(_isolate, holder);
_handle.SetWrapperClassId(CID);
// and make it weak, so that we can garbage collect
_handle.SetWeak(&_handle, weakCallback, v8::WeakCallbackType::kFinalizer);
_options.customTypeHandler = _cte.get();
}
~V8Cursor() {
if (!_handle.IsEmpty()) {
TRI_ASSERT(_handle.IsNearDeath());
_handle.ClearWeak();
v8::Local<v8::Object> data =
v8::Local<v8::Object>::New(_isolate, _handle);
data->SetInternalField(0, v8::Undefined(_isolate));
_handle.Reset();
}
if (_isolate) {
TRI_GET_GLOBALS2(_isolate);
TRI_vocbase_t* vocbase = v8g->_vocbase;
if (vocbase) {
CursorRepository* cursors = vocbase->cursorRepository();
cursors->remove(_cursorId, Cursor::CURSOR_VPACK);
}
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief unwraps a structure
//////////////////////////////////////////////////////////////////////////////
static V8Cursor* unwrap(v8::Local<v8::Object> handle) {
TRI_ASSERT(handle->InternalFieldCount() > 0);
return static_cast<V8Cursor*>(handle->GetAlignedPointerFromInternalField(0));
}
/// @brief return false on error
bool maybeFetchBatch(v8::Isolate* isolate) {
if (_dataIterator == nullptr && _hasMore) { // fetch more data
TRI_GET_GLOBALS();
CursorRepository* cursors = v8g->_vocbase->cursorRepository();
bool busy;
Cursor* cc = cursors->find(_cursorId, Cursor::CURSOR_VPACK, busy);
if (busy || cc == nullptr) {
TRI_V8_SET_ERROR("cursor is busy");
return false; // someone else is using it
}
TRI_DEFER(cc->release());
Result r = fetchData(cc);
if (r.fail()) {
TRI_CreateErrorObject(isolate, r);
return false;
}
}
return true;
}
/// @brief fetch the next batch
Result fetchData(Cursor* cursor) {
TRI_ASSERT(cursor != nullptr && cursor->isUsed());
TRI_ASSERT(_hasMore);
TRI_ASSERT(_dataIterator == nullptr);
_dataSlice = VPackSlice::noneSlice();
_extraSlice = VPackSlice::noneSlice();
_tmpResult.clear();
_tmpResult.openObject();
Result r = cursor->dumpSync(_tmpResult);
if (r.fail()) {
return r;
}
_tmpResult.close();
TRI_ASSERT(_tmpResult.slice().isObject());
// TODO as an optimization
for(auto pair : VPackObjectIterator(_tmpResult.slice(), true)) {
if (pair.key.isEqualString("result")) {
_dataSlice = pair.value;
TRI_ASSERT(_dataSlice.isArray());
if (!_dataSlice.isEmptyArray()) {
_dataIterator = std::make_unique<VPackArrayIterator>(_dataSlice);
}
} else if (pair.key.isEqualString("hasMore")) {
_hasMore = pair.value.getBool();
} else if (pair.key.isEqualString("extra")) {
_extraSlice = pair.value;
}
}
// cursor should delete itself
TRI_ASSERT(_hasMore || cursor->isDeleted());
return Result{};
}
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a new streaming cursor from arguments
////////////////////////////////////////////////////////////////////////////////
static void New(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
TRI_V8_CURRENT_GLOBALS_AND_SCOPE;
if (!args.IsConstructCall()) { // if not call as a constructor call it
TRI_V8_THROW_EXCEPTION_USAGE("only instance-able by constructor");
}
if (args.Length() < 1 || args.Length() > 3) {
TRI_V8_THROW_EXCEPTION_USAGE("ArangoQueryStreamCursor(<queryString>, <bindVars>, <options>)");
}
// get the query string
if (!args[0]->IsString()) {
TRI_V8_THROW_TYPE_ERROR("expecting string for <queryString>");
}
std::string const queryString(TRI_ObjectToString(args[0]));
// bind parameters
std::shared_ptr<VPackBuilder> bindVars;
if (args.Length() > 1) {
if (!args[1]->IsUndefined() && !args[1]->IsNull() && !args[1]->IsObject()) {
TRI_V8_THROW_TYPE_ERROR("expecting object for <bindVars>");
}
if (args[1]->IsObject()) {
bindVars.reset(new VPackBuilder);
int res = TRI_V8ToVPack(isolate, *(bindVars.get()), args[1], false);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
}
}
// options
auto options = std::make_shared<VPackBuilder>();
if (args.Length() > 2) {
// we have options! yikes!
if (!args[2]->IsObject()) {
TRI_V8_THROW_TYPE_ERROR("expecting object for <options>");
}
int res = TRI_V8ToVPack(isolate, *options, args[2], false);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
} else {
VPackObjectBuilder guard(options.get());
}
size_t batchSize = VelocyPackHelper::getNumericValue<size_t>(options->slice(),
"batchSize", 1000);
const bool contextOwnedByExterior = transaction::V8Context::isEmbedded();
TRI_vocbase_t* vocbase = v8g->_vocbase;
TRI_ASSERT(vocbase != nullptr);
auto* cursors = vocbase->cursorRepository(); // create a cursor
double ttl = std::numeric_limits<double>::max();
// specify ID 0 so it uses the external V8 context
auto cc = cursors->createQueryStream(queryString,
std::move(bindVars),
std::move(options),
batchSize, ttl,
contextOwnedByExterior);
TRI_DEFER(cc->release());
// args.Holder() is supposedly better than args.This()
auto self = new V8Cursor(isolate, args.Holder(), *vocbase, cc->id());
Result r = self->fetchData(cc);
if (r.fail()) {
TRI_V8_THROW_EXCEPTION(r);
}
// do not delete self, its owned by V8 now
TRI_V8_RETURN(args.This());
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ArangoQueryStreamCursor.prototype.toArray = ...
////////////////////////////////////////////////////////////////////////////////
static void toArray(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::Isolate* isolate = args.GetIsolate();
v8::HandleScope scope(isolate);
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_NOT_IMPLEMENTED,
"toArray() is not supported on ArangoQueryStreamCursor");
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ArangoQueryStreamCursor.prototype.getExtra = ...
////////////////////////////////////////////////////////////////////////////////
static void getExtra(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
V8Cursor* self = V8Cursor::unwrap(args.Holder());
if (self == nullptr) {
TRI_V8_RETURN(v8::Undefined(isolate));
}
// we always need to fetch
if (!self->maybeFetchBatch(isolate)) { // sets exception
return;
}
if (self->_extraSlice.isObject()) {
TRI_V8_RETURN(TRI_VPackToV8(isolate, self->_extraSlice));
}
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"getExtra() is only valid after all data has been fetched");
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ArangoQueryStreamCursor.prototype.hasNext = ...
////////////////////////////////////////////////////////////////////////////////
static void hasNext(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
V8Cursor* self = V8Cursor::unwrap(args.Holder());
if (self == nullptr) {
TRI_V8_RETURN_UNDEFINED();
}
// we always need to fetch
if (!self->maybeFetchBatch(isolate)) { // sets exception
return;
}
if (self->_dataIterator != nullptr) {
TRI_V8_RETURN_TRUE();
}
TRI_V8_RETURN_FALSE();
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ArangoQueryStreamCursor.prototype.next = ...
////////////////////////////////////////////////////////////////////////////////
static void next(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
V8Cursor* self = V8Cursor::unwrap(args.Holder());
if (self == nullptr) {
TRI_V8_RETURN_UNDEFINED();
}
// we always need to fetch
if (!self->maybeFetchBatch(isolate)) { // sets exception
return;
}
if (self->_dataIterator) { // got a current batch
TRI_ASSERT(self->_dataIterator->valid());
VPackSlice s = self->_dataIterator->value();
v8::Local<v8::Value> val = TRI_VPackToV8(isolate, s, &self->_options);
++(*self->_dataIterator);
// reset so that the next one can fetch again
if (!self->_dataIterator->valid()) {
self->_dataIterator.reset();
}
TRI_V8_RETURN(val);
}
TRI_V8_RETURN_UNDEFINED();
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ArangoQueryStreamCursor.prototype.count = ...
////////////////////////////////////////////////////////////////////////////////
static void count(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
TRI_V8_RETURN_UNDEFINED(); // always undefined
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief explicitly discard cursor, mostly relevant for testing
////////////////////////////////////////////////////////////////////////////////
static void dispose(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
V8Cursor* self = V8Cursor::unwrap(args.Holder());
if (self != nullptr) {
TRI_GET_GLOBALS();
CursorRepository* cursors = v8g->_vocbase->cursorRepository();
cursors->remove(self->_cursorId, Cursor::CURSOR_VPACK);
self->_hasMore = false;
self->_dataSlice = VPackSlice::noneSlice();
self->_extraSlice = VPackSlice::noneSlice();
self->_dataIterator.reset();
self->_tmpResult.clear();
}
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief ArangoQueryStreamCursor.prototype.id = ...
////////////////////////////////////////////////////////////////////////////////
static void id(v8::FunctionCallbackInfo<v8::Value> const& args) {
v8::Isolate* isolate = args.GetIsolate();
v8::HandleScope scope(isolate);
V8Cursor* self = V8Cursor::unwrap(args.Holder());
if (self == nullptr) {
TRI_V8_RETURN(v8::Undefined(isolate));
}
TRI_V8_RETURN(v8::Integer::New(isolate, self->_cursorId));
}
private:
/// called when GC deletes the value
static void weakCallback(const v8::WeakCallbackInfo<v8::Persistent<v8::Object>>& data) {
auto isolate = data.GetIsolate();
auto persistent = data.GetParameter();
auto myPointer = v8::Local<v8::Object>::New(isolate, *persistent);
TRI_ASSERT(myPointer->InternalFieldCount() > 0);
V8Cursor* obj = V8Cursor::unwrap(myPointer);
TRI_ASSERT(obj);
TRI_ASSERT(persistent == &obj->_handle);
TRI_ASSERT(persistent->IsNearDeath());
delete obj;
}
private:
/// @brief persistent handle for V8 object
v8::Persistent<v8::Object> _handle;
/// @brief isolate
v8::Isolate* _isolate;
/// @brief temporary result buffer
VPackBuilder _tmpResult;
/// @brief id of cursor
CursorId _cursorId;
/// cache has more variable
bool _hasMore = true;
VPackSlice _dataSlice = VPackSlice::noneSlice();
/// cached extras which might be attached to the stream
VPackSlice _extraSlice = VPackSlice::noneSlice();
/// @brief pointer to the current result
std::unique_ptr<VPackArrayIterator> _dataIterator;
CollectionNameResolver _resolver;
std::shared_ptr<VPackCustomTypeHandler> _cte;
VPackOptions _options = VPackOptions::Defaults;
};
// .............................................................................
// generate the general cursor template
// .............................................................................
@ -165,10 +556,48 @@ void TRI_InitV8cursor(v8::Handle<v8::Context> context, TRI_v8_global_t* v8g) {
v8::Isolate* isolate = v8::Isolate::GetCurrent();
// cursor functions. not intended to be used by end users
// these cursor functions are the APIs implemented in js/actions/api-simple.js
TRI_AddGlobalFunctionVocbase(isolate,
TRI_V8_ASCII_STRING(isolate, "CREATE_CURSOR"),
JS_CreateCursor, true);
TRI_AddGlobalFunctionVocbase(isolate,
TRI_V8_ASCII_STRING(isolate, "JSON_CURSOR"),
JS_JsonCursor, true);
// streaming query cursor class, intended to be used via ArangoStatement.execute
v8::Handle<v8::ObjectTemplate> rt;
v8::Handle<v8::FunctionTemplate> ft;
ft = v8::FunctionTemplate::New(isolate, V8Cursor::New);
ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoQueryStreamCursor"));
rt = ft->InstanceTemplate();
rt->SetInternalFieldCount(1);
ft->PrototypeTemplate()->Set(TRI_V8_ASCII_STRING(isolate, "isArangoResultSet"),
v8::True(isolate));
TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "toArray"),
V8Cursor::toArray);
TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "getExtra"),
V8Cursor::getExtra);
TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "hasNext"),
V8Cursor::hasNext);
TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "next"),
V8Cursor::next);
TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "count"),
V8Cursor::count);
TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "dispose"),
V8Cursor::dispose);
TRI_V8_AddProtoMethod(isolate, ft, TRI_V8_ASCII_STRING(isolate, "id"),
V8Cursor::id);
v8g->StreamQueryCursorTempl.Reset(isolate, ft);
v8::MaybeLocal<v8::Function> ctor = ft->GetFunction(context);
if (ctor.IsEmpty()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "error creating v8 stream cursor");
}
//ft->SetClassName(TRI_V8_ASCII_STRING(isolate, "ArangoStreamQueryCursorCtor"));
TRI_AddGlobalFunctionVocbase(isolate, TRI_V8_ASCII_STRING(isolate, "ArangoQueryStreamCursor"),
ctor.ToLocalChecked(), true);
}

View File

@ -7,6 +7,7 @@
#include "Transaction/Methods.h"
#include "Transaction/Options.h"
#include "Transaction/V8Context.h"
#include "Utils/CursorRepository.h"
#include "V8/v8-conv.h"
#include "V8/v8-vpack.h"
#include "V8/v8-helper.h"
@ -361,6 +362,8 @@ Result executeTransactionJS(
if (!rv.fail()) {
rv = trx->commit();
}
// if we do not remove unused V8Cursors, V8Context might not reset global state
vocbase.cursorRepository()->garbageCollect(/*force*/false);
return rv;
}

View File

@ -92,11 +92,11 @@
delete global.ArangoDatabase;
// //////////////////////////////////////////////////////////////////////////////
// / @brief ShapedJson stub object - only here for compatibility with 2.8
// / @brief ArangoQueryStreamCursor
// //////////////////////////////////////////////////////////////////////////////
exports.ShapedJson = function () {};
delete global.ShapedJson;
exports.ArangoQueryStreamCursor = global.ArangoQueryStreamCursor;
delete global.ArangoQueryStreamCursor;
// //////////////////////////////////////////////////////////////////////////////
// / @brief dispatcherThreads

View File

@ -32,6 +32,7 @@ module.isSystem = true;
var ArangoStatement = require('@arangodb/arango-statement-common').ArangoStatement;
var GeneralArrayCursor = require('@arangodb/arango-cursor').GeneralArrayCursor;
const ArangoQueryStreamCursor = require('internal').ArangoQueryStreamCursor;
// //////////////////////////////////////////////////////////////////////////////
// / @brief parse a query and return the results
@ -75,8 +76,10 @@ ArangoStatement.prototype.execute = function () {
if (this._cache !== undefined) {
opts.cache = this._cache;
}
if (opts.stream) {
return new ArangoQueryStreamCursor(this._query, this._bindVars, opts);
}
}
// {json:[docs], stats:{}, profile:{}, warnings:{}, cached:true}
var result = AQL_EXECUTE(this._query, this._bindVars, opts);
return new GeneralArrayCursor(result.json, 0, null, result);

View File

@ -41,8 +41,8 @@ TRI_v8_global_t::TRI_v8_global_t(v8::Isolate* isolate)
#ifdef USE_ENTERPRISE
SmartGraphTempl(),
#endif
BufferTempl(),
StreamQueryCursorTempl(),
BufferConstant(),
DeleteConstant(),

View File

@ -411,6 +411,9 @@ struct TRI_v8_global_t {
/// @brief Buffer template
v8::Persistent<v8::FunctionTemplate> BufferTempl;
/// @brief stream query cursor templace
v8::Persistent<v8::FunctionTemplate> StreamQueryCursorTempl;
/// @brief "Buffer" constant
v8::Persistent<v8::String> BufferConstant;

View File

@ -20,7 +20,7 @@ let cursors = [];
function sendQuery (count, async) {
count = count || 1;
for (let i = 0; i < count; ++i) {
let opts = { stream:true };
let opts = { stream: true };
if (async === true) {
opts.batchSize = 1; // runs SLEEP once
}

View File

@ -1,5 +1,5 @@
/*jshint globalstrict:false, strict:false, maxlen:1000*/
/*global assertEqual, assertTrue, assertFalse, fail, more */
/*global assertEqual, assertTrue, assertUndefined, fail, more */
////////////////////////////////////////////////////////////////////////////////
/// @brief test the statement class
@ -29,9 +29,7 @@
////////////////////////////////////////////////////////////////////////////////
const jsunity = require("jsunity");
const arangodb = require("@arangodb");
const db = arangodb.db;
const db = require("internal").db;
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite: stream cursors
@ -92,8 +90,20 @@ function StreamCursorSuite () {
cursor.next();
}
});
}
},
testInfiniteAQL : function() {
var stmt = db._createStatement({ query: "FOR i IN 1..100000000000 RETURN i",
options: { stream: true },
batchSize: 1000});
var cursor = stmt.execute();
assertUndefined(cursor.count());
let i = 10;
while (cursor.hasNext() && i-- > 0) {
cursor.next();
}
}
};
}

View File

@ -576,6 +576,35 @@ function TransactionsImplicitCollectionsSuite () {
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief perform an infinite loop
////////////////////////////////////////////////////////////////////////////////
testUseQueryStreamCursorInAql : function () {
let docs = [];
for(let i = 0; i < 100000; i++) {
docs.push({value: i});
if (i % 5000 === 0) {
c1.save(docs);
docs = [];
}
}
c1.save(docs);
var result = db._executeTransaction({
collections: { allowImplicit: false, read: cn1 },
action: `function (params) {
const db = require('internal').db;
let cc = db._query('FOR i IN @@cn1 RETURN i', { '@cn1' : params.cn1 }, {stream: true});
let xx = 0;
while (cc.hasNext()) {cc.next(); xx++;}
let cc2 = db._query('FOR i IN 1..1000000000 RETURN i', {}, {stream: true});
return xx; }`,
params: { cn1: cn1 }
});
assertEqual(100000, result);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief uses an explicitly declared collection for reading
////////////////////////////////////////////////////////////////////////////////