1
0
Fork 0

issue 380.4: allow transaction status change and data-source registration callbacks to be registered with a transaction (#5624)

This commit is contained in:
Vasiliy 2018-06-18 14:16:32 +03:00 committed by Jan
parent af7299691d
commit b7f7711d30
10 changed files with 290 additions and 82 deletions

View File

@ -247,9 +247,9 @@ void registerViewFactory() {
}
template<typename Impl>
arangodb::Result transactionStateRegistrationCallback(
arangodb::Result transactionDataSourceRegistrationCallback(
arangodb::LogicalDataSource& dataSource,
arangodb::TransactionState& state
arangodb::transaction::Methods& trx
) {
if (arangodb::iresearch::DATA_SOURCE_TYPE != dataSource.type()) {
return arangodb::Result(); // not an IResearchView (noop)
@ -264,7 +264,7 @@ arangodb::Result transactionStateRegistrationCallback(
if (!view) {
LOG_TOPIC(WARN, arangodb::iresearch::TOPIC)
<< "failure to get LogicalView while processing a TransactionState by IResearchFeature for tid '" << state.id() << "' name '" << dataSource.name() << "'";
<< "failure to get LogicalView while processing a TransactionState by IResearchFeature for name '" << dataSource.name() << "'";
return arangodb::Result(TRI_ERROR_INTERNAL);
}
@ -278,26 +278,26 @@ arangodb::Result transactionStateRegistrationCallback(
if (!impl) {
LOG_TOPIC(WARN, arangodb::iresearch::TOPIC)
<< "failure to get IResearchView while processing a TransactionState by IResearchFeature for tid '" << state.id() << "' cid '" << dataSource.name() << "'";
<< "failure to get IResearchView while processing a TransactionState by IResearchFeature for cid '" << dataSource.name() << "'";
return arangodb::Result(TRI_ERROR_INTERNAL);
}
impl->apply(state);
return arangodb::Result();
return arangodb::Result(
impl->apply(trx) ? TRI_ERROR_NO_ERROR : TRI_ERROR_INTERNAL
);
}
void registerTransactionStateCallback() {
void registerTransactionDataSourceRegistrationCallback() {
if (arangodb::ServerState::instance()->isCoordinator()) {
// NOOP
} else if(arangodb::ServerState::instance()->isDBServer()) {
arangodb::transaction::Methods::addStateRegistrationCallback(
transactionStateRegistrationCallback<arangodb::iresearch::IResearchViewDBServer>
arangodb::transaction::Methods::addDataSourceRegistrationCallback(
transactionDataSourceRegistrationCallback<arangodb::iresearch::IResearchViewDBServer>
);
} else {
arangodb::transaction::Methods::addStateRegistrationCallback(
transactionStateRegistrationCallback<arangodb::iresearch::IResearchView>
arangodb::transaction::Methods::addDataSourceRegistrationCallback(
transactionDataSourceRegistrationCallback<arangodb::iresearch::IResearchView>
);
}
}
@ -361,8 +361,8 @@ void IResearchFeature::prepare() {
// register 'arangosearch' view
registerViewFactory();
// register 'arangosearch' TransactionState state-change callback factory
registerTransactionStateCallback();
// register 'arangosearch' Transaction DataSource registration callback
registerTransactionDataSourceRegistrationCallback();
registerRecoveryHelper();
}

View File

@ -969,8 +969,16 @@ IResearchView::MemoryStore& IResearchView::activeMemoryStore() const {
return _memoryNode->_store;
}
void IResearchView::apply(arangodb::TransactionState& state) {
state.addStatusChangeCallback(_trxReadCallback);
bool IResearchView::apply(arangodb::transaction::Methods& trx) {
auto* state = trx.state();
if (!state) {
return false;
}
state->addStatusChangeCallback(_trxReadCallback);
return true;
}
int IResearchView::drop(TRI_voc_cid_t cid) {
@ -2038,4 +2046,4 @@ NS_END // arangodb
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -130,9 +130,10 @@ class IResearchView final: public arangodb::DBServerLogicalView,
using arangodb::LogicalView::name;
///////////////////////////////////////////////////////////////////////////////
/// @brief apply any changes to 'state' required by this view
/// @brief apply any changes to 'trx' required by this view
/// @return success
///////////////////////////////////////////////////////////////////////////////
void apply(arangodb::TransactionState& state);
bool apply(arangodb::transaction::Methods& trx);
////////////////////////////////////////////////////////////////////////////////
/// @brief persist the specified WAL file into permanent storage

View File

@ -33,6 +33,7 @@
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/ViewTypesFeature.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Methods.h"
#include "VocBase/vocbase.h"
NS_LOCAL
@ -306,8 +307,16 @@ arangodb::Result IResearchViewDBServer::appendVelocyPack(
return arangodb::Result();
}
void IResearchViewDBServer::apply(arangodb::TransactionState& state) {
state.addStatusChangeCallback(_trxReadCallback);
bool IResearchViewDBServer::apply(arangodb::transaction::Methods& trx) {
auto* state = trx.state();
if (!state) {
return false;
}
state->addStatusChangeCallback(_trxReadCallback);
return true;
}
arangodb::Result IResearchViewDBServer::drop() {

View File

@ -30,15 +30,21 @@
#include "velocypack/Builder.h"
#include "VocBase/LogicalView.h"
NS_BEGIN(arangodb)
namespace arangodb {
class DatabasePathFeature; // forward declaration
class TransactionState; // forward declaration
NS_END // arangodb
namespace transaction {
NS_BEGIN(arangodb)
NS_BEGIN(iresearch)
class Methods; // forward declaration
} // transaction
} // arangodb
namespace arangodb {
namespace iresearch {
class PrimaryKeyIndexReader; // forward declaration
@ -48,8 +54,9 @@ class IResearchViewDBServer final: public arangodb::LogicalView {
///////////////////////////////////////////////////////////////////////////////
/// @brief apply any changes to 'state' required by this view
/// @return success
///////////////////////////////////////////////////////////////////////////////
void apply(arangodb::TransactionState& state);
bool apply(arangodb::transaction::Methods& trx);
virtual arangodb::Result drop() override;
@ -126,7 +133,7 @@ class IResearchViewDBServer final: public arangodb::LogicalView {
);
};
NS_END // iresearch
NS_END // arangodb
} // iresearch
} // arangodb
#endif

View File

@ -74,22 +74,53 @@ using namespace arangodb::transaction::helpers;
namespace {
// wrap vector inside a static function to ensure proper initialization order
std::vector<arangodb::transaction::Methods::StateRegistrationCallback>& getStateRegistrationCallbacks() {
static std::vector<arangodb::transaction::Methods::StateRegistrationCallback> callbacks;
std::vector<arangodb::transaction::Methods::DataSourceRegistrationCallback>& getDataSourceRegistrationCallbacks() {
static std::vector<arangodb::transaction::Methods::DataSourceRegistrationCallback> callbacks;
return callbacks;
}
/// @return the status change callbacks stored in state
/// or nullptr if none and !create
std::vector<arangodb::transaction::Methods::StatusChangeCallback>* getStatusChangeCallbacks(
arangodb::TransactionState& state,
bool create = false
) {
struct CookieType: public arangodb::TransactionState::Cookie {
std::vector<arangodb::transaction::Methods::StatusChangeCallback> _callbacks;
};
static const int key = 0; // arbitrary location in memory, common for all
// TODO FIXME find a better way to look up a ViewState
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
auto* cookie = dynamic_cast<CookieType*>(state.cookie(&key));
#else
auto* cookie = static_cast<CookieType*>(state.cookie(&key));
#endif
if (!cookie && create) {
auto ptr = std::make_unique<CookieType>();
cookie = ptr.get();
state.cookie(&key, std::move(ptr));
}
return cookie ? &(cookie->_callbacks) : nullptr;
}
/// @brief notify callbacks of association of 'cid' with this TransactionState
/// @note done separately from addCollection() to avoid creating a
/// TransactionCollection instance for virtual entities, e.g. View
arangodb::Result applyStateRegistrationCallbacks(
arangodb::Result applyDataSourceRegistrationCallbacks(
LogicalDataSource& dataSource,
arangodb::TransactionState& state
arangodb::transaction::Methods& trx
) {
for (auto& callback: getStateRegistrationCallbacks()) {
for (auto& callback: getDataSourceRegistrationCallbacks()) {
TRI_ASSERT(callback); // addDataSourceRegistrationCallback(...) ensures valid
try {
auto res = callback(dataSource, state);
auto res = callback(dataSource, trx);
if (!res.ok()) {
return res;
@ -102,6 +133,50 @@ arangodb::Result applyStateRegistrationCallbacks(
return arangodb::Result();
}
/// @brief notify callbacks of association of 'cid' with this TransactionState
/// @note done separately from addCollection() to avoid creating a
/// TransactionCollection instance for virtual entities, e.g. View
void applyStatusChangeCallbacks(
arangodb::transaction::Methods& trx,
arangodb::transaction::Status status
) noexcept {
TRI_ASSERT(
arangodb::transaction::Status::ABORTED == status
|| arangodb::transaction::Status::COMMITTED == status
|| arangodb::transaction::Status::RUNNING == status
);
TRI_ASSERT(
!trx.state() // for embeded transactions status is not always updated
|| (trx.state()->isTopLevelTransaction() && trx.state()->status() == status)
|| (!trx.state()->isTopLevelTransaction()
&& arangodb::transaction::Status::RUNNING == trx.state()->status()
)
);
auto* state = trx.state();
if (!state) {
return; // nothing to apply
}
auto* callbacks = getStatusChangeCallbacks(*state);
if (!callbacks) {
return; // no callbacks to apply
}
// no need to lock since transactions are single-threaded
for (auto& callback: *callbacks) {
TRI_ASSERT(callback); // addStatusChangeCallback(...) ensures valid
try {
callback(trx, status);
} catch (...) {
// we must not propagate exceptions from here
}
}
}
static void throwCollectionNotFound(char const* name) {
if (name == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
@ -162,14 +237,37 @@ static OperationResult emptyResult(OperationOptions const& options) {
}
} // namespace
/*static*/ void transaction::Methods::addStateRegistrationCallback(
StateRegistrationCallback callback
/*static*/ void transaction::Methods::addDataSourceRegistrationCallback(
DataSourceRegistrationCallback const& callback
) {
getStateRegistrationCallbacks().emplace_back(callback);
if (callback) {
getDataSourceRegistrationCallbacks().emplace_back(callback);
}
}
/*static*/ void transaction::Methods::clearStateRegistrationCallbacks() {
getStateRegistrationCallbacks().clear();
bool transaction::Methods::addStatusChangeCallback(
StatusChangeCallback const& callback
) {
if (!callback) {
return true; // nothing to call back
}
if (!_state) {
return false; // nothing to add to
}
auto* statusChangeCallbacks = getStatusChangeCallbacks(*_state, true);
TRI_ASSERT(nullptr != statusChangeCallbacks); // 'create' was specified
// no need to lock since transactions are single-threaded
statusChangeCallbacks->emplace_back(callback);
return true;
}
/*static*/ void transaction::Methods::clearDataSourceRegistrationCallbacks() {
getDataSourceRegistrationCallbacks().clear();
}
/// @brief Get the field names of the used index
@ -799,10 +897,17 @@ Result transaction::Methods::begin() {
if (_state->isTopLevelTransaction()) {
_state->updateStatus(transaction::Status::RUNNING);
}
return Result();
} else {
auto res = _state->beginTransaction(_localHints);
if (!res.ok()) {
return res;
}
}
return _state->beginTransaction(_localHints);
applyStatusChangeCallbacks(*this, Status::RUNNING);
return Result();
}
/// @brief commit / finish the transaction
@ -828,10 +933,17 @@ Result transaction::Methods::commit() {
if (_state->isTopLevelTransaction()) {
_state->updateStatus(transaction::Status::COMMITTED);
}
return Result();
} else {
auto res = _state->commitTransaction(this);
if (!res.ok()) {
return res;
}
}
return _state->commitTransaction(this);
applyStatusChangeCallbacks(*this, Status::COMMITTED);
return Result();
}
/// @brief abort the transaction
@ -845,11 +957,17 @@ Result transaction::Methods::abort() {
if (_state->isTopLevelTransaction()) {
_state->updateStatus(transaction::Status::ABORTED);
}
} else {
auto res = _state->abortTransaction(this);
return Result();
if (!res.ok()) {
return res;
}
}
return _state->abortTransaction(this);
applyStatusChangeCallbacks(*this, Status::ABORTED);
return Result();
}
/// @brief finish a transaction (commit or abort), based on the previous state
@ -959,11 +1077,12 @@ TRI_voc_cid_t transaction::Methods::addCollectionAtRuntime(
}
auto dataSource = resolver()->getDataSource(cid);
if (!dataSource) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
}
auto result = applyStateRegistrationCallbacks(*dataSource, *_state);
auto result = applyDataSourceRegistrationCallbacks(*dataSource, *this);
if (!result.ok()) {
THROW_ARANGO_EXCEPTION(result.errorNumber());
@ -3003,12 +3122,12 @@ Result transaction::Methods::addCollection(TRI_voc_cid_t cid, std::string const&
throwCollectionNotFound(cname.c_str());
}
auto addCollection = [this, &cname, type](TRI_voc_cid_t cid)->int {
auto addCollection = [this, &cname, type](TRI_voc_cid_t cid)->void {
auto res =
_state->addCollection(cid, cname, type, _state->nestingLevel(), false);
if (TRI_ERROR_NO_ERROR == res) {
return res;
return;
}
if (TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION == res) {
@ -3031,12 +3150,9 @@ Result transaction::Methods::addCollection(TRI_voc_cid_t cid, std::string const&
bool visited = false;
std::function<bool(LogicalCollection&)> visitor(
[this, &addCollection, &res, cid, &visited](LogicalCollection& col)->bool {
res = addCollection(col.id());
if (res.ok()) {
res = applyStateRegistrationCallbacks(col, *_state);
visited |= cid == col.id();
}
addCollection(col.id()); // will throw on error
res = applyDataSourceRegistrationCallbacks(col, *this);
visited |= cid == col.id();
return res.ok(); // add the remaining collections (or break on error)
}
@ -3045,7 +3161,7 @@ Result transaction::Methods::addCollection(TRI_voc_cid_t cid, std::string const&
if (!resolver()->visitCollections(visitor, cid) || !res.ok()) {
// trigger exception as per the original behaviour (tests depend on this)
if (res.ok() && !visited) {
res = addCollection(cid);
addCollection(cid); // will throw on error
}
return res.ok() ? Result(TRI_ERROR_INTERNAL) : res; // return first error
@ -3059,7 +3175,7 @@ Result transaction::Methods::addCollection(TRI_voc_cid_t cid, std::string const&
auto dataSource = resolver()->getDataSource(cid);
return dataSource
? applyStateRegistrationCallbacks(*dataSource, *_state)
? applyDataSourceRegistrationCallbacks(*dataSource, *this)
: Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)
;
}

View File

@ -143,7 +143,7 @@ class Methods {
transaction::Options const& options = transaction::Options());
public:
/// @brief create the transaction, used to be UserTransaction
Methods(std::shared_ptr<transaction::Context> const& ctx,
std::vector<std::string> const& readCollections,
@ -154,18 +154,29 @@ class Methods {
/// @brief destroy the transaction
virtual ~Methods();
typedef Result(*StateRegistrationCallback)(LogicalDataSource& dataSource, TransactionState& state);
typedef Result(*DataSourceRegistrationCallback)(LogicalDataSource& dataSource, Methods& trx);
/// @brief add a callback to be called for state instance association events
/// e.g. addCollection(...)
/// @brief definition from TransactionState::StatusChangeCallback
/// @param status the new status of the transaction
/// will match trx.state()->status() for top-level transactions
/// may not match trx.state()->status() for embeded transactions
/// since their staus is not updated from RUNNING
typedef std::function<void(transaction::Methods& trx, transaction::Status& status)> StatusChangeCallback;
/// @brief add a callback to be called for LogicalDataSource instance
/// association events, e.g. addCollection(...)
/// @note not thread-safe on the assumption of static factory registration
static void addStateRegistrationCallback(StateRegistrationCallback callback);
static void addDataSourceRegistrationCallback(DataSourceRegistrationCallback const& callback);
/// @brief clear all called for state instance association events
/// @brief add a callback to be called for state change events
/// @return success
bool addStatusChangeCallback(StatusChangeCallback const& callback);
/// @brief clear all called for LogicalDataSource instance association events
/// @note not thread-safe on the assumption of static factory registration
/// @note FOR USE IN TESTS ONLY to reset test state
/// FIXME TODO StateRegistrationCallback logic should be moved into its own feature
static void clearStateRegistrationCallbacks();
static void clearDataSourceRegistrationCallbacks();
/// @brief default batch size for index and other operations
static constexpr uint64_t defaultBatchSize() { return 1000; }

View File

@ -2496,8 +2496,15 @@ SECTION("test_transaction_snapshot") {
// old snapshot in TransactionState (force == false, waitForSync = false)
{
auto state = s.engine.createTransactionState(vocbase, arangodb::transaction::Options());
viewImpl->apply(*state);
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
auto* state = trx.state();
viewImpl->apply(trx);
state->updateStatus(arangodb::transaction::Status::RUNNING);
auto* snapshot = viewImpl->snapshot(*state);
CHECK((nullptr != snapshot));
@ -2507,8 +2514,15 @@ SECTION("test_transaction_snapshot") {
// old snapshot in TransactionState (force == true, waitForSync = false)
{
auto state = s.engine.createTransactionState(vocbase, arangodb::transaction::Options());
viewImpl->apply(*state);
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
auto* state = trx.state();
viewImpl->apply(trx);
state->updateStatus(arangodb::transaction::Status::RUNNING);
auto* snapshot = viewImpl->snapshot(*state, true);
CHECK((nullptr != snapshot));
@ -2518,8 +2532,15 @@ SECTION("test_transaction_snapshot") {
// old snapshot in TransactionState (force == true, waitForSync = false during updateStatus(), true during snapshot())
{
auto state = s.engine.createTransactionState(vocbase, arangodb::transaction::Options());
viewImpl->apply(*state);
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
auto* state = trx.state();
viewImpl->apply(trx);
state->updateStatus(arangodb::transaction::Status::RUNNING);
state->waitForSync(true);
auto* snapshot = viewImpl->snapshot(*state, true);
@ -2530,9 +2551,16 @@ SECTION("test_transaction_snapshot") {
// old snapshot in TransactionState (force == true, waitForSync = true during updateStatus(), false during snapshot())
{
auto state = s.engine.createTransactionState(vocbase, arangodb::transaction::Options());
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
auto* state = trx.state();
state->waitForSync(true);
viewImpl->apply(*state);
viewImpl->apply(trx);
state->updateStatus(arangodb::transaction::Status::RUNNING);
state->waitForSync(false);
auto* snapshot = viewImpl->snapshot(*state, true);
@ -3497,4 +3525,4 @@ SECTION("test_update_partial") {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -825,8 +825,15 @@ SECTION("test_transaction_snapshot") {
// old snapshot in TransactionState (force == false, waitForSync = false)
{
auto state = s.engine.createTransactionState(vocbase, arangodb::transaction::Options());
wiewImpl->apply(*state);
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
auto* state = trx.state();
wiewImpl->apply(trx);
state->updateStatus(arangodb::transaction::Status::RUNNING);
auto* snapshot = wiewImpl->snapshot(*state);
CHECK((nullptr != snapshot));
@ -836,8 +843,15 @@ SECTION("test_transaction_snapshot") {
// old snapshot in TransactionState (force == true, waitForSync = false)
{
auto state = s.engine.createTransactionState(vocbase, arangodb::transaction::Options());
wiewImpl->apply(*state);
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
auto* state = trx.state();
wiewImpl->apply(trx);
state->updateStatus(arangodb::transaction::Status::RUNNING);
auto* snapshot = wiewImpl->snapshot(*state, true);
CHECK((nullptr != snapshot));
@ -847,8 +861,15 @@ SECTION("test_transaction_snapshot") {
// old snapshot in TransactionState (force == true, waitForSync = false during updateStatus(), true during snapshot())
{
auto state = s.engine.createTransactionState(vocbase, arangodb::transaction::Options());
wiewImpl->apply(*state);
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
auto* state = trx.state();
wiewImpl->apply(trx);
state->updateStatus(arangodb::transaction::Status::RUNNING);
state->waitForSync(true);
auto* snapshot = wiewImpl->snapshot(*state, true);
@ -859,9 +880,16 @@ SECTION("test_transaction_snapshot") {
// old snapshot in TransactionState (force == true, waitForSync = true during updateStatus(), false during snapshot())
{
auto state = s.engine.createTransactionState(vocbase, arangodb::transaction::Options());
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
auto* state = trx.state();
state->waitForSync(true);
wiewImpl->apply(*state);
wiewImpl->apply(trx);
state->updateStatus(arangodb::transaction::Status::RUNNING);
state->waitForSync(false);
auto* snapshot = wiewImpl->snapshot(*state, true);
@ -1176,4 +1204,4 @@ SECTION("test_visitCollections") {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -48,7 +48,7 @@ NS_BEGIN(arangodb)
NS_BEGIN(tests)
void init(bool withICU /*= false*/) {
arangodb::transaction::Methods::clearStateRegistrationCallbacks();
arangodb::transaction::Methods::clearDataSourceRegistrationCallbacks();
}
bool assertRules(