1
0
Fork 0

intermediate commits

This commit is contained in:
Simon Grätzer 2017-05-15 17:35:16 +02:00
parent c4f19e5f91
commit 70a1d27121
12 changed files with 428 additions and 269 deletions

View File

@ -21,6 +21,7 @@ set(ROCKSDB_SOURCES
RocksDBEngine/RocksDBKey.cpp RocksDBEngine/RocksDBKey.cpp
RocksDBEngine/RocksDBKeyBounds.cpp RocksDBEngine/RocksDBKeyBounds.cpp
RocksDBEngine/RocksDBLogValue.cpp RocksDBEngine/RocksDBLogValue.cpp
RocksDBEngine/RocksDBMethods.cpp
RocksDBEngine/RocksDBPrefixExtractor.cpp RocksDBEngine/RocksDBPrefixExtractor.cpp
RocksDBEngine/RocksDBPrimaryIndex.cpp RocksDBEngine/RocksDBPrimaryIndex.cpp
RocksDBEngine/RocksDBReplicationCommon.cpp RocksDBEngine/RocksDBReplicationCommon.cpp

View File

@ -42,6 +42,7 @@
#include "RocksDBEngine/RocksDBEngine.h" #include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBLogValue.h" #include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h" #include "RocksDBEngine/RocksDBPrimaryIndex.h"
#include "RocksDBEngine/RocksDBToken.h" #include "RocksDBEngine/RocksDBToken.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h" #include "RocksDBEngine/RocksDBTransactionCollection.h"
@ -73,11 +74,6 @@ namespace {
static std::string const Empty; static std::string const Empty;
static inline rocksdb::Transaction* rocksTransaction(
arangodb::transaction::Methods* trx) {
return static_cast<RocksDBTransactionState*>(trx->state())
->rocksTransaction();
}
} // namespace } // namespace
RocksDBCollection::RocksDBCollection(LogicalCollection* collection, RocksDBCollection::RocksDBCollection(LogicalCollection* collection,
@ -625,13 +621,13 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
TRI_voc_cid_t cid = _logicalCollection->cid(); TRI_voc_cid_t cid = _logicalCollection->cid();
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction(); RocksDBMethods *mthd = state->rocksdbMethods();
//rocksdb::Transaction* rtrx = state->rocksTransaction();
// delete documents // delete documents
RocksDBKeyBounds documentBounds = RocksDBKeyBounds documentBounds =
RocksDBKeyBounds::CollectionDocuments(this->objectId()); RocksDBKeyBounds::CollectionDocuments(this->objectId());
std::unique_ptr<rocksdb::Iterator> iter( std::unique_ptr<rocksdb::Iterator> iter = mthd->NewIterator();
rtrx->GetIterator(state->readOptions()));
iter->Seek(documentBounds.start()); iter->Seek(documentBounds.start());
while (iter->Valid() && cmp->Compare(iter->key(), documentBounds.end()) < 0) { while (iter->Valid() && cmp->Compare(iter->key(), documentBounds.end()) < 0) {
@ -645,10 +641,9 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
// add possible log statement // add possible log statement
state->prepareOperation(cid, revisionId, StringRef(key), state->prepareOperation(cid, revisionId, StringRef(key),
TRI_VOC_DOCUMENT_OPERATION_REMOVE); TRI_VOC_DOCUMENT_OPERATION_REMOVE);
rocksdb::Status s = rtrx->Delete(iter->key()); Result r = mthd->Delete(iter->key());
if (!s.ok()) { if (!r.ok()) {
auto converted = convertStatus(s); THROW_ARANGO_EXCEPTION(r);
THROW_ARANGO_EXCEPTION(converted);
} }
// report size of key // report size of key
RocksDBOperationResult result = RocksDBOperationResult result =
@ -660,11 +655,6 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
THROW_ARANGO_EXCEPTION(result); THROW_ARANGO_EXCEPTION(result);
} }
// force intermediate commit
if (result.commitRequired()) {
// force commit
}
iter->Next(); iter->Next();
} }
@ -867,7 +857,7 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
RocksDBTransactionState* state = toRocksTransactionState(trx); RocksDBTransactionState* state = toRocksTransactionState(trx);
RocksDBSavePoint guard(rocksTransaction(trx), RocksDBSavePoint guard(rocksutils::toRocksMethods(trx),
trx->isSingleOperationTransaction(), trx->isSingleOperationTransaction(),
[&state]() { state->resetLogState(); }); [&state]() { state->resetLogState(); });

View File

@ -49,30 +49,27 @@ namespace arangodb {
class RocksDBOperationResult : public Result { class RocksDBOperationResult : public Result {
public: public:
explicit RocksDBOperationResult() : Result(), _keySize(0), _commitRequired(false) {} explicit RocksDBOperationResult() : Result(), _keySize(0) {}
RocksDBOperationResult(Result const& other) RocksDBOperationResult(Result const& other)
: _keySize(0), _commitRequired(false) { : _keySize(0) {
cloneData(other); cloneData(other);
} }
RocksDBOperationResult(Result&& other) : _keySize(0), _commitRequired(false) { RocksDBOperationResult(Result&& other) : _keySize(0) {
cloneData(std::move(other)); cloneData(std::move(other));
} }
uint64_t keySize() const { return _keySize; } uint64_t keySize() const { return _keySize; }
void keySize(uint64_t s) { _keySize = s; } void keySize(uint64_t s) { _keySize = s; }
bool commitRequired() const { return _commitRequired; }
void commitRequired(bool cr) { _commitRequired = cr; }
protected: protected:
uint64_t _keySize; uint64_t _keySize;
bool _commitRequired;
}; };
class TransactionState; class TransactionState;
class RocksDBTransactionState; class RocksDBTransactionState;
class RocksDBMethods;
class RocksDBKeyBounds; class RocksDBKeyBounds;
class RocksDBEngine; class RocksDBEngine;
namespace transaction { namespace transaction {
@ -93,6 +90,8 @@ std::pair<VPackSlice, std::unique_ptr<VPackBuffer<uint8_t>>> stripObjectIds(
VPackSlice const& inputSlice, bool checkBeforeCopy = true); VPackSlice const& inputSlice, bool checkBeforeCopy = true);
RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx); RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx);
RocksDBMethods* toRocksMethods(transaction::Methods* trx);
rocksdb::TransactionDB* globalRocksDB(); rocksdb::TransactionDB* globalRocksDB();
RocksDBEngine* globalRocksEngine(); RocksDBEngine* globalRocksEngine();
arangodb::Result globalRocksDBPut( arangodb::Result globalRocksDBPut(

View File

@ -476,10 +476,7 @@ int RocksDBGeoIndex::internalInsert(TRI_voc_rid_t revisionId,
int RocksDBGeoIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId, int RocksDBGeoIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId,
VPackSlice const& doc, bool isRollback) { VPackSlice const& doc, bool isRollback) {
// acquire rocksdb transaction // acquire rocksdb transaction
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); GeoIndex_setRocksMethods(_geoIndex, rocksutils::toRocksMethods(trx));
rocksdb::Transaction* rtrx = state->rocksTransaction();
GeoIndex_setRocksTransaction(_geoIndex, rtrx);
int res = this->internalInsert(revisionId, doc); int res = this->internalInsert(revisionId, doc);
GeoIndex_clearRocks(_geoIndex); GeoIndex_clearRocks(_geoIndex);
return res; return res;
@ -488,7 +485,7 @@ int RocksDBGeoIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId,
int RocksDBGeoIndex::insertRaw(rocksdb::WriteBatchWithIndex* batch, int RocksDBGeoIndex::insertRaw(rocksdb::WriteBatchWithIndex* batch,
TRI_voc_rid_t revisionId, TRI_voc_rid_t revisionId,
arangodb::velocypack::Slice const& doc) { arangodb::velocypack::Slice const& doc) {
GeoIndex_setRocksBatch(_geoIndex, batch); GeoIndex_setRocksMethods(_geoIndex, rocksutils::toRocksMethods(trx));
int res = this->internalInsert(revisionId, doc); int res = this->internalInsert(revisionId, doc);
GeoIndex_clearRocks(_geoIndex); GeoIndex_clearRocks(_geoIndex);
return res; return res;
@ -556,11 +553,9 @@ int RocksDBGeoIndex::internalRemove(TRI_voc_rid_t revisionId,
int RocksDBGeoIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId, int RocksDBGeoIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId,
VPackSlice const& doc, bool isRollback) { VPackSlice const& doc, bool isRollback) {
// acquire rocksdb transaction // acquire rocksdb methods
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); RocksDBMethods *methods = rocksutils::toRocksMethods(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction(); GeoIndex_setRocksMethods(_geoIndex, rocksutils::toRocksMethods(trx));
GeoIndex_setRocksTransaction(_geoIndex, rtrx);
int res = this->internalRemove(revisionId, doc); int res = this->internalRemove(revisionId, doc);
GeoIndex_clearRocks(_geoIndex); GeoIndex_clearRocks(_geoIndex);
return res; return res;
@ -569,7 +564,7 @@ int RocksDBGeoIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId,
int RocksDBGeoIndex::removeRaw(rocksdb::WriteBatchWithIndex* batch, int RocksDBGeoIndex::removeRaw(rocksdb::WriteBatchWithIndex* batch,
TRI_voc_rid_t revisionId, TRI_voc_rid_t revisionId,
arangodb::velocypack::Slice const& doc) { arangodb::velocypack::Slice const& doc) {
GeoIndex_setRocksBatch(_geoIndex, batch); GeoIndex_setRocksMethods(_geoIndex, rocksutils::toRocksMethods(trx));
int res = this->internalRemove(revisionId, doc); int res = this->internalRemove(revisionId, doc);
GeoIndex_clearRocks(_geoIndex); GeoIndex_clearRocks(_geoIndex);
return res; return res;

View File

@ -28,9 +28,8 @@
#include <cstddef> #include <cstddef>
#include <iostream> #include <iostream>
#include <RocksDBEngine/RocksDBGeoIndexImpl.h> #include "RocksDBEngine/RocksDBGeoIndexImpl.h"
#include <rocksdb/utilities/transaction.h> #include "RocksDBEngine/RocksDBMethods.h"
#include <rocksdb/utilities/write_batch_with_index.h>
/* Radius of the earth used for distances */ /* Radius of the earth used for distances */
#define EARTHRADIAN 6371000.0 #define EARTHRADIAN 6371000.0
@ -132,8 +131,7 @@ typedef struct {
GeoIndexFixed fixed; /* fixed point data */ GeoIndexFixed fixed; /* fixed point data */
int nextFreePot; /* pots allocated */ int nextFreePot; /* pots allocated */
int nextFreeSlot; /* slots allocated */ int nextFreeSlot; /* slots allocated */
rocksdb::Transaction *rocksTransaction; RocksDBMethods *rocksMethods;
rocksdb::WriteBatchWithIndex *rocksBatch;
//GeoPot* ypots; /* the pots themselves */// gone //GeoPot* ypots; /* the pots themselves */// gone
//GeoCoordinate* gxc; /* the slots themselves */// gone //GeoCoordinate* gxc; /* the slots themselves */// gone
//size_t _memoryUsed; /* the amount of memory currently used */// gone //size_t _memoryUsed; /* the amount of memory currently used */// gone
@ -272,39 +270,19 @@ namespace arangodb { namespace rocksdbengine {
/* CRUD interface */ /* CRUD interface */
void GeoIndex_setRocksTransaction(GeoIdx* gi, void GeoIndex_setRocksMethods(GeoIdx* gi, RocksDBMethods* trx) {
rocksdb::Transaction* trx) {
GeoIx* gix = (GeoIx*)gi; GeoIx* gix = (GeoIx*)gi;
gix->rocksTransaction = trx; gix->rocksMethods = trx;
}
void GeoIndex_setRocksBatch(GeoIdx* gi,
rocksdb::WriteBatchWithIndex* wb) {
GeoIx* gix = (GeoIx*)gi;
gix->rocksBatch = wb;
} }
void GeoIndex_clearRocks(GeoIdx* gi) { void GeoIndex_clearRocks(GeoIdx* gi) {
GeoIx* gix = (GeoIx*)gi; GeoIx* gix = (GeoIx*)gi;
gix->rocksTransaction = nullptr; gix->rocksMethods = nullptr;
gix->rocksBatch = nullptr;
} }
inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) { inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) {
rocksdb::Status s; arangodb::Result r = gix->rocksMethods->Get(key, val);
rocksdb::ReadOptions opts; if (!r.ok()) {
if (gix->rocksTransaction != nullptr) {
s = gix->rocksTransaction->Get(opts, key.string(), val);
} else {
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
if (gix->rocksBatch != nullptr) {
s = gix->rocksBatch->GetFromBatchAndDB(db, opts, key.string(), val);
} else {
s = db->Get(opts, key.string(), val);
}
}
if (!s.ok()) {
arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index);
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
} }
} }
@ -312,39 +290,15 @@ inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) {
inline void RocksWrite(GeoIx * gix, inline void RocksWrite(GeoIx * gix,
RocksDBKey const& key, RocksDBKey const& key,
rocksdb::Slice const& slice) { rocksdb::Slice const& slice) {
rocksdb::Status s; arangodb::Result r = gix->rocksMethods->Put(key, slice);
if (gix->rocksTransaction != nullptr) { if (!r.ok()) {
s = gix->rocksTransaction->Put(key.string(), slice);
} else {
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
if (gix->rocksBatch != nullptr) {
gix->rocksBatch->Put(key.string(), slice);
} else {
rocksdb::WriteOptions opts;
s = db->Put(opts, key.string(), slice);
}
}
if (!s.ok()) {
arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index);
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
} }
} }
inline void RocksDelete(GeoIx* gix, RocksDBKey const& key) { inline void RocksDelete(GeoIx* gix, RocksDBKey const& key) {
rocksdb::Status s; arangodb::Result r = gix->rocksMethods->Delete(key);
if (gix->rocksTransaction != nullptr) { if (!r.ok()) {
s = gix->rocksTransaction->Delete(key.string());
} else {
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
if (gix->rocksBatch != nullptr) {
gix->rocksBatch->Delete(key.string());
} else {
rocksdb::WriteOptions opts;
s = db->Delete(opts, key.string());
}
}
if (!s.ok()) {
arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index);
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
} }
} }
@ -466,8 +420,7 @@ GeoIdx* GeoIndex_new(uint64_t objectId,
return (GeoIdx*)gix; return (GeoIdx*)gix;
} }
// need to set these to null // need to set these to null
gix->rocksTransaction = nullptr; gix->rocksMethods = nullptr;
gix->rocksBatch = nullptr;
/* set up the fixed points structure */ /* set up the fixed points structure */

View File

@ -30,12 +30,9 @@
#include "Basics/Common.h" #include "Basics/Common.h"
#include <cstdint> #include <cstdint>
namespace rocksdb { namespace arangodb {
class Transaction; struct RocksDBMethods;
class WriteBatchWithIndex; namespace rocksdbengine {
}
namespace arangodb { namespace rocksdbengine {
/* first the things that a user might want to change */ /* first the things that a user might want to change */
@ -115,8 +112,7 @@ void GeoIndex_INDEXDUMP(GeoIdx* gi, FILE* f);
int GeoIndex_INDEXVALID(GeoIdx* gi); int GeoIndex_INDEXVALID(GeoIdx* gi);
#endif #endif
void GeoIndex_setRocksTransaction(GeoIdx* gi, rocksdb::Transaction*); void GeoIndex_setRocksMethods(GeoIdx* gi, RocksDBMethods*);
void GeoIndex_setRocksBatch(GeoIdx* gi, rocksdb::WriteBatchWithIndex*);
void GeoIndex_clearRocks(GeoIdx* gi); void GeoIndex_clearRocks(GeoIdx* gi);
}} }}

View File

@ -0,0 +1,128 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBMethods.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/status.h>
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <rocksdb/utilities/transaction.h>
#include <rocksdb/utilities/write_batch_with_index.h>
using namespace arangodb;
// ================= RocksDBSavePoint ==================
RocksDBSavePoint::RocksDBSavePoint(
RocksDBMethods* trx, bool handled,
std::function<void()> const& rollbackCallback)
: _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) {
TRI_ASSERT(trx != nullptr);
if (!_handled) {
_trx->SetSavePoint();
}
}
RocksDBSavePoint::~RocksDBSavePoint() {
if (!_handled) {
rollback();
}
}
void RocksDBSavePoint::commit() {
// note: _handled may already be true here
_handled = true; // this will prevent the rollback
}
void RocksDBSavePoint::rollback() {
TRI_ASSERT(!_handled);
_trx->RollbackToSavePoint();
_handled = true; // in order to not roll back again by accident
_rollbackCallback();
}
// =================== RocksDBReadOnlyMethods ====================
RocksDBReadOnlyMethods::RocksDBReadOnlyMethods(RocksDBTransactionState* state)
: _state(state) {
_db = rocksutils::globalRocksDB();
}
arangodb::Result RocksDBReadOnlyMethods::Get(RocksDBKey const& key,
std::string* val) {
rocksdb::Status s = _db->Get(_state->_rocksReadOptions, key.string(), val);
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
}
arangodb::Result RocksDBReadOnlyMethods::Put(RocksDBKey const& key,
rocksdb::Slice const& val) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY);
}
arangodb::Result RocksDBReadOnlyMethods::Delete(RocksDBKey const& key) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY);
}
std::unique_ptr<rocksdb::Iterator> RocksDBReadOnlyMethods::NewIterator() {
return std::unique_ptr<rocksdb::Iterator>(
_db->NewIterator(_state->_rocksReadOptions));
}
// =================== RocksDBTrxMethods ====================
RocksDBTrxMethods::RocksDBTrxMethods(RocksDBTransactionState* state)
: _state(state) {}
arangodb::Result RocksDBTrxMethods::Get(RocksDBKey const& key,
std::string* val) {
rocksdb::Status s = _state->_rocksTransaction->Get(_state->_rocksReadOptions,
key.string(), val);
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
}
arangodb::Result RocksDBTrxMethods::Put(RocksDBKey const& key,
rocksdb::Slice const& val) {
rocksdb::Status s = _state->_rocksTransaction->Put(key.string(), val);
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
}
arangodb::Result RocksDBTrxMethods::Delete(RocksDBKey const& key) {
rocksdb::Status s = _state->_rocksTransaction->Delete(key.string());
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
}
std::unique_ptr<rocksdb::Iterator> RocksDBTrxMethods::NewIterator() {
return std::unique_ptr<rocksdb::Iterator>(
_state->_rocksTransaction->GetIterator(_state->_rocksReadOptions));
}
void RocksDBTrxMethods::SetSavePoint() {
_state->_rocksTransaction->SetSavePoint();
}
arangodb::Result RocksDBTrxMethods::RollbackToSavePoint() {
return rocksutils::convertStatus(
_state->_rocksTransaction->RollbackToSavePoint());
}

View File

@ -0,0 +1,114 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_ROCKSDB_ROCKSDB_METHODS_H
#define ARANGOD_ROCKSDB_ROCKSDB_METHODS_H 1
#include "Basics/Result.h"
namespace rocksdb {
class Transaction;
class Slice;
class Iterator;
class TransactionDB;
} // namespace rocksdb
namespace arangodb {
class RocksDBKey;
class RocksDBMethods;
class RocksDBTransactionState;
class RocksDBSavePoint {
public:
RocksDBSavePoint(RocksDBMethods* trx, bool handled,
std::function<void()> const& rollbackCallback);
~RocksDBSavePoint();
void commit();
private:
void rollback();
private:
RocksDBMethods* _trx;
std::function<void()> const _rollbackCallback;
bool _handled;
};
class RocksDBMethods {
public:
// RocksDBOperations(rocksdb::ReadOptions ro, rocksdb::WriteOptions wo) :
// _readOptions(ro), _writeOptions(wo) {}
virtual ~RocksDBMethods() {}
virtual arangodb::Result Get(RocksDBKey const&, std::string*) = 0;
virtual arangodb::Result Put(RocksDBKey const&, rocksdb::Slice const&) = 0;
// virtual arangodb::Result Merge(RocksDBKey const&, rocksdb::Slice const&) =
// 0;
virtual arangodb::Result Delete(RocksDBKey const&) = 0;
virtual std::unique_ptr<rocksdb::Iterator> NewIterator() = 0;
virtual void SetSavePoint() = 0;
virtual arangodb::Result RollbackToSavePoint() = 0;
};
class RocksDBReadOnlyMethods : public RocksDBMethods {
public:
RocksDBReadOnlyMethods(RocksDBTransactionState* state);
arangodb::Result Get(RocksDBKey const& key, std::string* val) override;
arangodb::Result Put(RocksDBKey const& key,
rocksdb::Slice const& val) override;
arangodb::Result Delete(RocksDBKey const& key) override;
std::unique_ptr<rocksdb::Iterator> NewIterator() override;
void SetSavePoint() override {}
arangodb::Result RollbackToSavePoint() override { return arangodb::Result(); }
private:
RocksDBTransactionState* _state;
rocksdb::TransactionDB* _db;
};
/// transactional operations
class RocksDBTrxMethods : public RocksDBMethods {
public:
RocksDBTrxMethods(RocksDBTransactionState* state);
arangodb::Result Get(RocksDBKey const& key, std::string* val) override;
arangodb::Result Put(RocksDBKey const& key,
rocksdb::Slice const& val) override;
arangodb::Result Delete(RocksDBKey const& key) override;
std::unique_ptr<rocksdb::Iterator> NewIterator() override;
void SetSavePoint() override;
arangodb::Result RollbackToSavePoint() override;
private:
RocksDBTransactionState* _state;
};
} // namespace arangodb
#endif

View File

@ -258,6 +258,14 @@ void RocksDBTransactionCollection::addOperation(
_operationSize += operationSize; _operationSize += operationSize;
} }
void RocksDBTransactionCollection::commitCounts() {
_initialNumberDocuments = _numInserts - _numRemoves;
_operationSize = 0;
_numInserts = 0;
_numUpdates = 0;
_numRemoves = 0;
}
/// @brief lock a collection /// @brief lock a collection
int RocksDBTransactionCollection::doLock(AccessMode::Type type, int nestingLevel) { int RocksDBTransactionCollection::doLock(AccessMode::Type type, int nestingLevel) {
if (!AccessMode::isWriteOrExclusive(type)) { if (!AccessMode::isWriteOrExclusive(type)) {

View File

@ -76,8 +76,8 @@ class RocksDBTransactionCollection final : public TransactionCollection {
uint64_t numRemoves() const { return _numRemoves; } uint64_t numRemoves() const { return _numRemoves; }
/// @brief add an operation for a transaction collection /// @brief add an operation for a transaction collection
void addOperation(TRI_voc_document_operation_e operationType, uint64_t operationSize, TRI_voc_rid_t revisionId) ; void addOperation(TRI_voc_document_operation_e operationType, uint64_t operationSize, TRI_voc_rid_t revisionId);
void resetCounts(); void commitCounts();
private: private:
/// @brief request a lock for a collection /// @brief request a lock for a collection

View File

@ -34,6 +34,7 @@
#include "RocksDBEngine/RocksDBCounterManager.h" #include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBEngine.h" #include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBLogValue.h" #include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h" #include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h" #include "StorageEngine/StorageEngine.h"
@ -56,34 +57,6 @@ using namespace arangodb;
// for the RocksDB engine we do not need any additional data // for the RocksDB engine we do not need any additional data
struct RocksDBTransactionData final : public TransactionData {}; struct RocksDBTransactionData final : public TransactionData {};
RocksDBSavePoint::RocksDBSavePoint(
rocksdb::Transaction* trx, bool handled,
std::function<void()> const& rollbackCallback)
: _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) {
TRI_ASSERT(trx != nullptr);
if (!_handled) {
_trx->SetSavePoint();
}
}
RocksDBSavePoint::~RocksDBSavePoint() {
if (!_handled) {
rollback();
}
}
void RocksDBSavePoint::commit() {
// note: _handled may already be true here
_handled = true; // this will prevent the rollback
}
void RocksDBSavePoint::rollback() {
TRI_ASSERT(!_handled);
_trx->RollbackToSavePoint();
_handled = true; // in order to not roll back again by accident
_rollbackCallback();
}
/// @brief transaction type /// @brief transaction type
RocksDBTransactionState::RocksDBTransactionState( RocksDBTransactionState::RocksDBTransactionState(
TRI_vocbase_t* vocbase, uint64_t maxTransSize, TRI_vocbase_t* vocbase, uint64_t maxTransSize,
@ -92,7 +65,6 @@ RocksDBTransactionState::RocksDBTransactionState(
: TransactionState(vocbase), : TransactionState(vocbase),
_rocksReadOptions(), _rocksReadOptions(),
_cacheTx(nullptr), _cacheTx(nullptr),
_transactionSize(0),
_maxTransactionSize(maxTransSize), _maxTransactionSize(maxTransSize),
_intermediateTransactionSize(intermediateTransactionSize), _intermediateTransactionSize(intermediateTransactionSize),
_intermediateTransactionNumber(intermediateTransactionNumber), _intermediateTransactionNumber(intermediateTransactionNumber),
@ -157,24 +129,13 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
_cacheTx = _cacheTx =
CacheManagerFeature::MANAGER->beginTransaction(isReadOnlyTransaction()); CacheManagerFeature::MANAGER->beginTransaction(isReadOnlyTransaction());
// start rocks transaction if (isReadOnlyTransaction()) {
StorageEngine* engine = EngineSelectorFeature::ENGINE; _rocksMethods.reset(new RocksDBReadOnlyMethods());
rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(engine)->db(); } else {
_rocksTransaction.reset(db->BeginTransaction( createTransaction();
_rocksWriteOptions, rocksdb::TransactionOptions())); _rocksMethods.reset(new RocksDBTrxMethods(this));
_rocksTransaction->SetSnapshot();
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
_rocksReadOptions.prefix_same_as_start = true;
if (!isReadOnlyTransaction() &&
!hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
RocksDBLogValue header =
RocksDBLogValue::BeginTransaction(_vocbase->id(), _id);
_rocksTransaction->PutLogData(header.slice());
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
++_numLogdata;
#endif
} }
} else { } else {
TRI_ASSERT(_status == transaction::Status::RUNNING); TRI_ASSERT(_status == transaction::Status::RUNNING);
} }
@ -182,6 +143,102 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
return result; return result;
} }
void RocksDBTransactionState::createTransaction() {
TRI_ASSERT(!_rocksTransaction);
// TODO intermediates
// start rocks transaction
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
_rocksTransaction.reset(db->BeginTransaction(
_rocksWriteOptions, rocksdb::TransactionOptions()));
_rocksTransaction->SetSnapshot();
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
_rocksReadOptions.prefix_same_as_start = true;
if (!hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
RocksDBLogValue header =
RocksDBLogValue::BeginTransaction(_vocbase->id(), _id);
_rocksTransaction->PutLogData(header.slice());
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
++_numLogdata;
#endif
}
}
arangodb::Result RocksDBTransactionState::internalCommit() {
TRI_ASSERT(_rocksTransaction != nullptr);
arangodb::Result result;
if (_rocksTransaction->GetNumKeys() > 0) {
// set wait for sync flag if required
if (waitForSync()) {
_rocksWriteOptions.sync = true;
_rocksTransaction->SetWriteOptions(_rocksWriteOptions);
}
// double t1 = TRI_microtime();
result = rocksutils::convertStatus(_rocksTransaction->Commit());
// double t2 = TRI_microtime();
// if (t2 - t1 > 0.25) {
// LOG_TOPIC(ERR, Logger::FIXME)
// << "COMMIT TOOK: " << (t2 - t1)
// << " S. NUMINSERTS: " << _numInserts
// << ", NUMUPDATES: " << _numUpdates
// << ", NUMREMOVES: " << _numRemoves
// << ", TRANSACTIONSIZE: " << _transactionSize;
// }
rocksdb::SequenceNumber latestSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
if (!result.ok()) {
return result;
}
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection =
static_cast<RocksDBTransactionCollection*>(trxCollection);
int64_t adjustment =
collection->numInserts() - collection->numRemoves();
if (collection->numInserts() != 0 || collection->numRemoves() != 0 ||
collection->revision() != 0) {
RocksDBCollection* coll = static_cast<RocksDBCollection*>(
trxCollection->collection()->getPhysical());
coll->adjustNumberDocuments(adjustment);
coll->setRevision(collection->revision());
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
RocksDBCounterManager::CounterAdjustment update(
latestSeq, collection->numInserts(), collection->numRemoves(),
collection->revision());
engine->counterManager()->updateCounter(coll->objectId(), update);
}
// we need this in case of an intermediate commit. The number of
// initial documents is adjusted and numInserts / removes is set to 0
collection->commitCounts();
}
} else {
// don't write anything if the transaction is empty
result = rocksutils::convertStatus(_rocksTransaction->Rollback());
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
}
_rocksTransaction.reset();
return result;
}
/// @brief commit a transaction /// @brief commit a transaction
Result RocksDBTransactionState::commitTransaction( Result RocksDBTransactionState::commitTransaction(
transaction::Methods* activeTrx) { transaction::Methods* activeTrx) {
@ -193,86 +250,20 @@ Result RocksDBTransactionState::commitTransaction(
return Result(TRI_ERROR_DEBUG); return Result(TRI_ERROR_DEBUG);
} }
arangodb::Result result; arangodb::Result res;
if (_nestingLevel == 0) { if (_nestingLevel == 0) {
if (_rocksTransaction != nullptr) { if (_rocksTransaction != nullptr) {
// if (hasOperations()) { res = internalCommit();
if (_rocksTransaction->GetNumKeys() > 0) { if (!res.ok()) {
// set wait for sync flag if required abortTransaction(activeTrx);
if (waitForSync()) {
_rocksWriteOptions.sync = true;
_rocksTransaction->SetWriteOptions(_rocksWriteOptions);
}
// TODO wait for response on github issue to see how we can use the
// sequence number
// double t1 = TRI_microtime();
result = rocksutils::convertStatus(_rocksTransaction->Commit());
// double t2 = TRI_microtime();
// if (t2 - t1 > 0.25) {
// LOG_TOPIC(ERR, Logger::FIXME)
// << "COMMIT TOOK: " << (t2 - t1)
// << " S. NUMINSERTS: " << _numInserts
// << ", NUMUPDATES: " << _numUpdates
// << ", NUMREMOVES: " << _numRemoves
// << ", TRANSACTIONSIZE: " << _transactionSize;
// }
rocksdb::SequenceNumber latestSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
if (!result.ok()) {
abortTransaction(activeTrx);
return result;
}
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection =
static_cast<RocksDBTransactionCollection*>(trxCollection);
int64_t adjustment =
collection->numInserts() - collection->numRemoves();
if (collection->numInserts() != 0 || collection->numRemoves() != 0 ||
collection->revision() != 0) {
RocksDBCollection* coll = static_cast<RocksDBCollection*>(
trxCollection->collection()->getPhysical());
coll->adjustNumberDocuments(adjustment);
coll->setRevision(collection->revision());
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
RocksDBCounterManager::CounterAdjustment update(
latestSeq, collection->numInserts(), collection->numRemoves(),
collection->revision());
engine->counterManager()->updateCounter(coll->objectId(), update);
}
}
} else {
// don't write anything if the transaction is empty
result = rocksutils::convertStatus(_rocksTransaction->Rollback());
if (_cacheTx != nullptr) {
// note: endTransaction() will delete _cacheTx!
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
_cacheTx = nullptr;
}
} }
_rocksTransaction.reset();
} }
updateStatus(transaction::Status::COMMITTED); updateStatus(transaction::Status::COMMITTED);
} }
unuseCollections(_nestingLevel); unuseCollections(_nestingLevel);
return result; return res;
} }
/// @brief abort and rollback a transaction /// @brief abort and rollback a transaction
@ -385,7 +376,8 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
uint64_t keySize) { uint64_t keySize) {
RocksDBOperationResult res; RocksDBOperationResult res;
uint64_t newSize = _transactionSize + operationSize + keySize; size_t currentSize = _rocksTransaction->GetWriteBatch()->GetWriteBatch()->GetDataSize();
uint64_t newSize = currentSize + operationSize + keySize;
if (_maxTransactionSize < newSize) { if (_maxTransactionSize < newSize) {
// we hit the transaction size limit // we hit the transaction size limit
std::string message = std::string message =
@ -428,9 +420,7 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
break; break;
} }
_transactionSize = newSize;
auto numOperations = _numInserts + _numUpdates + _numRemoves; auto numOperations = _numInserts + _numUpdates + _numRemoves;
// signal if intermediate commit is required // signal if intermediate commit is required
// this will be done if intermediate transactions are enabled // this will be done if intermediate transactions are enabled
// and either the "number of operations" or the "transaction size" // and either the "number of operations" or the "transaction size"
@ -438,25 +428,23 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
if (_intermediateTransactionEnabled && if (_intermediateTransactionEnabled &&
(_intermediateTransactionNumber <= numOperations || (_intermediateTransactionNumber <= numOperations ||
_intermediateTransactionSize <= newSize)) { _intermediateTransactionSize <= newSize)) {
res.commitRequired(true); //res.commitRequired(true);
internalCommit();
// TODO perform intermediate commit
} }
return res; return res;
} }
RocksDBMethods* RocksDBTransactionState::rocksdbMethods() {
TRI_ASSERT(_rocksMethods);
return _rocksMethods.get();
}
uint64_t RocksDBTransactionState::sequenceNumber() const { uint64_t RocksDBTransactionState::sequenceNumber() const {
return static_cast<uint64_t>( return static_cast<uint64_t>(
_rocksTransaction->GetSnapshot()->GetSequenceNumber()); _rocksTransaction->GetSnapshot()->GetSequenceNumber());
} }
/*
class RocksDBBatchTrx : public RocksDBBatch {
arangodb::Result Get(RocksDBKey const&, std::string*) override {
}
arangodb::Result Put(RocksDBKey const&, rocksdb::Slice const&) override {
}
arangodb::Result Delete(RocksDBKey const&) override ;
};*/

View File

@ -54,32 +54,14 @@ namespace transaction {
class Methods; class Methods;
} }
class TransactionCollection; class TransactionCollection;
class RocksDBMethods;
class RocksDBSavePoint {
public:
RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function<void()> const& rollbackCallback);
~RocksDBSavePoint();
void commit();
private:
void rollback();
private:
rocksdb::Transaction* _trx;
std::function<void()> const _rollbackCallback;
bool _handled;
};
/*class RocksDBKey;
struct RocksDBBatch {
virtual arangodb::Result Get(RocksDBKey const&, std::string*) = 0;
virtual arangodb::Result Put(RocksDBKey const&, rocksdb::Slice const&) = 0;
virtual arangodb::Result Delete(RocksDBKey const&) = 0;
};*/
/// @brief transaction type /// @brief transaction type
class RocksDBTransactionState final : public TransactionState { class RocksDBTransactionState final : public TransactionState {
friend class RocksDBReadOnlyMethods;
friend class RocksDBTrxMethods;
//friend struct RocksDBIntermediateOps;
public: public:
explicit RocksDBTransactionState(TRI_vocbase_t* vocbase, explicit RocksDBTransactionState(TRI_vocbase_t* vocbase,
uint64_t maxOperationSize, uint64_t maxOperationSize,
@ -122,24 +104,29 @@ class RocksDBTransactionState final : public TransactionState {
TRI_voc_document_operation_e operationType, uint64_t operationSize, TRI_voc_document_operation_e operationType, uint64_t operationSize,
uint64_t keySize); uint64_t keySize);
rocksdb::Transaction* rocksTransaction() { RocksDBMethods* rocksdbMethods();
/*rocksdb::Transaction* rocksTransaction() {
TRI_ASSERT(_rocksTransaction != nullptr); TRI_ASSERT(_rocksTransaction != nullptr);
return _rocksTransaction.get(); return _rocksTransaction.get();
} }
rocksdb::ReadOptions const& readOptions() const { return _rocksReadOptions; } rocksdb::ReadOptions const& readOptions() const { return _rocksReadOptions; }
rocksdb::WriteOptions const& writeOptions() const { return _rocksWriteOptions; }*/
rocksdb::WriteOptions const& writeOptions() const { return _rocksWriteOptions; }
uint64_t sequenceNumber() const; uint64_t sequenceNumber() const;
private:
void createTransaction();
arangodb::Result internalCommit();
private: private:
std::unique_ptr<rocksdb::Transaction> _rocksTransaction; std::unique_ptr<rocksdb::Transaction> _rocksTransaction;
rocksdb::WriteOptions _rocksWriteOptions; rocksdb::WriteOptions _rocksWriteOptions;
rocksdb::ReadOptions _rocksReadOptions; rocksdb::ReadOptions _rocksReadOptions;
cache::Transaction* _cacheTx; cache::Transaction* _cacheTx;
// current transaction size
uint64_t _transactionSize; std::unique_ptr<RocksDBMethods> _rocksMethods;
// a transaction may not become bigger than this value // a transaction may not become bigger than this value
uint64_t _maxTransactionSize; uint64_t _maxTransactionSize;
// if a transaction gets bigger than this value and intermediate transactions // if a transaction gets bigger than this value and intermediate transactions