1
0
Fork 0

Added WAL MARKERS for CreateCollection, RenameCollection and DropIndex, also adjusted WalMarkerType constructors.

This commit is contained in:
Michael Hackstein 2017-04-26 18:32:28 +02:00
parent 0383a5cf4d
commit e423a5fde1
5 changed files with 114 additions and 54 deletions

View File

@ -38,6 +38,7 @@
#include "RocksDBEngine/RocksDBCounterManager.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
#include "RocksDBEngine/RocksDBToken.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
@ -380,7 +381,9 @@ bool RocksDBCollection::dropIndex(TRI_idx_iid_t iid) {
int res =
static_cast<RocksDBEngine*>(engine)->writeCreateCollectionMarker(
_logicalCollection->vocbase()->id(), _logicalCollection->cid(),
builder.slice());
builder.slice(),
RocksDBLogValue::IndexDrop(_logicalCollection->vocbase()->id(),
_logicalCollection->cid(), iid));
return res == TRI_ERROR_NO_ERROR;
}
@ -467,9 +470,8 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
// don't do anything beyond deleting their contents
for (std::shared_ptr<Index> const& index : _indexes) {
RocksDBIndex* rindex = static_cast<RocksDBIndex*>(index.get());
RocksDBKeyBounds indexBounds =
RocksDBKeyBounds::Empty();
RocksDBKeyBounds indexBounds = RocksDBKeyBounds::Empty();
switch (rindex->type()) {
case RocksDBIndex::TRI_IDX_TYPE_PRIMARY_INDEX:
indexBounds = RocksDBKeyBounds::PrimaryIndex(rindex->objectId());

View File

@ -47,8 +47,9 @@
#include "RocksDBEngine/RocksDBIndex.h"
#include "RocksDBEngine/RocksDBIndexFactory.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBRestHandlers.h"
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBReplicationManager.h"
#include "RocksDBEngine/RocksDBRestHandlers.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "RocksDBEngine/RocksDBTransactionContextData.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
@ -77,7 +78,7 @@ using namespace arangodb::application_features;
using namespace arangodb::options;
namespace arangodb {
std::string const RocksDBEngine::EngineName("rocksdb");
std::string const RocksDBEngine::FeatureName("RocksDBEngine");
@ -160,7 +161,7 @@ void RocksDBEngine::start() {
// options imported set by RocksDBOptionFeature
auto* opts = ApplicationServer::getFeature<arangodb::RocksDBOptionFeature>(
"RocksDBOption");
_options.write_buffer_size = static_cast<size_t>(opts->_writeBufferSize);
_options.max_write_buffer_number =
static_cast<int>(opts->_maxWriteBufferNumber);
@ -176,10 +177,11 @@ void RocksDBEngine::start() {
_options.use_direct_reads = opts->_useDirectReads;
_options.use_direct_writes = opts->_useDirectWrites;
if (opts->_skipCorrupted) {
_options.wal_recovery_mode = rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords;
_options.wal_recovery_mode =
rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords;
} else {
_options.wal_recovery_mode = rocksdb::WALRecoveryMode::kPointInTimeRecovery;
}
}
_options.base_background_compactions =
static_cast<int>(opts->_baseBackgroundCompactions);
@ -217,8 +219,9 @@ void RocksDBEngine::start() {
TRI_ASSERT(_db != nullptr);
_counterManager.reset(new RocksDBCounterManager(_db));
_replicationManager.reset(new RocksDBReplicationManager());
_backgroundThread.reset(new RocksDBBackgroundThread(this, counter_sync_seconds));
_backgroundThread.reset(
new RocksDBBackgroundThread(this, counter_sync_seconds));
if (!_backgroundThread->start()) {
LOG_TOPIC(ERR, Logger::ENGINES)
<< "could not start rocksdb counter manager";
@ -512,12 +515,18 @@ int RocksDBEngine::writeCreateDatabaseMarker(TRI_voc_tick_t id,
int RocksDBEngine::writeCreateCollectionMarker(TRI_voc_tick_t databaseId,
TRI_voc_cid_t cid,
VPackSlice const& slice) {
VPackSlice const& slice,
RocksDBLogValue&& logValue) {
auto key = RocksDBKey::Collection(databaseId, cid);
auto value = RocksDBValue::Collection(slice);
rocksdb::WriteOptions options; // TODO: check which options would make sense
rocksdb::Status res = _db->Put(options, key.string(), value.string());
// Write marker + key into RocksDB inside one batch
rocksdb::WriteBatch batch;
batch.PutLogData(logValue.slice());
batch.Put(key.string(), value.string());
rocksdb::Status res = _db->Write(options, &batch);
auto result = rocksutils::convertStatus(res);
return result.errorNumber();
}
@ -561,7 +570,7 @@ std::string RocksDBEngine::createCollection(
VPackBuilder builder = parameters->toVelocyPackIgnore(
{"path", "statusString"}, /*translate cid*/ true,
/*for persistence*/ true);
// should cause counter to be added to the manager
// in case the collection is created for the first time
VPackSlice objectId = builder.slice().get("objectId");
@ -569,8 +578,10 @@ std::string RocksDBEngine::createCollection(
RocksDBCounterManager::CounterAdjustment adj;
_counterManager->updateCounter(objectId.getUInt(), adj);
}
int res = writeCreateCollectionMarker(vocbase->id(), id, builder.slice());
int res = writeCreateCollectionMarker(
vocbase->id(), id, builder.slice(),
RocksDBLogValue::CollectionCreate(vocbase->id()));
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
@ -596,7 +607,9 @@ arangodb::Result RocksDBEngine::persistCollection(
TRI_ASSERT(cid != 0);
TRI_UpdateTickServer(static_cast<TRI_voc_tick_t>(cid));
int res = writeCreateCollectionMarker(vocbase->id(), cid, slice);
int res = writeCreateCollectionMarker(
vocbase->id(), cid, slice,
RocksDBLogValue::CollectionCreate(vocbase->id()));
result.reset(res);
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
@ -670,8 +683,10 @@ arangodb::Result RocksDBEngine::renameCollection(
std::string const& oldName) {
VPackBuilder builder =
collection->toVelocyPackIgnore({"path", "statusString"}, true, true);
int res = writeCreateCollectionMarker(vocbase->id(), collection->cid(),
builder.slice());
int res = writeCreateCollectionMarker(
vocbase->id(), collection->cid(), builder.slice(),
RocksDBLogValue::CollectionRename(vocbase->id(), collection->cid(),
collection->name()));
return arangodb::Result(res);
}
@ -853,7 +868,8 @@ std::pair<TRI_voc_tick_t, TRI_voc_cid_t> RocksDBEngine::mapObjectToCollection(
return it->second;
}
Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder){
Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase,
VPackBuilder& builder) {
Result res;
rocksdb::Status status = _db->GetBaseDB()->SyncWAL();
@ -866,7 +882,7 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu
rocksdb::SequenceNumber lastTick = _db->GetLatestSequenceNumber();
// "state" part
builder.add("state", VPackValue(VPackValueType::Object)); //open
builder.add("state", VPackValue(VPackValueType::Object)); // open
builder.add("running", VPackValue(true));
builder.add("lastLogTick", VPackValue(std::to_string(lastTick)));
builder.add("lastUncommittedLogTick", VPackValue(std::to_string(lastTick)));
@ -875,14 +891,14 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu
builder.close();
// "server" part
builder.add("server", VPackValue(VPackValueType::Object)); //open
builder.add("server", VPackValue(VPackValueType::Object)); // open
builder.add("version", VPackValue(ARANGODB_VERSION));
builder.add("serverId", VPackValue(std::to_string(ServerIdFeature::getId())));
builder.close();
// "clients" part
builder.add("clients", VPackValue(VPackValueType::Array)); //open
if(vocbase != nullptr) { //add clients
builder.add("clients", VPackValue(VPackValueType::Array)); // open
if (vocbase != nullptr) { // add clients
auto allClients = vocbase->getReplicationClients();
for (auto& it : allClients) {
// One client
@ -893,7 +909,8 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu
TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer));
builder.add("time", VPackValue(buffer));
builder.add("lastServedTick", VPackValue(std::to_string(std::get<2>(it))));
builder.add("lastServedTick",
VPackValue(std::to_string(std::get<2>(it))));
builder.close();
}

View File

@ -39,10 +39,11 @@
namespace arangodb {
class PhysicalCollection;
class PhysicalView;
class RocksDBBackgroundThread;
class RocksDBComparator;
class RocksDBCounterManager;
class RocksDBReplicationManager;
class RocksDBBackgroundThread;
class RocksDBLogValue;
class TransactionCollection;
class TransactionState;
@ -243,7 +244,7 @@ class RocksDBEngine final : public StorageEngine {
RocksDBComparator* cmp() const { return _cmp.get(); }
int writeCreateCollectionMarker(TRI_voc_tick_t databaseId, TRI_voc_cid_t id,
VPackSlice const& slice);
VPackSlice const& slice, RocksDBLogValue&& logValue);
void addCollectionMapping(uint64_t, TRI_voc_tick_t, TRI_voc_cid_t);
std::pair<TRI_voc_tick_t, TRI_voc_cid_t> mapObjectToCollection(uint64_t);

View File

@ -34,8 +34,8 @@ RocksDBLogValue RocksDBLogValue::BeginTransaction(TRI_voc_tick_t dbid, TRI_voc_t
return RocksDBLogValue(RocksDBLogType::BeginTransaction, dbid, tid);
}
RocksDBLogValue RocksDBLogValue::DatabaseCreate(TRI_voc_tick_t dbid) {
return RocksDBLogValue(RocksDBLogType::BeginTransaction, dbid);
RocksDBLogValue RocksDBLogValue::DatabaseCreate() {
return RocksDBLogValue(RocksDBLogType::DatabaseCreate);
}
RocksDBLogValue RocksDBLogValue::DatabaseDrop(TRI_voc_tick_t dbid) {
@ -46,24 +46,24 @@ RocksDBLogValue RocksDBLogValue::CollectionCreate(TRI_voc_cid_t cid) {
return RocksDBLogValue(RocksDBLogType::CollectionCreate, cid);
}
RocksDBLogValue RocksDBLogValue::CollectionDrop(TRI_voc_cid_t cid) {
return RocksDBLogValue(RocksDBLogType::CollectionDrop, cid);
RocksDBLogValue RocksDBLogValue::CollectionDrop(TRI_voc_tick_t dbid, TRI_voc_cid_t cid) {
return RocksDBLogValue(RocksDBLogType::CollectionDrop, dbid, cid);
}
RocksDBLogValue RocksDBLogValue::CollectionRename(TRI_voc_cid_t cid) {
return RocksDBLogValue(RocksDBLogType::CollectionRename, cid);
RocksDBLogValue RocksDBLogValue::CollectionRename(TRI_voc_tick_t dbid, TRI_voc_cid_t cid, std::string const& newName) {
return RocksDBLogValue(RocksDBLogType::CollectionRename, dbid, cid, newName);
}
RocksDBLogValue RocksDBLogValue::CollectionChange(TRI_voc_cid_t cid) {
return RocksDBLogValue(RocksDBLogType::CollectionChange, cid);
RocksDBLogValue RocksDBLogValue::CollectionChange(TRI_voc_tick_t dbid, TRI_voc_cid_t cid) {
return RocksDBLogValue(RocksDBLogType::CollectionChange, dbid, cid);
}
RocksDBLogValue RocksDBLogValue::IndexCreate(TRI_voc_cid_t cid, TRI_idx_iid_t iid) {
return RocksDBLogValue(RocksDBLogType::IndexCreate, cid, iid);
RocksDBLogValue RocksDBLogValue::IndexCreate(TRI_voc_tick_t dbid, TRI_voc_cid_t cid, TRI_idx_iid_t iid) {
return RocksDBLogValue(RocksDBLogType::IndexCreate, dbid, cid, iid);
}
RocksDBLogValue RocksDBLogValue::IndexDrop(TRI_voc_cid_t cid, TRI_idx_iid_t iid) {
return RocksDBLogValue(RocksDBLogType::IndexDrop, cid, iid);
RocksDBLogValue RocksDBLogValue::IndexDrop(TRI_voc_tick_t dbid, TRI_voc_cid_t cid, TRI_idx_iid_t iid) {
return RocksDBLogValue(RocksDBLogType::IndexDrop, dbid, cid, iid);
}
@ -79,15 +79,21 @@ RocksDBLogValue RocksDBLogValue::DocumentRemove(arangodb::StringRef const& key)
return RocksDBLogValue(RocksDBLogType::DocumentRemove, key);
}
// Builds a log value that consists of the type byte only.
// Accepts RocksDBLogType::DatabaseCreate exclusively; any other type raises
// TRI_ERROR_BAD_PARAMETER.
RocksDBLogValue::RocksDBLogValue(RocksDBLogType type) : _buffer() {
  if (type != RocksDBLogType::DatabaseCreate) {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
  }

  // the marker carries no payload, only its type tag
  _buffer.reserve(sizeof(RocksDBLogType));
  _buffer += static_cast<char>(type);
}
RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, uint64_t val) : _buffer() {
switch (type) {
case RocksDBLogType::DatabaseCreate:
case RocksDBLogType::DatabaseDrop:
case RocksDBLogType::CollectionCreate:
case RocksDBLogType::CollectionDrop:
case RocksDBLogType::CollectionRename:
case RocksDBLogType::CollectionChange:{
case RocksDBLogType::CollectionCreate: {
_buffer.reserve(sizeof(RocksDBLogType) + sizeof(uint64_t));
_buffer += static_cast<char>(type);
uint64ToPersistent(_buffer, val);// database or collection ID
@ -104,7 +110,9 @@ RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, uint64_t dbId, uint64_t ci
: _buffer() {
switch (type) {
case RocksDBLogType::BeginTransaction: {
case RocksDBLogType::BeginTransaction:
case RocksDBLogType::CollectionChange:
case RocksDBLogType::CollectionDrop: {
_buffer.reserve(sizeof(RocksDBLogType) + sizeof(uint64_t) * 2);
_buffer += static_cast<char>(type);
uint64ToPersistent(_buffer, dbId);
@ -117,6 +125,35 @@ RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, uint64_t dbId, uint64_t ci
}
}
// Builds a log value for index markers: the type byte followed by the
// database id, the collection id and the index id (in that order), each
// appended through uint64ToPersistent.
// Accepts RocksDBLogType::IndexCreate and RocksDBLogType::IndexDrop; any
// other type raises TRI_ERROR_BAD_PARAMETER.
RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, uint64_t dbId,
                                 uint64_t cid, uint64_t iid)
    : _buffer() {
  bool const isIndexMarker = (type == RocksDBLogType::IndexCreate ||
                              type == RocksDBLogType::IndexDrop);
  if (!isIndexMarker) {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
  }

  // one type byte plus three 64 bit identifiers
  _buffer.reserve(sizeof(RocksDBLogType) + sizeof(uint64_t) * 3);
  _buffer += static_cast<char>(type);
  uint64ToPersistent(_buffer, dbId);
  uint64ToPersistent(_buffer, cid);
  uint64ToPersistent(_buffer, iid);
}
// Builds a log value made of the type byte, the database id, the collection
// id and a trailing string payload that is appended as-is (no length prefix).
// Accepts RocksDBLogType::CollectionRename exclusively; any other type raises
// TRI_ERROR_BAD_PARAMETER.
RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, uint64_t dbId,
                                 uint64_t cid, std::string const& data)
    : _buffer() {
  switch (type) {
    case RocksDBLogType::CollectionRename: {
      _buffer.reserve(sizeof(RocksDBLogType) + sizeof(uint64_t) * 2 +
                      data.length());
      _buffer += static_cast<char>(type);
      uint64ToPersistent(_buffer, dbId);
      uint64ToPersistent(_buffer, cid);
      // string payload; the CollectionRename factory passes the new
      // collection name here
      _buffer.append(data.data(), data.length());
      break;
    }
    default:
      THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
  }
}
RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, StringRef const& data)
: _buffer() {
switch (type) {

View File

@ -40,16 +40,16 @@ class RocksDBLogValue {
// parameter in an appropriate format into the underlying string buffer.
//----------------------------------------------------------------------------
static RocksDBLogValue BeginTransaction(TRI_voc_tick_t, TRI_voc_tid_t);
static RocksDBLogValue DatabaseCreate(TRI_voc_tick_t);
static RocksDBLogValue DatabaseDrop(TRI_voc_tick_t);
static RocksDBLogValue CollectionCreate(TRI_voc_cid_t);
static RocksDBLogValue CollectionDrop(TRI_voc_cid_t);
static RocksDBLogValue CollectionRename(TRI_voc_cid_t);
static RocksDBLogValue CollectionChange(TRI_voc_cid_t);
static RocksDBLogValue BeginTransaction(TRI_voc_tick_t vocbaseId, TRI_voc_tid_t trxId);
static RocksDBLogValue DatabaseCreate();
static RocksDBLogValue DatabaseDrop(TRI_voc_tick_t vocbaseId);
static RocksDBLogValue CollectionCreate(TRI_voc_tick_t vocbaseId);
static RocksDBLogValue CollectionDrop(TRI_voc_tick_t vocbaseId, TRI_voc_cid_t cid);
static RocksDBLogValue CollectionRename(TRI_voc_tick_t vocbaseId, TRI_voc_cid_t cid, std::string const& newName);
static RocksDBLogValue CollectionChange(TRI_voc_tick_t vocbaseId, TRI_voc_cid_t cid);
static RocksDBLogValue IndexCreate(TRI_voc_cid_t, TRI_idx_iid_t);
static RocksDBLogValue IndexDrop(TRI_voc_cid_t, TRI_idx_iid_t);
static RocksDBLogValue IndexCreate(TRI_voc_tick_t vocbaseId, TRI_voc_cid_t cid, TRI_idx_iid_t indexId);
static RocksDBLogValue IndexDrop(TRI_voc_tick_t vocbaseId, TRI_voc_cid_t cid, TRI_idx_iid_t indexId);
static RocksDBLogValue ViewCreate(TRI_voc_cid_t, TRI_idx_iid_t);
static RocksDBLogValue ViewDrop(TRI_voc_cid_t, TRI_idx_iid_t);
@ -85,8 +85,11 @@ class RocksDBLogValue {
rocksdb::Slice slice() const { return rocksdb::Slice(_buffer); }
private:
explicit RocksDBLogValue(RocksDBLogType type);
RocksDBLogValue(RocksDBLogType type, uint64_t);
RocksDBLogValue(RocksDBLogType type, uint64_t, uint64_t);
RocksDBLogValue(RocksDBLogType type, uint64_t, uint64_t, uint64_t);
RocksDBLogValue(RocksDBLogType type, uint64_t, uint64_t, std::string const& data);
RocksDBLogValue(RocksDBLogType type, StringRef const& data);
private: