1
0
Fork 0
arangodb/arangod/RocksDBEngine/RocksDBReplicationContext.cpp

216 lines
7.1 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBEngine/RocksDBReplicationContext.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
#include "Transaction/StandaloneContext.h"
#include "Transaction/UserTransaction.h"
#include "VocBase/replication-common.h"
#include "VocBase/ticks.h"
using namespace arangodb;
using namespace arangodb::rocksutils;
using namespace arangodb::velocypack;
/// WAL parser, no locking required here, because we have been locked from the
/// outside
class WBReader : public rocksdb::WriteBatch::Handler {
public:
explicit WBReader(TRI_vocbase_t* vocbase, uint64_t from, size_t& limit,
bool includeSystem, uint64_t& lastTick)
: _vocbase(vocbase),
_from(from),
_limit(limit),
_includeSystem(includeSystem),
_lastTick(lastTick) {}
bool shouldHandleKey(const rocksdb::Slice& key) {
if (RocksDBKey::type(key) == RocksDBEntryType::Document) {
uint64_t objectId = RocksDBKey::collectionId(key);
auto mapping = mapObjectToCollection(objectId);
if (mapping.first == _vocbase->id()) {
std::string const collectionName =
_vocbase->collectionName(mapping.second);
if (collectionName.size() == 0) {
return false;
}
if (!_includeSystem && collectionName[0] == '_') {
return false;
}
return true;
}
}
return false;
}
void Put(const rocksdb::Slice& key,
const rocksdb::Slice& /*value*/) override {
if (shouldHandleKey(key)) {
// uint64_t objectId = RocksDBKey::collectionId(key);
// uint64_t revisionId = RocksDBKey::revisionId(key);
}
}
void Delete(const rocksdb::Slice& key) override {
if (shouldHandleKey(key)) {
// uint64_t objectId = RocksDBKey::collectionId(key);
// uint64_t revisionId = RocksDBKey::revisionId(key);
}
}
void SingleDelete(const rocksdb::Slice& key) override {
if (shouldHandleKey(key)) {
// uint64_t objectId = RocksDBKey::collectionId(key);
// uint64_t revisionId = RocksDBKey::revisionId(key);
}
}
private:
TRI_vocbase_t* _vocbase;
uint64_t _from;
size_t& _limit;
bool _includeSystem;
uint64_t& _lastTick;
};
RocksDBReplicationResult::RocksDBReplicationResult(int errorNumber,
uint64_t maxTick)
: Result(errorNumber), _maxTick(maxTick) {}
uint64_t RocksDBReplicationResult::maxTick() const { return _maxTick; }
RocksDBReplicationContext::RocksDBReplicationContext()
: _id(TRI_NewTickServer()),
_lastTick(0),
_trx(),
_collection(nullptr),
_iter() {}
TRI_voc_tick_t RocksDBReplicationContext::id() const { return _id; }
// creates new transaction/snapshot, returns inventory
std::pair<RocksDBReplicationResult, std::shared_ptr<VPackBuilder>>
RocksDBReplicationContext::getInventory(TRI_vocbase_t* vocbase,
bool includeSystem) {
TRI_ASSERT(vocbase != nullptr);
if (_trx.get() != nullptr) {
return std::make_pair(
RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick), nullptr);
}
_trx = createTransaction(vocbase);
auto tick = TRI_CurrentTickServer();
_lastTick = toRocksTransactionState(_trx.get())->sequenceNumber();
std::shared_ptr<VPackBuilder> inventory = vocbase->inventory(
tick, filterCollection, &includeSystem, true, sortCollections);
return std::make_pair(
RocksDBReplicationResult(TRI_ERROR_NOT_YET_IMPLEMENTED, _lastTick),
inventory);
}
// iterates over at most 'limit' documents in the collection specified,
// creating a new iterator if one does not exist for this collection
std::pair<RocksDBReplicationResult, bool> RocksDBReplicationContext::dump(
TRI_vocbase_t* vocbase, std::string const& collectionName, TokenCallback cb,
size_t limit) {
TRI_ASSERT(vocbase != nullptr);
if (_trx.get() == nullptr) {
return std::make_pair(
RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick), false);
}
if ((_collection == nullptr) || _collection->name() != collectionName) {
_collection = vocbase->lookupCollection(collectionName);
if (_collection == nullptr) {
return std::make_pair(
RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick), false);
}
_iter = _collection->getAllIterator(_trx.get(), &_mdr, false);
}
bool hasMore = _iter->next(cb, limit);
return std::make_pair(
RocksDBReplicationResult(TRI_ERROR_NOT_YET_IMPLEMENTED, _lastTick),
hasMore);
}
// iterates over WAL starting at 'from' and returns up to 'limit' documents
// from the corresponding database
RocksDBReplicationResult RocksDBReplicationContext::tail(
TRI_vocbase_t* vocbase, uint64_t from, size_t limit, bool includeSystem,
VPackBuilder& builder) {
return {TRI_ERROR_NOT_YET_IMPLEMENTED, 0};
}
std::unique_ptr<transaction::Methods>
RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
double lockTimeout = transaction::Methods::DefaultLockTimeout;
auto ctx = transaction::StandaloneContext::Create(vocbase);
std::unique_ptr<transaction::Methods> trx(new transaction::UserTransaction(
ctx, {}, {}, {}, lockTimeout, false, true));
Result res = trx->begin();
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
return trx;
}
/// @brief filter a collection based on collection attributes
bool RocksDBReplicationContext::filterCollection(
arangodb::LogicalCollection* collection, void* data) {
bool includeSystem = *((bool*)data);
std::string const collectionName(collection->name());
if (!includeSystem && collectionName[0] == '_') {
// exclude all system collections
return false;
}
if (TRI_ExcludeCollectionReplication(collectionName.c_str(), includeSystem)) {
// collection is excluded from replication
return false;
}
// all other cases should be included
return true;
}
bool RocksDBReplicationContext::sortCollections(
arangodb::LogicalCollection const* l,
arangodb::LogicalCollection const* r) {
if (l->type() != r->type()) {
return l->type() < r->type();
}
std::string const leftName = l->name();
std::string const rightName = r->name();
return strcasecmp(leftName.c_str(), rightName.c_str()) < 0;
}