1
0
Fork 0

added savepoints

This commit is contained in:
jsteemann 2017-03-28 17:55:25 +02:00
parent 6a39e962bd
commit a221d30b8f
5 changed files with 79 additions and 46 deletions

View File

@ -54,12 +54,20 @@
using namespace arangodb;
using namespace arangodb::rocksutils;
namespace {
static std::string const Empty;
rocksdb::TransactionDB* db() {
static rocksdb::TransactionDB* db() {
StorageEngine* engine = EngineSelectorFeature::ENGINE;
return static_cast<RocksDBEngine*>(engine)->db();
}
static inline rocksdb::Transaction* rocksTransaction(arangodb::transaction::Methods* trx) {
return static_cast<RocksDBTransactionState*>(trx->state())->rocksTransaction();
}
}
RocksDBCollection::RocksDBCollection(LogicalCollection* collection,
VPackSlice const& info)
@ -661,8 +669,10 @@ int RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx,
RocksDBKey key(RocksDBKey::Document(_objectId, revisionId));
RocksDBValue value(RocksDBValue::Document(doc));
rocksdb::WriteBatch writeBatch;
writeBatch.Put(key.string(), value.value());
rocksdb::Transaction* rtrx = rocksTransaction(trx);
RocksDBSavePoint guard(rtrx);
rtrx->Put(key.string(), value.value());
auto indexes = _indexes;
size_t const n = indexes.size();
@ -688,24 +698,13 @@ int RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx,
}
if (result != TRI_ERROR_NO_ERROR) {
rocksdb::WriteOptions writeOptions;
if (_logicalCollection->waitForSync()) {
waitForSync = true;
}
if (waitForSync) {
trx->state()->waitForSync(true);
// handle waitForSync for single operations here
if (trx->state()->isSingleOperation()) {
writeOptions.sync = true;
}
}
StorageEngine* engine = EngineSelectorFeature::ENGINE;
rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(engine)->db();
db->Write(writeOptions, &writeBatch);
}
return result;
@ -720,8 +719,11 @@ int RocksDBCollection::removeDocument(arangodb::transaction::Methods* trx,
auto key = RocksDBKey::Document(_objectId, revisionId);
rocksdb::WriteBatch writeBatch;
writeBatch.Delete(key.string());
rocksdb::Transaction* rtrx = rocksTransaction(trx);
RocksDBSavePoint guard(rtrx);
rtrx->Delete(key.string());
auto indexes = _indexes;
size_t const n = indexes.size();
@ -740,24 +742,13 @@ int RocksDBCollection::removeDocument(arangodb::transaction::Methods* trx,
}
if (result != TRI_ERROR_NO_ERROR) {
rocksdb::WriteOptions writeOptions;
if (_logicalCollection->waitForSync()) {
waitForSync = true;
}
if (waitForSync) {
trx->state()->waitForSync(true);
// handle waitForSync for single operations here
if (trx->state()->isSingleOperation()) {
writeOptions.sync = true;
}
}
StorageEngine* engine = EngineSelectorFeature::ENGINE;
rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(engine)->db();
db->Write(writeOptions, &writeBatch);
}
return result;
@ -799,11 +790,16 @@ Result RocksDBCollection::lookupDocumentToken(transaction::Methods* trx,
return outToken.revisionId() > 0 ? Result() : Result(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
}
void RocksDBCollection::lookupRevisionVPack(TRI_voc_rid_t revisionId, transaction::Methods* trx,arangodb::ManagedDocumentResult& result){
auto key = RocksDBKey::Document(_objectId,revisionId);
void RocksDBCollection::lookupRevisionVPack(TRI_voc_rid_t revisionId, transaction::Methods* trx,arangodb::ManagedDocumentResult& result) {
LOG_TOPIC(ERR, Logger::FIXME) << "LOOKING UP DOCUMENT: " << _objectId << ", REV: " << revisionId;
auto key = RocksDBKey::Document(_objectId, revisionId);
std::string value;
TRI_ASSERT(value.data());
auto* state = toRocksTransactionState(trx);
state->rocksTransaction()->Get(state->readOptions(), key.string(), &value);
result.setManaged(std::move(value), revisionId);
rocksdb::Status status = state->rocksTransaction()->Get(state->readOptions(), key.string(), &value);
if (status.ok()) {
LOG_TOPIC(ERR, Logger::FIXME) << "FOUND";
result.setManaged(std::move(value), revisionId);
}
}

View File

@ -88,8 +88,8 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
}
// aquire rocksdb transaction
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(_trx);
rocksdb::Transaction *rtrx = state->rocksTransaction();
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(_trx);
rocksdb::Transaction* rtrx = state->rocksTransaction();
auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection);
while (limit > 0) {
@ -254,8 +254,8 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
primaryKey.copyString());
// aquire rocksdb transaction
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction *rtrx = state->rocksTransaction();
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction();
rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string()));
if (status.ok()) {
@ -274,7 +274,7 @@ void RocksDBEdgeIndex::batchInsert(
// aquire rocksdb transaction
RocksDBTransactionState *state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction *rtrx = state->rocksTransaction();
rocksdb::Transaction* rtrx = state->rocksTransaction();
for (std::pair<TRI_voc_rid_t, VPackSlice> const& doc : documents) {
VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString);

View File

@ -44,8 +44,25 @@
using namespace arangodb;
struct RocksDBTransactionData final : public TransactionData {
};
struct RocksDBTransactionData final : public TransactionData {};
RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx)
: _trx(trx), _committed(false) {}
RocksDBSavePoint::~RocksDBSavePoint() {
if (!_committed) {
rollback();
}
}
void RocksDBSavePoint::commit() {
_committed = true; // this will prevent the rollback
}
void RocksDBSavePoint::rollback() {
_trx->RollbackToSavePoint();
_committed = true; // in order to not roll back again by accident
}
/// @brief transaction type
RocksDBTransactionState::RocksDBTransactionState(TRI_vocbase_t* vocbase)
@ -74,7 +91,7 @@ int RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
StorageEngine* engine = EngineSelectorFeature::ENGINE;
rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(engine)->db();
_rocksTransaction.reset(db->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions()));
_rocksTransaction.reset(db->BeginTransaction(_rocksWriteOptions, rocksdb::TransactionOptions()));
} else {
TRI_ASSERT(_status == transaction::Status::RUNNING);
}
@ -109,6 +126,11 @@ int RocksDBTransactionState::commitTransaction(transaction::Methods* activeTrx)
if (_nestingLevel == 0) {
if (_rocksTransaction != nullptr) {
// set wait for sync flag if required
if (waitForSync()) {
_rocksWriteOptions.sync = true;
}
auto status = _rocksTransaction->Commit();
if (!status.ok()) {

View File

@ -24,7 +24,6 @@
#ifndef ARANGOD_ROCKSDB_ROCKSDB_TRANSACTION_STATE_H
#define ARANGOD_ROCKSDB_ROCKSDB_TRANSACTION_STATE_H 1
#include "Basics/Common.h"
#include "Basics/SmallVector.h"
#include "StorageEngine/TransactionState.h"
@ -32,10 +31,12 @@
#include "Transaction/Methods.h"
#include "VocBase/AccessMode.h"
#include "VocBase/voc-types.h"
#include "rocksdb/status.h"
#include "rocksdb/options.h"
struct TRI_vocbase_t;
#include <rocksdb/status.h>
#include <rocksdb/options.h>
struct TRI_vocbase_t;
namespace rocksdb {
class Transaction;
}
@ -48,6 +49,19 @@ namespace transaction {
class Methods;
}
class TransactionCollection;
class RocksDBSavePoint {
public:
explicit RocksDBSavePoint(rocksdb::Transaction* trx);
~RocksDBSavePoint();
void commit();
void rollback();
private:
rocksdb::Transaction* _trx;
bool _committed;
};
/// @brief transaction type
class RocksDBTransactionState final : public TransactionState {
@ -81,6 +95,7 @@ class RocksDBTransactionState final : public TransactionState {
private:
std::unique_ptr<rocksdb::Transaction> _rocksTransaction;
rocksdb::WriteOptions _rocksWriteOptions;
rocksdb::ReadOptions _rocksReadOptions;
bool _hasOperations;
};

View File

@ -24,11 +24,11 @@
#ifndef ARANGOD_VOC_BASE_MANAGED_DOCUMENT_RESULT_H
#define ARANGOD_VOC_BASE_MANAGED_DOCUMENT_RESULT_H 1
#include "velocypack/Slice.h"
#include "velocypack/Buffer.h"
#include "velocypack/velocypack-aliases.h"
#include "Basics/Common.h"
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
class ManagedDocumentResult {