mirror of https://gitee.com/bigwinds/arangodb
honor transaction options
This commit is contained in:
parent
8b6e9ec793
commit
b91eab0ce8
|
@ -31,6 +31,13 @@
|
|||
using namespace arangodb;
|
||||
using namespace arangodb::aql;
|
||||
|
||||
/// @brief clone, used to make daughter transactions for parts of a
|
||||
/// distributed AQL query running on the coordinator
|
||||
transaction::Methods* AqlTransaction::clone(transaction::Options const& options) const {
|
||||
return new AqlTransaction(transaction::StandaloneContext::Create(vocbase()),
|
||||
&_collections, options, false);
|
||||
}
|
||||
|
||||
/// @brief add a collection to the transaction
|
||||
Result AqlTransaction::processCollection(aql::Collection* collection) {
|
||||
if (ServerState::instance()->isCoordinator()) {
|
||||
|
|
|
@ -45,8 +45,7 @@ class AqlTransaction final : public transaction::Methods {
|
|||
transaction::Options const& options,
|
||||
bool isMainTransaction)
|
||||
: transaction::Methods(transactionContext, options),
|
||||
_collections(*collections),
|
||||
_options(options) {
|
||||
_collections(*collections) {
|
||||
if (!isMainTransaction) {
|
||||
addHint(transaction::Hints::Hint::LOCK_NEVER);
|
||||
} else {
|
||||
|
@ -90,12 +89,8 @@ class AqlTransaction final : public transaction::Methods {
|
|||
LogicalCollection* documentCollection(TRI_voc_cid_t cid);
|
||||
|
||||
/// @brief clone, used to make daughter transactions for parts of a
|
||||
/// distributed
|
||||
/// AQL query running on the coordinator
|
||||
transaction::Methods* clone() const override {
|
||||
return new AqlTransaction(transaction::StandaloneContext::Create(vocbase()),
|
||||
&_collections, _options, false);
|
||||
}
|
||||
/// distributed AQL query running on the coordinator
|
||||
transaction::Methods* clone(transaction::Options const&) const override;
|
||||
|
||||
/// @brief lockCollections, this is needed in a corner case in AQL: we need
|
||||
/// to lock all shards in a controlled way when we set up a distributed
|
||||
|
@ -109,7 +104,6 @@ class AqlTransaction final : public transaction::Methods {
|
|||
/// operation
|
||||
private:
|
||||
std::map<std::string, aql::Collection*> _collections;
|
||||
transaction::Options _options;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -100,6 +100,14 @@ Query::Query(bool contextOwnedByExterior, TRI_vocbase_t* vocbase,
|
|||
if (aql == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
|
||||
}
|
||||
|
||||
if (_contextOwnedByExterior) {
|
||||
// copy transaction options from global state into our local query options
|
||||
TransactionState* state = transaction::V8Context::getParentState();
|
||||
if (state != nullptr) {
|
||||
_queryOptions.transactionOptions = state->options();
|
||||
}
|
||||
}
|
||||
|
||||
// populate query options
|
||||
if (_options != nullptr) {
|
||||
|
@ -248,8 +256,9 @@ Query* Query::clone(QueryPart part, bool withPlan) {
|
|||
|
||||
TRI_ASSERT(clone->_trx == nullptr);
|
||||
|
||||
clone->_trx = _trx->clone(); // A daughter transaction which does not
|
||||
// actually lock the collections
|
||||
// A daughter transaction which does not
|
||||
// actually lock the collections
|
||||
clone->_trx = _trx->clone(_queryOptions.transactionOptions);
|
||||
|
||||
Result res = clone->_trx->begin();
|
||||
|
||||
|
@ -418,12 +427,12 @@ ExecutionPlan* Query::prepare() {
|
|||
std::unique_ptr<ExecutionPlan> plan;
|
||||
|
||||
if (!_queryString.empty()) {
|
||||
auto parser = std::make_unique<Parser>(this);
|
||||
Parser parser(this);
|
||||
|
||||
parser->parse(false);
|
||||
parser.parse(false);
|
||||
// put in bind parameters
|
||||
parser->ast()->injectBindParameters(_bindParameters);
|
||||
_isModificationQuery = parser->isModificationQuery();
|
||||
parser.ast()->injectBindParameters(_bindParameters);
|
||||
_isModificationQuery = parser.isModificationQuery();
|
||||
}
|
||||
|
||||
TRI_ASSERT(_trx == nullptr);
|
||||
|
|
|
@ -68,7 +68,7 @@ QueryOptions::QueryOptions() :
|
|||
auto queryCacheMode = QueryCache::instance()->mode();
|
||||
cache = (queryCacheMode == CACHE_ALWAYS_ON);
|
||||
}
|
||||
|
||||
|
||||
void QueryOptions::fromVelocyPack(VPackSlice const& slice) {
|
||||
if (!slice.isObject()) {
|
||||
return;
|
||||
|
|
|
@ -106,7 +106,7 @@ BaseEngine::BaseEngine(TRI_vocbase_t* vocbase, VPackSlice info)
|
|||
// created transaction is considered a "MAIN" part and will not switch
|
||||
// off collection locking completely!
|
||||
_query =
|
||||
new aql::Query(true, vocbase, aql::QueryString(), params, opts, aql::PART_DEPENDENT);
|
||||
new aql::Query(false, vocbase, aql::QueryString(), params, opts, aql::PART_DEPENDENT);
|
||||
_query->injectTransaction(_trx);
|
||||
|
||||
VPackSlice variablesSlice = info.get(VARIABLES);
|
||||
|
|
|
@ -306,10 +306,10 @@ bool RestQueryHandler::parseQuery() {
|
|||
"expecting a JSON object as body");
|
||||
};
|
||||
|
||||
std::string const&& queryString =
|
||||
std::string const queryString =
|
||||
VelocyPackHelper::checkAndGetStringValue(body, "query");
|
||||
|
||||
Query query(true, _vocbase, QueryString(queryString),
|
||||
Query query(false, _vocbase, QueryString(queryString),
|
||||
nullptr, nullptr, PART_MAIN);
|
||||
|
||||
auto parseResult = query.parse();
|
||||
|
|
|
@ -438,15 +438,16 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
|
|||
// "transaction size" counters have reached their limit
|
||||
if (_options.intermediateCommitCount <= numOperations ||
|
||||
_options.intermediateCommitSize <= newSize) {
|
||||
internalCommit();
|
||||
_numInserts = 0;
|
||||
_numUpdates = 0;
|
||||
_numRemoves = 0;
|
||||
// LOG_TOPIC(ERR, Logger::FIXME) << "INTERMEDIATE COMMIT!";
|
||||
internalCommit();
|
||||
_numInserts = 0;
|
||||
_numUpdates = 0;
|
||||
_numRemoves = 0;
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
_numLogdata = 0;
|
||||
_numLogdata = 0;
|
||||
#endif
|
||||
createTransaction();
|
||||
}
|
||||
createTransaction();
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
|
|
@ -68,6 +68,8 @@ class TransactionState {
|
|||
bool isDBServer() const { return ServerState::isDBServer(_serverRole); }
|
||||
bool isCoordinator() const { return ServerState::isCoordinator(_serverRole); }
|
||||
|
||||
transaction::Options& options() { return _options; }
|
||||
transaction::Options const& options() const { return _options; }
|
||||
TRI_vocbase_t* vocbase() const { return _vocbase; }
|
||||
TRI_voc_tid_t id() const { return _id; }
|
||||
transaction::Status status() const { return _status; }
|
||||
|
|
|
@ -2701,14 +2701,12 @@ std::vector<std::shared_ptr<Index>> transaction::Methods::indexesForCollection(
|
|||
}
|
||||
|
||||
/// @brief Lock all collections. Only works for selected sub-classes
|
||||
|
||||
int transaction::Methods::lockCollections() {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// @brief Clone this transaction. Only works for selected sub-classes
|
||||
|
||||
transaction::Methods* transaction::Methods::clone() const {
|
||||
transaction::Methods* transaction::Methods::clone(transaction::Options const&) const {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
|
|
|
@ -373,7 +373,7 @@ class Methods {
|
|||
virtual int lockCollections();
|
||||
|
||||
/// @brief Clone this transaction. Only works for selected sub-classes
|
||||
virtual transaction::Methods* clone() const;
|
||||
virtual transaction::Methods* clone(transaction::Options const&) const;
|
||||
|
||||
/// @brief return the collection name resolver
|
||||
CollectionNameResolver const* resolver() const;
|
||||
|
|
|
@ -115,15 +115,21 @@ bool transaction::V8Context::isGlobal() const {
|
|||
return _sharedTransactionContext == this;
|
||||
}
|
||||
|
||||
/// @brief check whether the transaction is embedded
|
||||
bool transaction::V8Context::IsEmbedded() {
|
||||
/// @brief return parent transaction state or none
|
||||
TransactionState* transaction::V8Context::getParentState() {
|
||||
TRI_v8_global_t* v8g = static_cast<TRI_v8_global_t*>(
|
||||
v8::Isolate::GetCurrent()->GetData(V8PlatformFeature::V8_DATA_SLOT));
|
||||
if (v8g->_transactionContext == nullptr) {
|
||||
return false;
|
||||
if (v8g == nullptr ||
|
||||
v8g->_transactionContext == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
return static_cast<transaction::V8Context*>(v8g->_transactionContext)
|
||||
->_currentTransaction != nullptr;
|
||||
->_currentTransaction;
|
||||
}
|
||||
|
||||
/// @brief check whether the transaction is embedded
|
||||
bool transaction::V8Context::isEmbedded() {
|
||||
return (getParentState() != nullptr);
|
||||
}
|
||||
|
||||
/// @brief create a context, returned in a shared ptr
|
||||
|
|
|
@ -69,8 +69,11 @@ class V8Context final : public Context {
|
|||
/// @brief whether or not the transaction context is a global one
|
||||
bool isGlobal() const;
|
||||
|
||||
/// @brief return parent transaction state or none
|
||||
static TransactionState* getParentState();
|
||||
|
||||
/// @brief check whether the transaction is embedded
|
||||
static bool IsEmbedded();
|
||||
static bool isEmbedded();
|
||||
|
||||
/// @brief create a context, returned in a shared ptr
|
||||
static std::shared_ptr<transaction::V8Context> Create(TRI_vocbase_t*, bool);
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#define PREVENT_EMBEDDED_TRANSACTION() \
|
||||
if (arangodb::transaction::V8Context::IsEmbedded()) { \
|
||||
if (arangodb::transaction::V8Context::isEmbedded()) { \
|
||||
TRI_V8_THROW_EXCEPTION(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION); \
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue