1
0
Fork 0
This commit is contained in:
jsteemann 2017-01-30 09:02:57 +01:00
parent 09851cdf75
commit 802c384448
8 changed files with 83 additions and 77 deletions

View File

@ -1111,7 +1111,7 @@ void Query::enterContext() {
auto ctx = static_cast<arangodb::V8TransactionContext*>(
v8g->_transactionContext);
if (ctx != nullptr) {
ctx->registerTransaction(_trx->getInternals());
ctx->registerTransaction(_trx->state());
}
}

View File

@ -102,7 +102,7 @@ LogicalCollection* AqlTransaction::documentCollection(TRI_voc_cid_t cid) {
/// order via an HTTP call. This method is used to implement that HTTP action.
int AqlTransaction::lockCollections() {
auto trx = getInternals();
auto trx = state();
for (auto& trxCollection : trx->_collections) {
int res = trxCollection->lock(trxCollection->_accessType, 0);

View File

@ -39,7 +39,10 @@ struct OperationResult {
: code(TRI_ERROR_NO_ERROR), wasSynchronous(false) {}
explicit OperationResult(int code)
: buffer(std::make_shared<VPackBuffer<uint8_t>>()), customTypeHandler(), code(code), wasSynchronous(false) {
: buffer(std::make_shared<VPackBuffer<uint8_t>>()),
customTypeHandler(),
code(code),
wasSynchronous(false) {
if (code != TRI_ERROR_NO_ERROR) {
errorMessage = TRI_errno_string(code);
}
@ -66,7 +69,10 @@ struct OperationResult {
}
OperationResult(int code, std::string const& message)
: buffer(std::make_shared<VPackBuffer<uint8_t>>()), customTypeHandler(), errorMessage(message), code(code),
: buffer(std::make_shared<VPackBuffer<uint8_t>>()),
customTypeHandler(),
errorMessage(message),
code(code),
wasSynchronous(false) {
TRI_ASSERT(code != TRI_ERROR_NO_ERROR);
}

View File

@ -55,20 +55,20 @@ class ReplicationTransaction : public Transaction {
inline TransactionCollection* trxCollection(TRI_voc_cid_t cid) {
TRI_ASSERT(cid > 0);
TransactionCollection* trxCollection = _trx->collection(cid, AccessMode::Type::WRITE);
TransactionCollection* trxCollection = _state->collection(cid, AccessMode::Type::WRITE);
if (trxCollection == nullptr) {
int res = _trx->addCollection(cid, AccessMode::Type::WRITE, 0, true, true);
int res = _state->addCollection(cid, AccessMode::Type::WRITE, 0, true, true);
if (res == TRI_ERROR_NO_ERROR) {
res = _trx->ensureCollections();
res = _state->ensureCollections();
}
if (res != TRI_ERROR_NO_ERROR) {
return nullptr;
}
trxCollection = _trx->collection(cid, AccessMode::Type::WRITE);
trxCollection = _state->collection(cid, AccessMode::Type::WRITE);
}
return trxCollection;

View File

@ -79,7 +79,7 @@ TransactionCollection* SingleCollectionTransaction::trxCollection() {
TRI_ASSERT(_cid > 0);
if (_trxCollection == nullptr) {
_trxCollection = _trx->collection(_cid, _accessType);
_trxCollection = _state->collection(_cid, _accessType);
if (_trxCollection != nullptr) {
_documentCollection =

View File

@ -157,8 +157,8 @@ static OperationResult EmptyResult(bool waitForSync) {
void Transaction::addHint(TransactionHints::Hint hint, bool passthrough) {
_hints.set(hint);
if (passthrough && _trx != nullptr) {
_trx->_hints.set(hint);
if (passthrough && _state != nullptr) {
_state->_hints.set(hint);
}
}
@ -166,20 +166,20 @@ void Transaction::addHint(TransactionHints::Hint hint, bool passthrough) {
void Transaction::removeHint(TransactionHints::Hint hint, bool passthrough) {
_hints.unset(hint);
if (passthrough && _trx != nullptr) {
_trx->_hints.unset(hint);
if (passthrough && _state != nullptr) {
_state->_hints.unset(hint);
}
}
/// @brief whether or not the transaction consists of a single operation only
bool Transaction::isSingleOperationTransaction() const {
return _trx->isSingleOperation();
return _state->isSingleOperation();
}
/// @brief get the status of the transaction
Transaction::Status Transaction::getStatus() const {
if (_trx != nullptr) {
return _trx->_status;
if (_state != nullptr) {
return _state->_status;
}
return Transaction::Status::UNDEFINED;
}
@ -188,8 +188,8 @@ Transaction::Status Transaction::getStatus() const {
void Transaction::setAllowImplicitCollections(bool value) {
_allowImplicitCollections = value;
if (_trx != nullptr) {
_trx->_allowImplicit = value;
if (_state != nullptr) {
_state->_allowImplicit = value;
}
}
@ -572,7 +572,7 @@ Transaction::Transaction(std::shared_ptr<TransactionContext> transactionContext)
_waitForSync(false),
_allowImplicitCollections(true),
_isReal(true),
_trx(nullptr),
_state(nullptr),
_vocbase(transactionContext->vocbase()),
_resolver(nullptr),
_transactionContext(transactionContext),
@ -590,12 +590,12 @@ Transaction::Transaction(std::shared_ptr<TransactionContext> transactionContext)
/// @brief destroy the transaction
Transaction::~Transaction() {
if (_trx == nullptr) {
if (_state == nullptr) {
return;
}
if (isEmbeddedTransaction()) {
_trx->_nestingLevel--;
_state->_nestingLevel--;
} else {
if (getStatus() == Transaction::Status::RUNNING) {
// auto abort a running transaction
@ -615,9 +615,9 @@ Transaction::~Transaction() {
/// @brief return the names of all collections used in the transaction
std::vector<std::string> Transaction::collectionNames() const {
std::vector<std::string> result;
result.reserve(_trx->_collections.size());
result.reserve(_state->_collections.size());
for (auto& trxCollection : _trx->_collections) {
for (auto& trxCollection : _state->_collections) {
if (trxCollection->_collection != nullptr) {
result.emplace_back(trxCollection->_collection->name());
}
@ -637,15 +637,15 @@ CollectionNameResolver const* Transaction::resolver() {
/// @brief return the transaction collection for a document collection
TransactionCollection* Transaction::trxCollection(TRI_voc_cid_t cid) const {
TRI_ASSERT(_trx != nullptr);
TRI_ASSERT(_state != nullptr);
TRI_ASSERT(getStatus() == Transaction::Status::RUNNING);
return _trx->collection(cid, AccessMode::Type::READ);
return _state->collection(cid, AccessMode::Type::READ);
}
/// @brief order a ditch for a collection
DocumentDitch* Transaction::orderDitch(TRI_voc_cid_t cid) {
TRI_ASSERT(_trx != nullptr);
TRI_ASSERT(_state != nullptr);
TRI_ASSERT(getStatus() == Transaction::Status::RUNNING ||
getStatus() == Transaction::Status::CREATED);
@ -653,7 +653,7 @@ DocumentDitch* Transaction::orderDitch(TRI_voc_cid_t cid) {
return _ditchCache.ditch;
}
TransactionCollection* trxCollection = _trx->collection(cid, AccessMode::Type::READ);
TransactionCollection* trxCollection = _state->collection(cid, AccessMode::Type::READ);
if (trxCollection == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "unable to determine transaction collection");
@ -680,11 +680,11 @@ bool Transaction::hasDitch(TRI_voc_cid_t cid) const {
/// @brief get (or create) a rocksdb WriteTransaction
rocksdb::Transaction* Transaction::rocksTransaction() {
if (_trx->_rocksTransaction == nullptr) {
_trx->_rocksTransaction = RocksDBFeature::instance()->db()->BeginTransaction(
if (_state->_rocksTransaction == nullptr) {
_state->_rocksTransaction = RocksDBFeature::instance()->db()->BeginTransaction(
rocksdb::WriteOptions(), rocksdb::OptimisticTransactionOptions());
}
return _trx->_rocksTransaction;
return _state->_rocksTransaction;
}
/// @brief extract the _key attribute from a slice
@ -1108,7 +1108,7 @@ void Transaction::buildDocumentIdentity(LogicalCollection* collection,
/// @brief begin the transaction
int Transaction::begin() {
if (_trx == nullptr) {
if (_state == nullptr) {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
@ -1118,47 +1118,47 @@ int Transaction::begin() {
if (!_isReal) {
if (_nestingLevel == 0) {
_trx->_status = Transaction::Status::RUNNING;
_state->_status = Transaction::Status::RUNNING;
}
return TRI_ERROR_NO_ERROR;
}
return _trx->beginTransaction(_hints, _nestingLevel);
return _state->beginTransaction(_hints, _nestingLevel);
}
/// @brief commit / finish the transaction
int Transaction::commit() {
if (_trx == nullptr || getStatus() != Transaction::Status::RUNNING) {
if (_state == nullptr || getStatus() != Transaction::Status::RUNNING) {
// transaction not created or not running
return TRI_ERROR_TRANSACTION_INTERNAL;
}
if (!_isReal) {
if (_nestingLevel == 0) {
_trx->_status = Transaction::Status::COMMITTED;
_state->_status = Transaction::Status::COMMITTED;
}
return TRI_ERROR_NO_ERROR;
}
return _trx->commitTransaction(this, _nestingLevel);
return _state->commitTransaction(this, _nestingLevel);
}
/// @brief abort the transaction
int Transaction::abort() {
if (_trx == nullptr || getStatus() != Transaction::Status::RUNNING) {
if (_state == nullptr || getStatus() != Transaction::Status::RUNNING) {
// transaction not created or not running
return TRI_ERROR_TRANSACTION_INTERNAL;
}
if (!_isReal) {
if (_nestingLevel == 0) {
_trx->_status = Transaction::Status::ABORTED;
_state->_status = Transaction::Status::ABORTED;
}
return TRI_ERROR_NO_ERROR;
}
return _trx->abortTransaction(this, _nestingLevel);
return _state->abortTransaction(this, _nestingLevel);
}
/// @brief finish a transaction (commit or abort), based on the previous state
@ -1264,7 +1264,7 @@ TRI_voc_cid_t Transaction::addCollectionAtRuntime(TRI_voc_cid_t cid,
auto collection = this->trxCollection(cid);
if (collection == nullptr) {
int res = _trx->addCollection(cid, type, _nestingLevel, true, _allowImplicitCollections);
int res = _state->addCollection(cid, type, _nestingLevel, true, _allowImplicitCollections);
if (res != TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION) {
@ -1273,7 +1273,7 @@ TRI_voc_cid_t Transaction::addCollectionAtRuntime(TRI_voc_cid_t cid,
}
THROW_ARANGO_EXCEPTION(res);
}
_trx->ensureCollections(_nestingLevel);
_state->ensureCollections(_nestingLevel);
collection = this->trxCollection(cid);
if (collection == nullptr) {
@ -3077,7 +3077,7 @@ std::unique_ptr<OperationCursor> Transaction::indexScan(
/// @brief return the collection
arangodb::LogicalCollection* Transaction::documentCollection(
TransactionCollection const* trxCollection) const {
TRI_ASSERT(_trx != nullptr);
TRI_ASSERT(_state != nullptr);
TRI_ASSERT(trxCollection != nullptr);
TRI_ASSERT(getStatus() == Transaction::Status::RUNNING);
TRI_ASSERT(trxCollection->_collection != nullptr);
@ -3088,10 +3088,10 @@ arangodb::LogicalCollection* Transaction::documentCollection(
/// @brief return the collection
arangodb::LogicalCollection* Transaction::documentCollection(
TRI_voc_cid_t cid) const {
TRI_ASSERT(_trx != nullptr);
TRI_ASSERT(_state != nullptr);
TRI_ASSERT(getStatus() == Transaction::Status::RUNNING);
auto trxCollection = _trx->collection(cid, AccessMode::Type::READ);
auto trxCollection = _state->collection(cid, AccessMode::Type::READ);
if (trxCollection == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find collection");
@ -3122,7 +3122,7 @@ int Transaction::addCollection(TRI_voc_cid_t cid, std::string const& name,
/// @brief add a collection by id
int Transaction::addCollection(TRI_voc_cid_t cid, AccessMode::Type type) {
if (_trx == nullptr) {
if (_state == nullptr) {
return registerError(TRI_ERROR_INTERNAL);
}
@ -3163,11 +3163,11 @@ int Transaction::addCollection(std::string const& name, AccessMode::Type type) {
/// @brief test if a collection is already locked
bool Transaction::isLocked(LogicalCollection* document,
AccessMode::Type type) {
if (_trx == nullptr || getStatus() != Transaction::Status::RUNNING) {
if (_state == nullptr || getStatus() != Transaction::Status::RUNNING) {
return false;
}
TransactionCollection* trxCollection = _trx->collection(document->cid(), type);
TransactionCollection* trxCollection = _state->collection(document->cid(), type);
TRI_ASSERT(trxCollection != nullptr);
return trxCollection->isLocked(type, _nestingLevel);
@ -3176,7 +3176,7 @@ bool Transaction::isLocked(LogicalCollection* document,
/// @brief read- or write-lock a collection
int Transaction::lock(TransactionCollection* trxCollection,
AccessMode::Type type) {
if (_trx == nullptr || getStatus() != Transaction::Status::RUNNING) {
if (_state == nullptr || getStatus() != Transaction::Status::RUNNING) {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
@ -3186,7 +3186,7 @@ int Transaction::lock(TransactionCollection* trxCollection,
/// @brief read- or write-unlock a collection
int Transaction::unlock(TransactionCollection* trxCollection,
AccessMode::Type type) {
if (_trx == nullptr || getStatus() != Transaction::Status::RUNNING) {
if (_state == nullptr || getStatus() != Transaction::Status::RUNNING) {
return TRI_ERROR_TRANSACTION_INTERNAL;
}
@ -3300,9 +3300,9 @@ Transaction::IndexHandle Transaction::getIndexByIdentifier(
/// @brief add a collection to an embedded transaction
int Transaction::addCollectionEmbedded(TRI_voc_cid_t cid, AccessMode::Type type) {
TRI_ASSERT(_trx != nullptr);
TRI_ASSERT(_state != nullptr);
int res = _trx->addCollection(cid, type, _nestingLevel, false, _allowImplicitCollections);
int res = _state->addCollection(cid, type, _nestingLevel, false, _allowImplicitCollections);
if (res != TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION) {
@ -3317,7 +3317,7 @@ int Transaction::addCollectionEmbedded(TRI_voc_cid_t cid, AccessMode::Type type)
/// @brief add a collection to a top-level transaction
int Transaction::addCollectionToplevel(TRI_voc_cid_t cid, AccessMode::Type type) {
TRI_ASSERT(_trx != nullptr);
TRI_ASSERT(_state != nullptr);
int res;
@ -3325,7 +3325,7 @@ int Transaction::addCollectionToplevel(TRI_voc_cid_t cid, AccessMode::Type type)
// transaction already started?
res = TRI_ERROR_TRANSACTION_INTERNAL;
} else {
res = _trx->addCollection(cid, type, _nestingLevel, false, _allowImplicitCollections);
res = _state->addCollection(cid, type, _nestingLevel, false, _allowImplicitCollections);
}
if (res != TRI_ERROR_NO_ERROR) {
@ -3344,12 +3344,12 @@ int Transaction::addCollectionToplevel(TRI_voc_cid_t cid, AccessMode::Type type)
/// transaction. if not, it will create a transaction of its own
int Transaction::setupTransaction() {
// check in the context if we are running embedded
_trx = this->_transactionContext->getParentTransaction();
_state = this->_transactionContext->getParentTransaction();
if (_trx != nullptr) {
if (_state != nullptr) {
// yes, we are embedded
_setupState = setupEmbedded();
_allowImplicitCollections = _trx->_allowImplicit;
_allowImplicitCollections = _state->_allowImplicit;
} else {
// non-embedded
_setupState = setupToplevel();
@ -3363,7 +3363,7 @@ int Transaction::setupTransaction() {
int Transaction::setupEmbedded() {
TRI_ASSERT(_nestingLevel == 0);
_nestingLevel = ++_trx->_nestingLevel;
_nestingLevel = ++_state->_nestingLevel;
if (!this->_transactionContext->isEmbeddable()) {
// we are embedded but this is disallowed...
@ -3379,27 +3379,27 @@ int Transaction::setupToplevel() {
// we are not embedded. now start our own transaction
try {
_trx = new TransactionState(_vocbase, _timeout, _waitForSync);
_state = new TransactionState(_vocbase, _timeout, _waitForSync);
} catch (...) {
return TRI_ERROR_OUT_OF_MEMORY;
}
TRI_ASSERT(_trx != nullptr);
TRI_ASSERT(_state != nullptr);
// register the transaction in the context
return this->_transactionContext->registerTransaction(_trx);
return this->_transactionContext->registerTransaction(_state);
}
/// @brief free transaction
void Transaction::freeTransaction() {
TRI_ASSERT(!isEmbeddedTransaction());
if (_trx != nullptr) {
if (_state != nullptr) {
TRI_ASSERT(getStatus() != Transaction::Status::RUNNING);
auto id = _trx->_id;
bool hasFailedOperations = _trx->hasFailedOperations();
delete _trx;
_trx = nullptr;
auto id = _state->_id;
bool hasFailedOperations = _state->hasFailedOperations();
delete _state;
_state = nullptr;
// store result
_transactionContext->storeTransactionResult(id, hasFailedOperations);

View File

@ -185,7 +185,7 @@ class Transaction {
inline TRI_vocbase_t* vocbase() const { return _vocbase; }
/// @brief return internals of transaction
inline TransactionState* getInternals() const { return _trx; }
inline TransactionState* state() const { return _state; }
/// @brief return role of server in cluster
inline ServerState::RoleEnum serverRole() const { return _serverRole; }
@ -716,8 +716,8 @@ class Transaction {
bool _isReal;
protected:
/// @brief the C transaction struct
TransactionState* _trx;
/// @brief the state
TransactionState* _state;
/// @brief the vocbase
TRI_vocbase_t* const _vocbase;

View File

@ -2060,7 +2060,7 @@ int LogicalCollection::insert(Transaction* trx, VPackSlice const slice,
// create marker
MMFilesCrudMarker insertMarker(
TRI_DF_MARKER_VPACK_DOCUMENT,
TRI_MarkerIdTransaction(trx->getInternals()), newSlice);
TRI_MarkerIdTransaction(trx->state()), newSlice);
MMFilesWalMarker const* marker;
if (options.recoveryMarker == nullptr) {
@ -2257,7 +2257,7 @@ int LogicalCollection::update(Transaction* trx, VPackSlice const newSlice,
// create marker
MMFilesCrudMarker updateMarker(
TRI_DF_MARKER_VPACK_DOCUMENT,
TRI_MarkerIdTransaction(trx->getInternals()), builder->slice());
TRI_MarkerIdTransaction(trx->state()), builder->slice());
MMFilesWalMarker const* marker;
if (options.recoveryMarker == nullptr) {
@ -2420,7 +2420,7 @@ int LogicalCollection::replace(Transaction* trx, VPackSlice const newSlice,
// create marker
MMFilesCrudMarker replaceMarker(
TRI_DF_MARKER_VPACK_DOCUMENT,
TRI_MarkerIdTransaction(trx->getInternals()), builder->slice());
TRI_MarkerIdTransaction(trx->state()), builder->slice());
MMFilesWalMarker const* marker;
if (options.recoveryMarker == nullptr) {
@ -2518,7 +2518,7 @@ int LogicalCollection::remove(arangodb::Transaction* trx,
// create marker
MMFilesCrudMarker removeMarker(
TRI_DF_MARKER_VPACK_REMOVE, TRI_MarkerIdTransaction(trx->getInternals()),
TRI_DF_MARKER_VPACK_REMOVE, TRI_MarkerIdTransaction(trx->state()),
builder->slice());
MMFilesWalMarker const* marker;
@ -2604,7 +2604,7 @@ int LogicalCollection::remove(arangodb::Transaction* trx,
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
res = trx->getInternals()->addOperation(revisionId, operation, marker, options.waitForSync);
res = trx->state()->addOperation(revisionId, operation, marker, options.waitForSync);
} catch (basics::Exception const& ex) {
res = ex.code();
} catch (std::bad_alloc const&) {
@ -2649,7 +2649,7 @@ int LogicalCollection::remove(arangodb::Transaction* trx,
// create marker
MMFilesCrudMarker removeMarker(
TRI_DF_MARKER_VPACK_REMOVE, TRI_MarkerIdTransaction(trx->getInternals()),
TRI_DF_MARKER_VPACK_REMOVE, TRI_MarkerIdTransaction(trx->state()),
builder->slice());
MMFilesWalMarker const* marker = &removeMarker;
@ -2705,7 +2705,7 @@ int LogicalCollection::remove(arangodb::Transaction* trx,
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
res = trx->getInternals()->addOperation(revisionId, operation, marker, options.waitForSync);
res = trx->state()->addOperation(revisionId, operation, marker, options.waitForSync);
} catch (basics::Exception const& ex) {
res = ex.code();
} catch (std::bad_alloc const&) {
@ -3323,7 +3323,7 @@ int LogicalCollection::updateDocument(
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
return trx->getInternals()->addOperation(newRevisionId, operation, marker, waitForSync);
return trx->state()->addOperation(newRevisionId, operation, marker, waitForSync);
}
/// @brief insert a document, low level worker
@ -3357,7 +3357,7 @@ int LogicalCollection::insertDocument(
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
return trx->getInternals()->addOperation(revisionId, operation, marker, waitForSync);
return trx->state()->addOperation(revisionId, operation, marker, waitForSync);
}
/// @brief creates a new entry in the primary index