1
0
Fork 0

Support Transactions in RestCursorHandler (#8539)

This commit is contained in:
Simon 2019-03-25 17:04:11 +01:00 committed by Jan
parent 48b709ae16
commit bcc26926de
16 changed files with 790 additions and 494 deletions

View File

@ -44,10 +44,10 @@
using namespace arangodb;
using namespace arangodb::aql;
QueryResultCursor::QueryResultCursor(TRI_vocbase_t& vocbase, CursorId id,
QueryResultCursor::QueryResultCursor(TRI_vocbase_t& vocbase,
aql::QueryResult&& result,
size_t batchSize, double ttl, bool hasCount)
: Cursor(id, batchSize, ttl, hasCount),
: Cursor(TRI_NewServerSpecificTick(), batchSize, ttl, hasCount),
_guard(vocbase),
_result(std::move(result)),
_iterator(_result.result->slice()),
@ -147,21 +147,23 @@ Result QueryResultCursor::dumpSync(VPackBuilder& builder) {
// QueryStreamCursor class
// .............................................................................
QueryStreamCursor::QueryStreamCursor(TRI_vocbase_t& vocbase, CursorId id,
QueryStreamCursor::QueryStreamCursor(TRI_vocbase_t& vocbase,
std::string const& query,
std::shared_ptr<VPackBuilder> bindVars,
std::shared_ptr<VPackBuilder> opts, size_t batchSize,
double ttl, bool contextOwnedByExterior)
: Cursor(id, batchSize, ttl, /*hasCount*/ false),
double ttl, bool contextOwnedByExterior,
std::shared_ptr<transaction::Context> ctx)
: Cursor(TRI_NewServerSpecificTick(), batchSize, ttl, /*hasCount*/ false),
_guard(vocbase),
_exportCount(-1),
_queryResultPos(0) {
auto registry = QueryRegistryFeature::registry();
TRI_ASSERT(registry != nullptr);
_query = std::make_unique<Query>(contextOwnedByExterior, _guard.database(),
aql::QueryString(query), std::move(bindVars),
std::move(opts), arangodb::aql::PART_MAIN);
_query->setTransactionContext(std::move(ctx));
_query->prepare(registry);
TRI_ASSERT(_query->state() == aql::QueryExecutionState::ValueType::EXECUTION);
@ -181,20 +183,20 @@ QueryStreamCursor::QueryStreamCursor(TRI_vocbase_t& vocbase, CursorId id,
}
}
if (contextOwnedByExterior) {
TRI_ASSERT(_query->trx()->status() == transaction::Status::RUNNING);
int level = _query->trx()->state()->nestingLevel(); // should be level 0 or 1
// things break if the Query outlives a V8 transaction
_stateChangeCb = [this, level](transaction::Methods& trx, transaction::Status status) {
if (trx.state()->nestingLevel() == level &&
(status == transaction::Status::COMMITTED ||
status == transaction::Status::ABORTED)) {
this->setDeleted();
}
};
if (!_query->trx()->addStatusChangeCallback(&_stateChangeCb)) {
_stateChangeCb = nullptr;
// ensures the cursor is cleaned up as soon as the outer transaction ends
// otherwise we just get issues because we might still try to use the trx
TRI_ASSERT(_query->trx()->status() == transaction::Status::RUNNING);
int level = _query->trx()->state()->nestingLevel(); // should be level 0 or 1
// things break if the Query outlives a V8 transaction
_stateChangeCb = [this, level](transaction::Methods& trx, transaction::Status status) {
if (trx.state()->nestingLevel() == level &&
(status == transaction::Status::COMMITTED ||
status == transaction::Status::ABORTED)) {
this->setDeleted();
}
};
if (!_query->trx()->addStatusChangeCallback(&_stateChangeCb)) {
_stateChangeCb = nullptr;
}
}

View File

@ -42,7 +42,7 @@ class Query;
/// Should be used in conjunction with the RestCursorHandler
class QueryResultCursor final : public arangodb::Cursor {
public:
QueryResultCursor(TRI_vocbase_t& vocbase, CursorId id, aql::QueryResult&& result,
QueryResultCursor(TRI_vocbase_t& vocbase, aql::QueryResult&& result,
size_t batchSize, double ttl, bool hasCount);
~QueryResultCursor() = default;
@ -83,10 +83,12 @@ class QueryResultCursor final : public arangodb::Cursor {
/// cursor is deleted (or query exhausted)
class QueryStreamCursor final : public arangodb::Cursor {
public:
QueryStreamCursor(TRI_vocbase_t& vocbase, CursorId id, std::string const& query,
QueryStreamCursor(TRI_vocbase_t& vocbase, std::string const& query,
std::shared_ptr<velocypack::Builder> bindVars,
std::shared_ptr<velocypack::Builder> opts, size_t batchSize,
double ttl, bool contextOwnedByExterior);
std::shared_ptr<velocypack::Builder> opts,
size_t batchSize, double ttl,
bool contextOwnedByExterior,
std::shared_ptr<transaction::Context> ctx);
~QueryStreamCursor();
@ -117,7 +119,7 @@ class QueryStreamCursor final : public arangodb::Cursor {
private:
DatabaseGuard _guard;
int64_t _exportCount; // used by RocksDBRestExportHandler
int64_t _exportCount; // used by RocksDBRestExportHandler (<0 is not used)
/// current query
std::unique_ptr<aql::Query> _query;
/// buffered results

View File

@ -791,7 +791,7 @@ OperationResult GraphOperations::removeEdgeOrVertex(const std::string& collectio
edgeCollections.emplace(it); // but also to edgeCollections for later iteration
}
auto ctx = std::make_shared<transaction::SimpleSmartContext>(_vocbase);
auto ctx = std::make_shared<transaction::StandaloneSmartContext>(_vocbase);
transaction::Options trxOptions;
trxOptions.waitForSync = waitForSync;
UserTransaction trx{ctx, {}, trxCollections, {}, trxOptions};

View File

@ -165,7 +165,8 @@ RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
TRI_ASSERT(cursors != nullptr);
Cursor* cursor = cursors->createQueryStream(querySlice.copyString(), bindVarsBuilder,
_options, batchSize, ttl,
/*contextExt*/ false);
/*contextOwnedByExt*/ false,
createAQLTransactionContext());
return generateCursorResult(rest::ResponseCode::CREATED, cursor);
}
@ -180,6 +181,7 @@ RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
auto query = std::make_unique<aql::Query>(
false, _vocbase, arangodb::aql::QueryString(queryStr, static_cast<size_t>(l)),
bindVarsBuilder, _options, arangodb::aql::PART_MAIN);
query->setTransactionContext(createAQLTransactionContext());
std::shared_ptr<aql::SharedQueryState> ss = query->sharedState();
auto self = shared_from_this();

View File

@ -561,17 +561,18 @@ std::unique_ptr<SingleCollectionTransaction> RestVocbaseBaseHandler::createTrans
transaction::Manager* mgr = transaction::ManagerFeature::manager();
TRI_ASSERT(mgr != nullptr);
if (pos > 0 && pos < value.size()) {
if (value.compare(pos, std::string::npos, " begin") == 0) {
value = _request->header(StaticStrings::TransactionBody, found);
if (found) {
auto trxOpts = VPackParser::fromJson(value);
Result res = mgr->createManagedTrx(_vocbase, tid, trxOpts->slice());;
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
} else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "missing transaction config");
if (pos > 0 && pos < value.size() &&
value.compare(pos, std::string::npos, " begin") == 0) {
value = _request->header(StaticStrings::TransactionBody, found);
if (found) {
if (!transaction::isFollowerTransactionId(tid) ||
!ServerState::instance()->isDBServer()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION);
}
auto trxOpts = VPackParser::fromJson(value);
Result res = mgr->createManagedTrx(_vocbase, tid, trxOpts->slice());;
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
}
}
@ -590,11 +591,10 @@ std::unique_ptr<SingleCollectionTransaction> RestVocbaseBaseHandler::createTrans
/// @brief create proper transaction context, inclusing the proper IDs
std::shared_ptr<transaction::Context> RestVocbaseBaseHandler::createAQLTransactionContext() const {
TRI_ASSERT(ServerState::instance()->isDBServer());
bool found = false;
std::string value = _request->header(StaticStrings::TransactionId, found);
if (!found) {
return std::make_shared<transaction::SimpleSmartContext>(_vocbase);
return std::make_shared<transaction::StandaloneSmartContext>(_vocbase);
}
TRI_voc_tid_t tid = 0;
@ -602,7 +602,8 @@ std::shared_ptr<transaction::Context> RestVocbaseBaseHandler::createAQLTransacti
try {
tid = std::stoull(value, &pos, 10);
} catch (...) {}
if (tid == 0 || !transaction::isLeaderTransactionId(tid)) {
if (tid == 0 || (transaction::isLegacyTransactionId(tid) &&
ServerState::instance()->isRunningInCluster())) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "invalid transaction ID");
}
@ -611,6 +612,9 @@ std::shared_ptr<transaction::Context> RestVocbaseBaseHandler::createAQLTransacti
if (pos > 0 && pos < value.size()) {
if (value.compare(pos, std::string::npos, " aql") == 0) {
if (!ServerState::instance()->isDBServer()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION);
}
return std::make_shared<transaction::AQLStandaloneContext>(_vocbase, tid);
}
}

View File

@ -434,9 +434,12 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
allTransactionsLocker.unlock();
std::this_thread::yield();
if (i++ > 16) {
if (i++ > 32) {
LOG_TOPIC(DEBUG, Logger::TRANSACTIONS) << "waiting on trx lock " << tid;
i = 0;
if (application_features::ApplicationServer::isStopping()) {
return nullptr; // shutting down
}
}
} while (true);

View File

@ -41,8 +41,8 @@ ManagerFeature::ManagerFeature(application_features::ApplicationServer& server)
: ApplicationFeature(server, "TransactionManager"), _workItem(nullptr), _gcfunc() {
setOptional(false);
startsAfter("BasicsPhase");
startsAfter("EngineSelector");
startsBefore("Database");
_gcfunc = [this] (bool cancelled) {
if (!cancelled) {
@ -69,8 +69,9 @@ void ManagerFeature::start() {
void ManagerFeature::beginShutdown() {
_workItem.reset();
// make sure no lingering managed trx remain
// at this point all cursor should have been aborted already
MANAGER->garbageCollect(/*abortAll*/true);
// make sure no lingering managed trx remain
while (MANAGER->garbageCollect(/*abortAll*/true)) {
LOG_TOPIC(WARN, Logger::TRANSACTIONS) << "still waiting for managed transaction";
std::this_thread::sleep_for(std::chrono::seconds(1));

View File

@ -131,25 +131,25 @@ void AQLStandaloneContext::unregisterTransaction() noexcept {
mgr->unregisterAQLTrx(_globalId);
}
// ============= SimpleSmartContext =============
// ============= StandaloneSmartContext =============
SimpleSmartContext::SimpleSmartContext(TRI_vocbase_t& vocbase)
StandaloneSmartContext::StandaloneSmartContext(TRI_vocbase_t& vocbase)
: SmartContext(vocbase, Context::makeTransactionId(), nullptr) {}
/// @brief get parent transaction (if any)
TransactionState* SimpleSmartContext::getParentTransaction() const {
TransactionState* StandaloneSmartContext::getParentTransaction() const {
return _state;
}
/// @brief register the transaction,
void SimpleSmartContext::registerTransaction(TransactionState* state) {
void StandaloneSmartContext::registerTransaction(TransactionState* state) {
TRI_ASSERT(_state == nullptr);
_state = state;
}
/// @brief unregister the transaction
void SimpleSmartContext::unregisterTransaction() noexcept {
void StandaloneSmartContext::unregisterTransaction() noexcept {
TRI_ASSERT(_state != nullptr);
_state = nullptr;
}

View File

@ -113,9 +113,9 @@ struct AQLStandaloneContext final : public SmartContext {
/// Can be used to reuse transaction state between multiple
/// transaction::Methods instances. Mainly for legacy clients
/// that do not send the transaction ID header
struct SimpleSmartContext final : public SmartContext {
struct StandaloneSmartContext final : public SmartContext {
explicit SimpleSmartContext(TRI_vocbase_t& vocbase);
explicit StandaloneSmartContext(TRI_vocbase_t& vocbase);
/// @brief get parent transaction (if any)
TransactionState* getParentTransaction() const override;

View File

@ -133,10 +133,8 @@ Cursor* CursorRepository::createFromQueryResult(aql::QueryResult&& result, size_
double ttl, bool hasCount) {
TRI_ASSERT(result.result != nullptr);
CursorId const id = TRI_NewServerSpecificTick(); // embedded server id
TRI_ASSERT(id != 0);
std::unique_ptr<Cursor> cursor(
new aql::QueryResultCursor(_vocbase, id, std::move(result), batchSize, ttl, hasCount));
new aql::QueryResultCursor(_vocbase, std::move(result), batchSize, ttl, hasCount));
cursor->use();
return addCursor(std::move(cursor));
@ -153,14 +151,14 @@ Cursor* CursorRepository::createQueryStream(std::string const& query,
std::shared_ptr<VPackBuilder> const& binds,
std::shared_ptr<VPackBuilder> const& opts,
size_t batchSize, double ttl,
bool contextOwnedByExterior) {
bool contextOwnedByExterior,
std::shared_ptr<transaction::Context> ctx) {
TRI_ASSERT(!query.empty());
CursorId const id = TRI_NewServerSpecificTick(); // embedded server id
TRI_ASSERT(id != 0);
auto cursor = std::make_unique<aql::QueryStreamCursor>(_vocbase, id, query, binds,
auto cursor = std::make_unique<aql::QueryStreamCursor>(_vocbase, query, binds,
opts, batchSize, ttl,
contextOwnedByExterior);
contextOwnedByExterior,
std::move(ctx));
cursor->use();
return addCursor(std::move(cursor));

View File

@ -81,7 +81,8 @@ class CursorRepository {
Cursor* createQueryStream(std::string const& query,
std::shared_ptr<velocypack::Builder> const& binds,
std::shared_ptr<velocypack::Builder> const& opts,
size_t batchSize, double ttl, bool contextOwnedByExterior);
size_t batchSize, double ttl, bool contextOwnedByExterior,
std::shared_ptr<transaction::Context> ctx);
//////////////////////////////////////////////////////////////////////////////
/// @brief remove a cursor by id

View File

@ -345,7 +345,8 @@ struct V8Cursor final {
// specify ID 0 so it uses the external V8 context
auto cc = cursors->createQueryStream(queryString, std::move(bindVars),
std::move(options), batchSize, ttl,
contextOwnedByExterior);
contextOwnedByExterior,
/*trxCtx*/ nullptr);
TRI_DEFER(cc->release());
// args.Holder() is supposedly better than args.This()
auto self = std::make_unique<V8Cursor>(isolate, args.Holder(), *vocbase, cc->id());

View File

@ -252,15 +252,13 @@ ArangoQueryCursor.prototype[Symbol.iterator] = function * () {
// //////////////////////////////////////////////////////////////////////////////
ArangoQueryCursor.prototype.dispose = function () {
if (!this.data.id) {
// client side only cursor
if (!this.data.id || !this._hasMore) {
// client side only cursor, or already disposed
return;
}
var requestResult = this._database._connection.DELETE(this._baseurl());
arangosh.checkRequestResult(requestResult);
this.data.id = undefined;
};

View File

@ -28,11 +28,11 @@
// / @author Copyright 2012-2013, triAGENS GmbH, Cologne, Germany
// //////////////////////////////////////////////////////////////////////////////
var internal = require('internal');
var arangosh = require('@arangodb/arangosh');
const internal = require('internal');
const arangosh = require('@arangodb/arangosh');
var ArangoStatement = require('@arangodb/arango-statement-common').ArangoStatement;
var ArangoQueryCursor = require('@arangodb/arango-query-cursor').ArangoQueryCursor;
const ArangoStatement = require('@arangodb/arango-statement-common').ArangoStatement;
const ArangoQueryCursor = require('@arangodb/arango-query-cursor').ArangoQueryCursor;
// //////////////////////////////////////////////////////////////////////////////
// / @brief return a string representation of the statement

View File

@ -28,6 +28,7 @@
const internal = require('internal');
const arangosh = require('@arangodb/arangosh');
const ArangoError = require('@arangodb').ArangoError;
const ArangoQueryCursor = require('@arangodb/arango-query-cursor').ArangoQueryCursor;
function ArangoTransaction (database, data) {
this._id = 0;
@ -130,6 +131,49 @@ ArangoTransaction.prototype.abort = function() {
this._id = 0;
};
ArangoTransaction.prototype.query = function(query, bindVars, cursorOptions, options) {
if (typeof query !== 'string' || query === undefined || query === '') {
throw 'need a valid query string';
}
if (options === undefined && cursorOptions !== undefined) {
options = cursorOptions;
}
let body = {
query: query,
count: (cursorOptions && cursorOptions.count) || false,
bindVars: bindVars || undefined,
};
if (cursorOptions && cursorOptions.batchSize) {
body.batchSize = cursorOptions.batchSize;
}
if (options) {
body.options = options;
}
if (cursorOptions && cursorOptions.cache) {
body.cache = cursorOptions.cache;
}
const headers = {'x-arango-trx-id' : this.id()};
var requestResult = this._database._connection.POST('/_api/cursor', body, headers);
arangosh.checkRequestResult(requestResult);
let isStream = false;
if (options && options.stream) {
isStream = options.stream;
}
return new ArangoQueryCursor(this._database, requestResult, isStream);
};
ArangoTransactionCollection.prototype.name = function() {
return this._collection.name();
};
ArangoTransactionCollection.prototype.document = function(id) {
if (this._transaction.id() === 0) {
throw new ArangoError({

File diff suppressed because it is too large Load Diff