mirror of https://gitee.com/bigwinds/arangodb
Bug fix 3.5/fix transaction errors (#9928)
* don't make index selectivity estimate requests fail inside cluster transactions * improve logging, and don't make transactions more robust don't make transaction operations fail immediately if they cannot acquire the transaction lock instantly. it is somewhat expected that sometimes the garbage collection comes in-between and acquires locks for a short amount of time - this should not lead to failures in other operations on the transaction * fix a race for transaction leases * fix test failure * moved functionality into shutdownExecute
This commit is contained in:
parent
916cb42b16
commit
068d422d59
|
@ -107,17 +107,21 @@ RestStatus RestCursorHandler::continueExecute() {
|
|||
|
||||
void RestCursorHandler::shutdownExecute(bool isFinalized) noexcept {
|
||||
TRI_DEFER(RestVocbaseBaseHandler::shutdownExecute(isFinalized));
|
||||
auto const type = _request->requestType();
|
||||
|
||||
// request not done yet
|
||||
if (_state == HandlerState::PAUSED) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// only trace create cursor requests
|
||||
if (type != rest::RequestType::POST) {
|
||||
if (_request->requestType() != rest::RequestType::POST) {
|
||||
return;
|
||||
}
|
||||
|
||||
// destroy the query context.
|
||||
// this is needed because the context is managing resources (e.g. leases
|
||||
// for a managed transaction) that we want to free as early as possible
|
||||
_queryResult.context.reset();
|
||||
|
||||
if (!_isValidForFinalize || _auditLogged) {
|
||||
// set by RestCursorHandler before
|
||||
|
@ -344,6 +348,11 @@ RestStatus RestCursorHandler::handleQueryResult() {
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
}
|
||||
generateResult(rest::ResponseCode::CREATED, std::move(buffer), _queryResult.context);
|
||||
// directly after returning from here, we will free the query's context and free the
|
||||
// resources it uses (e.g. leases for a managed transaction). this way the server
|
||||
// can send back the query result to the client and the client can make follow-up
|
||||
// requests on the same transaction (e.g. trx.commit()) without the server code for
|
||||
// freeing the resources and the client code racing for who's first
|
||||
return RestStatus::DONE;
|
||||
} else {
|
||||
// result is bigger than batchSize, and a cursor will be created
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "Cluster/ClusterInfo.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "Rest/HttpRequest.h"
|
||||
#include "Transaction/StandaloneContext.h"
|
||||
#include "Utils/Events.h"
|
||||
#include "Utils/SingleCollectionTransaction.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
|
@ -172,14 +173,30 @@ RestStatus RestIndexHandler::getSelectivityEstimates() {
|
|||
// .............................................................................
|
||||
|
||||
bool found = false;
|
||||
std::string cName = _request->value("collection", found);
|
||||
std::string const& cName = _request->value("collection", found);
|
||||
if (cName.empty()) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
|
||||
// transaction protects access onto selectivity estimates
|
||||
auto trx = createTransaction(cName, AccessMode::Type::READ);
|
||||
std::unique_ptr<SingleCollectionTransaction> trx;
|
||||
|
||||
try {
|
||||
trx = createTransaction(cName, AccessMode::Type::READ);
|
||||
} catch (basics::Exception const& ex) {
|
||||
if (ex.code() == TRI_ERROR_TRANSACTION_NOT_FOUND) {
|
||||
// this will happen if the tid of a managed transaction is passed in,
|
||||
// but the transaction hasn't yet started on the DB server. in
|
||||
// this case, we create an ad-hoc transaction on the underlying
|
||||
// collection
|
||||
trx = std::make_unique<SingleCollectionTransaction>(transaction::StandaloneContext::Create(_vocbase), cName, AccessMode::Type::READ);
|
||||
} else {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
TRI_ASSERT(trx != nullptr);
|
||||
|
||||
Result res = trx->begin();
|
||||
if (res.fail()) {
|
||||
|
|
|
@ -47,6 +47,8 @@
|
|||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
#include <thread>
|
||||
|
||||
namespace {
|
||||
bool authorized(std::string const& user) {
|
||||
auto context = arangodb::ExecContext::CURRENT;
|
||||
|
@ -476,7 +478,7 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
const size_t bucket = getBucket(tid);
|
||||
size_t const bucket = getBucket(tid);
|
||||
int i = 0;
|
||||
TransactionState* state = nullptr;
|
||||
do {
|
||||
|
@ -508,8 +510,10 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
|
|||
state = mtrx.state;
|
||||
break;
|
||||
}
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
|
||||
"transaction is already in use");
|
||||
|
||||
LOG_TOPIC("abd72", DEBUG, Logger::TRANSACTIONS) << "transaction '" << tid << "' is already in use";
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_LOCKED,
|
||||
std::string("transaction '") + std::to_string(tid) + "' is already in use");
|
||||
}
|
||||
|
||||
writeLocker.unlock(); // failure;
|
||||
|
@ -591,13 +595,34 @@ transaction::Status Manager::getManagedTrxStatus(TRI_voc_tid_t tid) const {
|
|||
return transaction::Status::ABORTED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Result Manager::statusChangeWithTimeout(TRI_voc_tid_t tid, transaction::Status status) {
|
||||
double startTime = 0.0;
|
||||
constexpr double maxWaitTime = 2.0;
|
||||
Result res;
|
||||
while (true) {
|
||||
res = updateTransaction(tid, status, false);
|
||||
if (res.ok() || !res.is(TRI_ERROR_LOCKED)) {
|
||||
break;
|
||||
}
|
||||
if (startTime <= 0.0001) { // fp tolerance
|
||||
startTime = TRI_microtime();
|
||||
} else if (TRI_microtime() - startTime > maxWaitTime) {
|
||||
// timeout
|
||||
break;
|
||||
}
|
||||
std::this_thread::yield();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
Result Manager::commitManagedTrx(TRI_voc_tid_t tid) {
|
||||
return updateTransaction(tid, transaction::Status::COMMITTED, false);
|
||||
return statusChangeWithTimeout(tid, transaction::Status::COMMITTED);
|
||||
}
|
||||
|
||||
Result Manager::abortManagedTrx(TRI_voc_tid_t tid) {
|
||||
return updateTransaction(tid, transaction::Status::ABORTED, false);
|
||||
return statusChangeWithTimeout(tid, transaction::Status::ABORTED);
|
||||
}
|
||||
|
||||
Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
|
||||
|
@ -609,7 +634,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
|
|||
<< "managed trx '" << tid << " updating to '" << status << "'";
|
||||
|
||||
Result res;
|
||||
const size_t bucket = getBucket(tid);
|
||||
size_t const bucket = getBucket(tid);
|
||||
bool wasExpired = false;
|
||||
|
||||
std::unique_ptr<TransactionState> state;
|
||||
|
@ -626,9 +651,11 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
|
|||
ManagedTrx& mtrx = it->second;
|
||||
TRY_WRITE_LOCKER(tryGuard, mtrx.rwlock);
|
||||
if (!tryGuard.isLocked()) {
|
||||
return res.reset(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
|
||||
LOG_TOPIC("dfc30", DEBUG, Logger::TRANSACTIONS) << "transaction '" << tid << "' is in use";
|
||||
return res.reset(TRI_ERROR_LOCKED,
|
||||
std::string("transaction '") + std::to_string(tid) + "' is in use");
|
||||
}
|
||||
TRI_ASSERT(tryGuard.isLocked());
|
||||
|
||||
if (mtrx.type == MetaType::StandaloneAQL) {
|
||||
return res.reset(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
|
||||
|
@ -646,7 +673,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
|
|||
}
|
||||
}
|
||||
|
||||
if (mtrx.expired()) {
|
||||
if (mtrx.expired() && status != transaction::Status::ABORTED) {
|
||||
status = transaction::Status::ABORTED;
|
||||
wasExpired = true;
|
||||
}
|
||||
|
@ -739,10 +766,9 @@ bool Manager::garbageCollect(bool abortAll) {
|
|||
auto it = _transactions[bucket]._managed.begin();
|
||||
while (it != _transactions[bucket]._managed.end()) {
|
||||
ManagedTrx& mtrx = it->second;
|
||||
|
||||
|
||||
if (mtrx.type == MetaType::Managed) {
|
||||
TRI_ASSERT(mtrx.state != nullptr);
|
||||
|
||||
if (abortAll || mtrx.expired()) {
|
||||
TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state
|
||||
|
||||
|
@ -774,12 +800,18 @@ bool Manager::garbageCollect(bool abortAll) {
|
|||
"transaction: '"
|
||||
<< tid << "'";
|
||||
Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/ true);
|
||||
if (res.fail()) {
|
||||
// updateTransaction can return TRI_ERROR_TRANSACTION_ABORTED when it
|
||||
// successfully aborts, so ignore this error.
|
||||
// we can also get the TRI_ERROR_LOCKED error in case we cannot
|
||||
// immediately acquire the lock on the transaction. this _can_ happen
|
||||
// infrequently, but is not an error
|
||||
if (res.fail() &&
|
||||
!res.is(TRI_ERROR_TRANSACTION_ABORTED) &&
|
||||
!res.is(TRI_ERROR_LOCKED)) {
|
||||
LOG_TOPIC("0a07f", INFO, Logger::TRANSACTIONS) << "error while aborting "
|
||||
"transaction: '"
|
||||
<< res.errorMessage() << "'";
|
||||
}
|
||||
|
||||
didWork = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -162,7 +162,10 @@ class Manager final {
|
|||
std::string const& username, bool fanout) const;
|
||||
|
||||
private:
|
||||
// hashes the transaction id into a bucket
|
||||
/// @brief performs a status change on a transaction using a timeout
|
||||
Result statusChangeWithTimeout(TRI_voc_tid_t tid, transaction::Status status);
|
||||
|
||||
/// @brief hashes the transaction id into a bucket
|
||||
inline size_t getBucket(TRI_voc_tid_t tid) const {
|
||||
return std::hash<TRI_voc_cid_t>()(tid) % numBuckets;
|
||||
}
|
||||
|
|
|
@ -260,7 +260,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_commit_while_in_use) {
|
|||
OperationOptions opts;
|
||||
auto opRes = trx.insert(coll->name(), doc->slice(), opts);
|
||||
ASSERT_TRUE(opRes.ok());
|
||||
ASSERT_TRUE(mgr->commitManagedTrx(tid).is(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION));
|
||||
ASSERT_EQ(TRI_ERROR_LOCKED, mgr->commitManagedTrx(tid).errorNumber());
|
||||
ASSERT_TRUE(trx.finish(opRes.result).ok());
|
||||
}
|
||||
ASSERT_TRUE((mgr->getManagedTrxStatus(tid) == transaction::Status::RUNNING));
|
||||
|
|
Loading…
Reference in New Issue