1
0
Fork 0

RocksDB WAL entries for transactions

This commit is contained in:
Simon Grätzer 2017-04-26 18:54:55 +02:00
parent bd48154cca
commit 29286a9c1b
10 changed files with 99 additions and 50 deletions

View File

@ -38,6 +38,7 @@
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
#include "RocksDBEngine/RocksDBToken.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
@ -435,19 +436,22 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
iter->Seek(documentBounds.start());
while (iter->Valid() && cmp->Compare(iter->key(), documentBounds.end()) < 0) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
// add possible log statement
state->prepareOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE);
rocksdb::Status s = rtrx->Delete(iter->key());
if (!s.ok()) {
auto converted = convertStatus(s);
THROW_ARANGO_EXCEPTION(converted);
}
// transaction size limit reached -- fail
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
// report size of key
RocksDBOperationResult result =
state->addOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE,
0, iter->key().size());
// transaction size limit reached -- fail
if (result.fail()) {
THROW_ARANGO_EXCEPTION(result);
}
@ -696,8 +700,8 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
mergeObjectsForUpdate(trx, oldDoc, newSlice, isEdgeCollection,
TRI_RidToString(revisionId), options.mergeObjects,
options.keepNull, *builder.get());
if (trx->state()->isDBServer()) {
RocksDBTransactionState* state = static_cast<RocksDBTransactionState*>(trx->state());
if (state->isDBServer()) {
// Need to check that no sharding keys have changed:
if (arangodb::shardKeysChanged(_logicalCollection->dbName(),
trx->resolver()->getCollectionNameCluster(
@ -709,9 +713,11 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction());
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_UPDATE);
VPackSlice const newDoc(builder->slice());
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
options.waitForSync);
@ -724,8 +730,7 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
TRI_ASSERT(!mdr.empty());
// report document and key size
result = static_cast<RocksDBTransactionState*>(trx->state())
->addOperation(_logicalCollection->cid(), revisionId,
result = state ->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_UPDATE,
newDoc.byteSize(), res.keySize());
@ -796,7 +801,8 @@ int RocksDBCollection::replace(
isEdgeCollection, TRI_RidToString(revisionId),
*builder.get());
if (trx->state()->isDBServer()) {
RocksDBTransactionState* state = static_cast<RocksDBTransactionState*>(trx->state());
if (state->isDBServer()) {
// Need to check that no sharding keys have changed:
if (arangodb::shardKeysChanged(_logicalCollection->dbName(),
trx->resolver()->getCollectionNameCluster(
@ -808,6 +814,10 @@ int RocksDBCollection::replace(
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction());
// add possible log statement under guard
state->prepareOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REPLACE);
RocksDBOperationResult opResult =
updateDocument(trx, oldRevisionId, oldDoc, revisionId,
@ -822,8 +832,7 @@ int RocksDBCollection::replace(
TRI_ASSERT(!mdr.empty());
// report document and key size
result = static_cast<RocksDBTransactionState*>(trx->state())
->addOperation(_logicalCollection->cid(), revisionId,
result = state->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REPLACE,
VPackSlice(builder->slice()).byteSize(),
opResult.keySize());
@ -894,12 +903,16 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx,
RocksDBSavePoint guard(rocksTransaction(trx),
trx->isSingleOperationTransaction());
// add possible log statement under guard
RocksDBTransactionState* state = static_cast<RocksDBTransactionState*>(trx->state());
state->prepareOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REMOVE);
//RocksDBLogValue val = RocksDBLogValue::DocumentRemove(StringRef(key));
//state->rocksTransaction()->PutLogData(val.slice());
res = removeDocument(trx, oldRevisionId, oldDoc, options.waitForSync);
if (res.ok()) {
// report key size
res =
static_cast<RocksDBTransactionState*>(trx->state())
->addOperation(_logicalCollection->cid(), revisionId,
res = state->addOperation(_logicalCollection->cid(), revisionId,
TRI_VOC_DOCUMENT_OPERATION_REMOVE, 0, res.keySize());
// transaction size limit reached -- fail
if (res.fail()) {

View File

@ -75,6 +75,10 @@ RocksDBLogValue RocksDBLogValue::ViewDrop(TRI_voc_cid_t cid, TRI_idx_iid_t iid)
return RocksDBLogValue(RocksDBLogType::ViewDrop, cid, iid);
}
RocksDBLogValue RocksDBLogValue::DocumentOpsPrologue(TRI_voc_cid_t cid) {
return RocksDBLogValue(RocksDBLogType::DocumentOperationsPrologue, cid);
}
RocksDBLogValue RocksDBLogValue::DocumentRemove(arangodb::StringRef const& key) {
return RocksDBLogValue(RocksDBLogType::DocumentRemove, key);
}
@ -87,7 +91,8 @@ RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, uint64_t val) : _buffer()
case RocksDBLogType::CollectionCreate:
case RocksDBLogType::CollectionDrop:
case RocksDBLogType::CollectionRename:
case RocksDBLogType::CollectionChange:{
case RocksDBLogType::CollectionChange:
case RocksDBLogType::DocumentOperationsPrologue: {
_buffer.reserve(sizeof(RocksDBLogType) + sizeof(uint64_t));
_buffer += static_cast<char>(type);
uint64ToPersistent(_buffer, val);// database or collection ID

View File

@ -54,6 +54,7 @@ class RocksDBLogValue {
static RocksDBLogValue ViewCreate(TRI_voc_cid_t, TRI_idx_iid_t);
static RocksDBLogValue ViewDrop(TRI_voc_cid_t, TRI_idx_iid_t);
static RocksDBLogValue DocumentOpsPrologue(TRI_voc_cid_t cid);
static RocksDBLogValue DocumentRemove(arangodb::StringRef const&);
public:

View File

@ -52,7 +52,7 @@
using namespace arangodb;
// for the RocksDB engine we do not need any additional data
// for the RocksDB engine we do not need any additional data
struct RocksDBTransactionData final : public TransactionData {};
RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx)
@ -98,7 +98,8 @@ RocksDBTransactionState::RocksDBTransactionState(
_numInserts(0),
_numUpdates(0),
_numRemoves(0),
_intermediateTransactionEnabled(intermediateTransactionEnabled) {}
_intermediateTransactionEnabled(intermediateTransactionEnabled),
_lastUsedCollection(UINT64_MAX) {}
/// @brief free a transaction container
RocksDBTransactionState::~RocksDBTransactionState() {
@ -111,9 +112,9 @@ RocksDBTransactionState::~RocksDBTransactionState() {
/// @brief start a transaction
Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
LOG_TRX(this, _nestingLevel) << "beginning " << AccessMode::typeString(_type)
<< " transaction";
LOG_TRX(this, _nestingLevel)
<< "beginning " << AccessMode::typeString(_type) << " transaction";
Result result = useCollections(_nestingLevel);
if (result.ok()) {
@ -157,23 +158,23 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
_rocksWriteOptions, rocksdb::TransactionOptions()));
_rocksTransaction->SetSnapshot();
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
RocksDBLogValue header = RocksDBLogValue::BeginTransaction(_vocbase->id(),
_id);
RocksDBLogValue header =
RocksDBLogValue::BeginTransaction(_vocbase->id(), _id);
_rocksTransaction->PutLogData(header.slice());
} else {
TRI_ASSERT(_status == transaction::Status::RUNNING);
}
return result;
}
/// @brief commit a transaction
Result RocksDBTransactionState::commitTransaction(
transaction::Methods* activeTrx) {
LOG_TRX(this, _nestingLevel) << "committing " << AccessMode::typeString(_type)
<< " transaction";
LOG_TRX(this, _nestingLevel)
<< "committing " << AccessMode::typeString(_type) << " transaction";
TRI_ASSERT(_status == transaction::Status::RUNNING);
TRI_IF_FAILURE("TransactionWriteCommitMarker") {
@ -189,7 +190,7 @@ Result RocksDBTransactionState::commitTransaction(
_rocksWriteOptions.sync = true;
_rocksTransaction->SetWriteOptions(_rocksWriteOptions);
}
// TODO wait for response on github issue to see how we can use the
// sequence number
result = rocksutils::convertStatus(_rocksTransaction->Commit());
@ -245,8 +246,8 @@ Result RocksDBTransactionState::commitTransaction(
/// @brief abort and rollback a transaction
Result RocksDBTransactionState::abortTransaction(
transaction::Methods* activeTrx) {
LOG_TRX(this, _nestingLevel) << "aborting " << AccessMode::typeString(_type)
<< " transaction";
LOG_TRX(this, _nestingLevel)
<< "aborting " << AccessMode::typeString(_type) << " transaction";
TRI_ASSERT(_status == transaction::Status::RUNNING);
Result result;
@ -277,6 +278,27 @@ Result RocksDBTransactionState::abortTransaction(
return result;
}
void RocksDBTransactionState::prepareOperation(
TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId,
TRI_voc_document_operation_e operationType) {
switch (operationType) {
case TRI_VOC_DOCUMENT_OPERATION_INSERT:
case TRI_VOC_DOCUMENT_OPERATION_UPDATE:
case TRI_VOC_DOCUMENT_OPERATION_REPLACE:
case TRI_VOC_DOCUMENT_OPERATION_REMOVE: {
if (collectionId != _lastUsedCollection) {
RocksDBLogValue logValue =
RocksDBLogValue::DocumentOpsPrologue(collectionId);
_rocksTransaction->PutLogData(logValue.slice());
_lastUsedCollection = collectionId;
}
} break;
case TRI_VOC_DOCUMENT_OPERATION_UNKNOWN:
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
break;
}
}
/// @brief add an operation for a transaction collection
RocksDBOperationResult RocksDBTransactionState::addOperation(
TRI_voc_cid_t cid, TRI_voc_rid_t revisionId,
@ -298,7 +320,7 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
static_cast<RocksDBTransactionCollection*>(findCollection(cid));
if (collection == nullptr) {
std::string message = "collection '" + std::to_string(cid) +
std::string message = "collection '" + std::to_string(cid) +
"' not found in transaction state";
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message);
}

View File

@ -42,7 +42,7 @@ namespace rocksdb {
class Transaction;
class Slice;
class Iterator;
}
} // namespace rocksdb
namespace arangodb {
namespace cache {
@ -62,6 +62,7 @@ class RocksDBSavePoint {
~RocksDBSavePoint();
void commit();
private:
void rollback();
@ -101,6 +102,9 @@ class RocksDBTransactionState final : public TransactionState {
return (_status == transaction::Status::ABORTED) && hasOperations();
}
void prepareOperation(TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId,
TRI_voc_document_operation_e operationType);
/// @brief add an operation for a transaction collection
RocksDBOperationResult addOperation(
TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId,
@ -135,7 +139,10 @@ class RocksDBTransactionState final : public TransactionState {
uint64_t _numUpdates;
uint64_t _numRemoves;
bool _intermediateTransactionEnabled;
/// Last collection used for transaction
TRI_voc_cid_t _lastUsedCollection;
};
}
} // namespace arangodb
#endif

View File

@ -57,7 +57,8 @@ enum class RocksDBLogType : char {
ViewCreate = '9',
ViewDrop = ':',
ViewChange = ';',
DocumentRemove = '<'
DocumentOperationsPrologue = '<',
DocumentRemove = '='
};

View File

@ -1687,7 +1687,7 @@ int32_t int32(std::string const& str) {
struct reent buffer;
return _strtol_r(&buffer, str.c_str(), 0, 10);
#else
return strtol(str.c_str(), 0, 10);
return (int32_t)strtol(str.c_str(), 0, 10);
#endif
#endif
}
@ -1713,7 +1713,7 @@ int32_t int32(char const* value, size_t size) {
struct reent buffer;
return _strtol_r(&buffer, value, 0, 10);
#else
return strtol(value, 0, 10);
return (int32_t)strtol(value, 0, 10);
#endif
#endif
}
@ -1727,7 +1727,7 @@ uint32_t uint32(std::string const& str) {
struct reent buffer;
return _strtoul_r(&buffer, str.c_str(), 0, 10);
#else
return strtoul(str.c_str(), 0, 10);
return (uint32_t)strtoul(str.c_str(), 0, 10);
#endif
#endif
}
@ -1741,7 +1741,7 @@ uint32_t unhexUint32(std::string const& str) {
struct reent buffer;
return _strtoul_r(&buffer, str.c_str(), 0, 16);
#else
return strtoul(str.c_str(), 0, 16);
return (uint32_t)strtoul(str.c_str(), 0, 16);
#endif
#endif
}
@ -1767,7 +1767,7 @@ uint32_t uint32(char const* value, size_t size) {
struct reent buffer;
return _strtoul_r(&buffer, value, 0, 10);
#else
return strtoul(value, 0, 10);
return (uint32_t)strtoul(value, 0, 10);
#endif
#endif
}
@ -1793,7 +1793,7 @@ uint32_t unhexUint32(char const* value, size_t size) {
struct reent buffer;
return _strtoul_r(&buffer, value, 0, 16);
#else
return strtoul(value, 0, 16);
return (uint32_t)strtoul(value, 0, 16);
#endif
#endif
}

View File

@ -105,11 +105,11 @@ static inline int TRI_bind(TRI_socket_t s, const struct sockaddr* address,
////////////////////////////////////////////////////////////////////////////////
static inline int TRI_connect(TRI_socket_t s, const struct sockaddr* address,
int addr_len) {
size_t addr_len) {
#ifdef _WIN32
return connect(s.fileHandle, address, addr_len);
return connect(s.fileHandle, address, (int)addr_len);
#else
return connect(s.fileDescriptor, address, addr_len);
return connect(s.fileDescriptor, address, (socklen_t)addr_len);
#endif
}
@ -117,7 +117,7 @@ static inline int TRI_connect(TRI_socket_t s, const struct sockaddr* address,
/// @brief send abstraction for different OSes
////////////////////////////////////////////////////////////////////////////////
static inline int TRI_send(TRI_socket_t s, const void* buffer, size_t length,
static inline long TRI_send(TRI_socket_t s, const void* buffer, size_t length,
int flags) {
#ifdef _WIN32
return send(s.fileHandle, (char*)buffer, (int)length, flags);

View File

@ -228,7 +228,7 @@ TRI_socket_t EndpointIp::connectSocket(const struct addrinfo* aip,
setTimeout(listenSocket, connectTimeout);
int result = TRI_connect(listenSocket, (const struct sockaddr*)aip->ai_addr,
(int)aip->ai_addrlen);
aip->ai_addrlen);
if (result != 0) {
pErr = STR_ERROR();

View File

@ -326,15 +326,15 @@ bool ClientConnection::writeClientConnection(void const* buffer, size_t length,
#if defined(__APPLE__)
// MSG_NOSIGNAL not supported on apple platform
int status = TRI_send(_socket, buffer, length, 0);
long status = TRI_send(_socket, buffer, length, 0);
#elif defined(_WIN32)
// MSG_NOSIGNAL not supported on windows platform
int status = TRI_send(_socket, buffer, length, 0);
long status = TRI_send(_socket, buffer, length, 0);
#elif defined(__sun)
// MSG_NOSIGNAL not supported on solaris platform
int status = TRI_send(_socket, buffer, length, 0);
long status = TRI_send(_socket, buffer, length, 0);
#else
int status = TRI_send(_socket, buffer, length, MSG_NOSIGNAL);
long status = TRI_send(_socket, buffer, length, MSG_NOSIGNAL);
#endif
if (status < 0) {