mirror of https://gitee.com/bigwinds/arangodb
Former JsonCursor now uses QueryResult and contains the TRX context for continuuos reads
This commit is contained in:
parent
36e608a5fd
commit
be118b1b80
|
@ -541,10 +541,10 @@ QueryResult Query::execute(QueryRegistry* registry) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryResult res = prepare(registry);
|
QueryResult result = prepare(registry);
|
||||||
|
|
||||||
if (res.code != TRI_ERROR_NO_ERROR) {
|
if (result.code != TRI_ERROR_NO_ERROR) {
|
||||||
return res;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_queryString == nullptr) {
|
if (_queryString == nullptr) {
|
||||||
|
@ -631,11 +631,12 @@ QueryResult Query::execute(QueryRegistry* registry) {
|
||||||
auto stats = std::make_shared<VPackBuilder>();
|
auto stats = std::make_shared<VPackBuilder>();
|
||||||
_engine->_stats.toVelocyPack(*(stats.get()));
|
_engine->_stats.toVelocyPack(*(stats.get()));
|
||||||
|
|
||||||
|
result.context = _trx->transactionContext();
|
||||||
|
|
||||||
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR);
|
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR);
|
||||||
|
|
||||||
enterState(FINALIZATION);
|
enterState(FINALIZATION);
|
||||||
|
|
||||||
QueryResult result(TRI_ERROR_NO_ERROR);
|
|
||||||
result.warnings = warningsToVelocyPack();
|
result.warnings = warningsToVelocyPack();
|
||||||
result.result = resultBuilder;
|
result.result = resultBuilder;
|
||||||
result.stats = stats;
|
result.stats = stats;
|
||||||
|
@ -700,10 +701,10 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryResultV8 res = prepare(registry);
|
QueryResultV8 result = prepare(registry);
|
||||||
|
|
||||||
if (res.code != TRI_ERROR_NO_ERROR) {
|
if (result.code != TRI_ERROR_NO_ERROR) {
|
||||||
return res;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
work.reset(new AqlWorkStack(_vocbase, _id, _queryString, _queryLength));
|
work.reset(new AqlWorkStack(_vocbase, _id, _queryString, _queryLength));
|
||||||
|
@ -715,7 +716,6 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
|
||||||
useQueryCache = false;
|
useQueryCache = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryResultV8 result(TRI_ERROR_NO_ERROR);
|
|
||||||
result.result = v8::Array::New(isolate);
|
result.result = v8::Array::New(isolate);
|
||||||
|
|
||||||
// this is the RegisterId our results can be found in
|
// this is the RegisterId our results can be found in
|
||||||
|
@ -781,6 +781,8 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
|
||||||
auto stats = std::make_shared<VPackBuilder>();
|
auto stats = std::make_shared<VPackBuilder>();
|
||||||
_engine->_stats.toVelocyPack(*(stats.get()));
|
_engine->_stats.toVelocyPack(*(stats.get()));
|
||||||
|
|
||||||
|
result.context = _trx->transactionContext();
|
||||||
|
|
||||||
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR);
|
cleanupPlanAndEngine(TRI_ERROR_NO_ERROR);
|
||||||
|
|
||||||
enterState(FINALIZATION);
|
enterState(FINALIZATION);
|
||||||
|
|
|
@ -30,23 +30,14 @@ namespace arangodb {
|
||||||
namespace velocypack {
|
namespace velocypack {
|
||||||
class Builder;
|
class Builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TransactionContext;
|
||||||
|
|
||||||
namespace aql {
|
namespace aql {
|
||||||
|
|
||||||
struct QueryResult {
|
struct QueryResult {
|
||||||
QueryResult& operator=(QueryResult const& other) = delete;
|
QueryResult& operator=(QueryResult const& other) = delete;
|
||||||
|
QueryResult(QueryResult&& other) = default;
|
||||||
QueryResult(QueryResult&& other) {
|
|
||||||
code = other.code;
|
|
||||||
cached = other.cached;
|
|
||||||
details = other.details;
|
|
||||||
warnings.swap(other.warnings);
|
|
||||||
result.swap(other.result);
|
|
||||||
stats.swap(other.stats);
|
|
||||||
profile.swap(other.profile);
|
|
||||||
|
|
||||||
bindParameters = other.bindParameters;
|
|
||||||
collectionNames = other.collectionNames;
|
|
||||||
}
|
|
||||||
|
|
||||||
QueryResult(int code, std::string const& details)
|
QueryResult(int code, std::string const& details)
|
||||||
: code(code),
|
: code(code),
|
||||||
|
@ -54,7 +45,8 @@ struct QueryResult {
|
||||||
details(details),
|
details(details),
|
||||||
warnings(nullptr),
|
warnings(nullptr),
|
||||||
result(nullptr),
|
result(nullptr),
|
||||||
profile(nullptr) {}
|
profile(nullptr),
|
||||||
|
context(nullptr) {}
|
||||||
|
|
||||||
explicit QueryResult(int code) : QueryResult(code, "") {}
|
explicit QueryResult(int code) : QueryResult(code, "") {}
|
||||||
|
|
||||||
|
@ -71,6 +63,7 @@ struct QueryResult {
|
||||||
std::shared_ptr<arangodb::velocypack::Builder> result;
|
std::shared_ptr<arangodb::velocypack::Builder> result;
|
||||||
std::shared_ptr<arangodb::velocypack::Builder> stats;
|
std::shared_ptr<arangodb::velocypack::Builder> stats;
|
||||||
std::shared_ptr<arangodb::velocypack::Builder> profile;
|
std::shared_ptr<arangodb::velocypack::Builder> profile;
|
||||||
|
std::shared_ptr<arangodb::TransactionContext> context;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,10 +177,9 @@ void RestCursorHandler::processQuery(VPackSlice const& slice) {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto transactionContext = std::make_shared<StandaloneTransactionContext>(_vocbase);
|
|
||||||
arangodb::basics::VPackStringBufferAdapter bufferAdapter(
|
arangodb::basics::VPackStringBufferAdapter bufferAdapter(
|
||||||
_response->body().stringBuffer());
|
_response->body().stringBuffer());
|
||||||
VPackDumper dumper(&bufferAdapter, transactionContext->getVPackOptions());
|
VPackDumper dumper(&bufferAdapter, queryResult.context->getVPackOptions());
|
||||||
dumper.dump(result.slice());
|
dumper.dump(result.slice());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -195,8 +194,8 @@ void RestCursorHandler::processQuery(VPackSlice const& slice) {
|
||||||
opts, "count", false);
|
opts, "count", false);
|
||||||
|
|
||||||
// steal the query result, cursor will take over the ownership
|
// steal the query result, cursor will take over the ownership
|
||||||
arangodb::JsonCursor* cursor = cursors->createFromVelocyPack(
|
arangodb::VelocyPackCursor* cursor = cursors->createFromQueryResult(
|
||||||
queryResult.result, batchSize, extra, ttl, count, queryResult.cached);
|
std::move(queryResult), batchSize, extra, ttl, count);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
_response->body().appendChar('{');
|
_response->body().appendChar('{');
|
||||||
|
|
|
@ -22,7 +22,6 @@
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#include "Cursor.h"
|
#include "Cursor.h"
|
||||||
#include "Basics/JsonHelper.h"
|
|
||||||
#include "Basics/VelocyPackHelper.h"
|
#include "Basics/VelocyPackHelper.h"
|
||||||
#include "Basics/VPackStringBufferAdapter.h"
|
#include "Basics/VPackStringBufferAdapter.h"
|
||||||
#include "Utils/CollectionExport.h"
|
#include "Utils/CollectionExport.h"
|
||||||
|
@ -63,20 +62,20 @@ VPackSlice Cursor::extra() const {
|
||||||
return _extra->slice();
|
return _extra->slice();
|
||||||
}
|
}
|
||||||
|
|
||||||
JsonCursor::JsonCursor(TRI_vocbase_t* vocbase, CursorId id,
|
VelocyPackCursor::VelocyPackCursor(TRI_vocbase_t* vocbase, CursorId id,
|
||||||
std::shared_ptr<VPackBuilder> json, size_t batchSize,
|
aql::QueryResult&& result, size_t batchSize,
|
||||||
std::shared_ptr<VPackBuilder> extra, double ttl,
|
std::shared_ptr<VPackBuilder> extra,
|
||||||
bool hasCount, bool cached)
|
double ttl, bool hasCount)
|
||||||
: Cursor(id, batchSize, extra, ttl, hasCount),
|
: Cursor(id, batchSize, extra, ttl, hasCount),
|
||||||
_vocbase(vocbase),
|
_vocbase(vocbase),
|
||||||
_json(json),
|
_result(std::move(result)),
|
||||||
_size(json->slice().length()),
|
_size(result.result->slice().length()),
|
||||||
_cached(cached) {
|
_cached(result.cached) {
|
||||||
TRI_ASSERT(json->slice().isArray());
|
TRI_ASSERT(result.result->slice().isArray());
|
||||||
TRI_UseVocBase(vocbase);
|
TRI_UseVocBase(vocbase);
|
||||||
}
|
}
|
||||||
|
|
||||||
JsonCursor::~JsonCursor() {
|
VelocyPackCursor::~VelocyPackCursor() {
|
||||||
freeJson();
|
freeJson();
|
||||||
|
|
||||||
TRI_ReleaseVocBase(_vocbase);
|
TRI_ReleaseVocBase(_vocbase);
|
||||||
|
@ -86,7 +85,7 @@ JsonCursor::~JsonCursor() {
|
||||||
/// @brief check whether the cursor contains more data
|
/// @brief check whether the cursor contains more data
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
bool JsonCursor::hasNext() {
|
bool VelocyPackCursor::hasNext() {
|
||||||
if (_position < _size) {
|
if (_position < _size) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -99,10 +98,10 @@ bool JsonCursor::hasNext() {
|
||||||
/// @brief return the next element
|
/// @brief return the next element
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
VPackSlice JsonCursor::next() {
|
VPackSlice VelocyPackCursor::next() {
|
||||||
TRI_ASSERT(_json != nullptr);
|
TRI_ASSERT(_result.result != nullptr);
|
||||||
TRI_ASSERT(_position < _size);
|
TRI_ASSERT(_position < _size);
|
||||||
VPackSlice slice = _json->slice();
|
VPackSlice slice = _result.result->slice();
|
||||||
return slice.at(_position++);
|
return slice.at(_position++);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,13 +109,13 @@ VPackSlice JsonCursor::next() {
|
||||||
/// @brief return the cursor size
|
/// @brief return the cursor size
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
size_t JsonCursor::count() const { return _size; }
|
size_t VelocyPackCursor::count() const { return _size; }
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief dump the cursor contents into a string buffer
|
/// @brief dump the cursor contents into a string buffer
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void JsonCursor::dump(arangodb::basics::StringBuffer& buffer) {
|
void VelocyPackCursor::dump(arangodb::basics::StringBuffer& buffer) {
|
||||||
buffer.appendText("\"result\":[");
|
buffer.appendText("\"result\":[");
|
||||||
|
|
||||||
size_t const n = batchSize();
|
size_t const n = batchSize();
|
||||||
|
@ -135,6 +134,9 @@ void JsonCursor::dump(arangodb::basics::StringBuffer& buffer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
arangodb::basics::VPackStringBufferAdapter bufferAdapter(
|
||||||
|
buffer.stringBuffer());
|
||||||
|
VPackDumper dumper(&bufferAdapter, transactionContext->getVPackOptions());
|
||||||
for (size_t i = 0; i < n; ++i) {
|
for (size_t i = 0; i < n; ++i) {
|
||||||
if (!hasNext()) {
|
if (!hasNext()) {
|
||||||
break;
|
break;
|
||||||
|
@ -149,9 +151,6 @@ void JsonCursor::dump(arangodb::basics::StringBuffer& buffer) {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
arangodb::basics::VPackStringBufferAdapter bufferAdapter(
|
|
||||||
buffer.stringBuffer());
|
|
||||||
VPackDumper dumper(&bufferAdapter, transactionContext->getVPackOptions());
|
|
||||||
try {
|
try {
|
||||||
dumper.dump(row);
|
dumper.dump(row);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
@ -198,9 +197,7 @@ void JsonCursor::dump(arangodb::basics::StringBuffer& buffer) {
|
||||||
/// @brief free the internals
|
/// @brief free the internals
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void JsonCursor::freeJson() {
|
void VelocyPackCursor::freeJson() {
|
||||||
_json = nullptr;
|
|
||||||
|
|
||||||
_isDeleted = true;
|
_isDeleted = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
|
|
||||||
#include "Basics/Common.h"
|
#include "Basics/Common.h"
|
||||||
#include "Basics/StringBuffer.h"
|
#include "Basics/StringBuffer.h"
|
||||||
|
#include "Aql/QueryResult.h"
|
||||||
#include "VocBase/voc-types.h"
|
#include "VocBase/voc-types.h"
|
||||||
|
|
||||||
struct TRI_vocbase_t;
|
struct TRI_vocbase_t;
|
||||||
|
@ -107,14 +108,13 @@ class Cursor {
|
||||||
bool _isUsed;
|
bool _isUsed;
|
||||||
};
|
};
|
||||||
|
|
||||||
class JsonCursor : public Cursor {
|
class VelocyPackCursor : public Cursor {
|
||||||
public:
|
public:
|
||||||
JsonCursor(TRI_vocbase_t*, CursorId,
|
VelocyPackCursor(TRI_vocbase_t*, CursorId, aql::QueryResult&&, size_t,
|
||||||
std::shared_ptr<arangodb::velocypack::Builder>, size_t,
|
std::shared_ptr<arangodb::velocypack::Builder>, double,
|
||||||
std::shared_ptr<arangodb::velocypack::Builder>, double, bool,
|
bool);
|
||||||
bool);
|
|
||||||
|
|
||||||
~JsonCursor();
|
~VelocyPackCursor();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
bool hasNext() override final;
|
bool hasNext() override final;
|
||||||
|
@ -130,7 +130,7 @@ class JsonCursor : public Cursor {
|
||||||
|
|
||||||
private:
|
private:
|
||||||
TRI_vocbase_t* _vocbase;
|
TRI_vocbase_t* _vocbase;
|
||||||
std::shared_ptr<arangodb::velocypack::Builder> _json;
|
aql::QueryResult _result;
|
||||||
size_t const _size;
|
size_t const _size;
|
||||||
bool _cached;
|
bool _cached;
|
||||||
};
|
};
|
||||||
|
|
|
@ -91,17 +91,15 @@ CursorRepository::~CursorRepository() {
|
||||||
/// the cursor will take ownership of both json and extra
|
/// the cursor will take ownership of both json and extra
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
JsonCursor* CursorRepository::createFromVelocyPack(
|
VelocyPackCursor* CursorRepository::createFromQueryResult(
|
||||||
std::shared_ptr<VPackBuilder> json, size_t batchSize,
|
aql::QueryResult&& result, size_t batchSize, std::shared_ptr<VPackBuilder> extra,
|
||||||
std::shared_ptr<VPackBuilder> extra, double ttl, bool count, bool cached) {
|
double ttl, bool count) {
|
||||||
|
TRI_ASSERT(result.result != nullptr);
|
||||||
TRI_ASSERT(json != nullptr);
|
|
||||||
|
|
||||||
CursorId const id = TRI_NewTickServer();
|
CursorId const id = TRI_NewTickServer();
|
||||||
arangodb::JsonCursor* cursor = nullptr;
|
|
||||||
|
|
||||||
cursor = new arangodb::JsonCursor(_vocbase, id, json, batchSize, extra, ttl,
|
arangodb::VelocyPackCursor* cursor = new arangodb::VelocyPackCursor(
|
||||||
count, cached);
|
_vocbase, id, std::move(result), batchSize, extra, ttl, count);
|
||||||
cursor->use();
|
cursor->use();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -36,6 +36,10 @@ namespace velocypack {
|
||||||
class Builder;
|
class Builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace aql {
|
||||||
|
struct QueryResult;
|
||||||
|
}
|
||||||
|
|
||||||
class CollectionExport;
|
class CollectionExport;
|
||||||
|
|
||||||
class CursorRepository {
|
class CursorRepository {
|
||||||
|
@ -60,9 +64,9 @@ class CursorRepository {
|
||||||
/// the cursor will retain a shared pointer of both json and extra
|
/// the cursor will retain a shared pointer of both json and extra
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
JsonCursor* createFromVelocyPack(
|
VelocyPackCursor* createFromQueryResult(
|
||||||
std::shared_ptr<arangodb::velocypack::Builder>, size_t,
|
aql::QueryResult&&, size_t, std::shared_ptr<arangodb::velocypack::Builder>,
|
||||||
std::shared_ptr<arangodb::velocypack::Builder>, double, bool, bool);
|
double, bool);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief creates a cursor and stores it in the registry
|
/// @brief creates a cursor and stores it in the registry
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "Basics/conversions.h"
|
#include "Basics/conversions.h"
|
||||||
#include "Utils/Cursor.h"
|
#include "Utils/Cursor.h"
|
||||||
#include "Utils/CursorRepository.h"
|
#include "Utils/CursorRepository.h"
|
||||||
|
#include "Utils/StandaloneTransactionContext.h"
|
||||||
#include "V8/v8-conv.h"
|
#include "V8/v8-conv.h"
|
||||||
#include "V8/v8-vpack.h"
|
#include "V8/v8-vpack.h"
|
||||||
#include "V8Server/v8-voccursor.h"
|
#include "V8Server/v8-voccursor.h"
|
||||||
|
@ -88,9 +89,14 @@ static void JS_CreateCursor(v8::FunctionCallbackInfo<v8::Value> const& args) {
|
||||||
auto cursors =
|
auto cursors =
|
||||||
static_cast<arangodb::CursorRepository*>(vocbase->_cursorRepository);
|
static_cast<arangodb::CursorRepository*>(vocbase->_cursorRepository);
|
||||||
|
|
||||||
|
arangodb::aql::QueryResult result(TRI_ERROR_NO_ERROR);
|
||||||
|
result.result = builder;
|
||||||
|
result.cached = false;
|
||||||
|
result.context = std::make_shared<arangodb::StandaloneTransactionContext>(vocbase);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
arangodb::Cursor* cursor = cursors->createFromVelocyPack(
|
arangodb::Cursor* cursor = cursors->createFromQueryResult(
|
||||||
builder, static_cast<size_t>(batchSize), nullptr, ttl, true, false);
|
std::move(result), static_cast<size_t>(batchSize), nullptr, ttl, true);
|
||||||
|
|
||||||
TRI_ASSERT(cursor != nullptr);
|
TRI_ASSERT(cursor != nullptr);
|
||||||
cursors->release(cursor);
|
cursors->release(cursor);
|
||||||
|
|
Loading…
Reference in New Issue