mirror of https://gitee.com/bigwinds/arangodb
Split replication context/status/tailing.
This commit is contained in:
parent
2cf8cc79e8
commit
4ec2a40cd5
|
@ -16,8 +16,10 @@ set(ROCKSDB_SOURCES
|
|||
RocksDBEngine/RocksDBKey.cpp
|
||||
RocksDBEngine/RocksDBKeyBounds.cpp
|
||||
RocksDBEngine/RocksDBPrimaryIndex.cpp
|
||||
RocksDBEngine/RocksDBReplicationCommon.cpp
|
||||
RocksDBEngine/RocksDBReplicationContext.cpp
|
||||
RocksDBEngine/RocksDBReplicationManager.cpp
|
||||
RocksDBEngine/RocksDBReplicationTailing.cpp
|
||||
RocksDBEngine/RocksDBRestExportHandler.cpp
|
||||
RocksDBEngine/RocksDBRestHandlers.cpp
|
||||
RocksDBEngine/RocksDBRestReplicationHandler.cpp
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Daniel H. Larkin
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RocksDBEngine/RocksDBReplicationCommon.h"
|
||||
|
||||
using namespace arangodb;
|
||||
|
||||
RocksDBReplicationResult::RocksDBReplicationResult(int errorNumber,
|
||||
uint64_t maxTick)
|
||||
: Result(errorNumber), _maxTick(maxTick) {}
|
||||
|
||||
uint64_t RocksDBReplicationResult::maxTick() const { return _maxTick; }
|
|
@ -0,0 +1,43 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Daniel H. Larkin
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGO_ROCKSDB_ROCKSDB_REPLICATION_COMMON_H
|
||||
#define ARANGO_ROCKSDB_ROCKSDB_REPLICATION_COMMON_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "Basics/Result.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class RocksDBReplicationResult : public Result {
|
||||
public:
|
||||
RocksDBReplicationResult(int, uint64_t);
|
||||
uint64_t maxTick() const;
|
||||
|
||||
private:
|
||||
uint64_t _maxTick;
|
||||
};
|
||||
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
|
@ -30,159 +30,10 @@
|
|||
#include "VocBase/replication-common.h"
|
||||
#include "VocBase/ticks.h"
|
||||
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
#include <rocksdb/utilities/write_batch_with_index.h>
|
||||
#include <rocksdb/write_batch.h>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::rocksutils;
|
||||
using namespace arangodb::velocypack;
|
||||
|
||||
/// WAL parser, no locking required here, because we have been locked from the
|
||||
/// outside
|
||||
class WBReader : public rocksdb::WriteBatch::Handler {
|
||||
public:
|
||||
explicit WBReader(TRI_vocbase_t* vocbase, uint64_t from, size_t& limit,
|
||||
bool includeSystem, VPackBuilder& builder)
|
||||
: _vocbase(vocbase),
|
||||
_from(from),
|
||||
_limit(limit),
|
||||
_includeSystem(includeSystem),
|
||||
_builder(builder) {}
|
||||
|
||||
void Put(rocksdb::Slice const& key, rocksdb::Slice const& value) override {
|
||||
if (shouldHandleKey(key)) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
_builder.openObject();
|
||||
switch (RocksDBKey::type(key)) {
|
||||
case RocksDBEntryType::Collection: {
|
||||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_COLLECTION_CREATE)));
|
||||
}
|
||||
case RocksDBEntryType::Document: {
|
||||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_DOCUMENT)));
|
||||
// TODO: add transaction id?
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break; // shouldn't get here?
|
||||
}
|
||||
|
||||
auto containers = getContainerIds(key);
|
||||
_builder.add("database", VPackValue(containers.first));
|
||||
_builder.add("cid", VPackValue(containers.second));
|
||||
_builder.add("data", RocksDBValue::data(value));
|
||||
|
||||
_builder.close();
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
_limit--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Delete(rocksdb::Slice const& key) override { handleDeletion(key); }
|
||||
|
||||
void SingleDelete(rocksdb::Slice const& key) override { handleDeletion(key); }
|
||||
|
||||
private:
|
||||
bool shouldHandleKey(rocksdb::Slice const& key) {
|
||||
if (_limit == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (RocksDBKey::type(key)) {
|
||||
case RocksDBEntryType::Collection:
|
||||
case RocksDBEntryType::Document: {
|
||||
return fromEligibleCollection(key);
|
||||
}
|
||||
case RocksDBEntryType::View: // should handle these eventually?
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void handleDeletion(rocksdb::Slice const& key) {
|
||||
if (shouldHandleKey(key)) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
_builder.openObject();
|
||||
switch (RocksDBKey::type(key)) {
|
||||
case RocksDBEntryType::Collection: {
|
||||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_COLLECTION_DROP)));
|
||||
auto containers = getContainerIds(key);
|
||||
_builder.add("database", VPackValue(containers.first));
|
||||
_builder.add("cid", VPackValue(containers.second));
|
||||
break;
|
||||
}
|
||||
case RocksDBEntryType::Document: {
|
||||
uint64_t revisionId = RocksDBKey::revisionId(key);
|
||||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
|
||||
// TODO: add transaction id?
|
||||
auto containers = getContainerIds(key);
|
||||
_builder.add("database", VPackValue(containers.first));
|
||||
_builder.add("cid", VPackValue(containers.second));
|
||||
_builder.add("data", VPackValue(VPackValueType::Object));
|
||||
_builder.add(StaticStrings::RevString,
|
||||
VPackValue(std::to_string(revisionId)));
|
||||
_builder.close();
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break; // shouldn't get here?
|
||||
}
|
||||
_builder.close();
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
_limit--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<TRI_voc_tick_t, TRI_voc_cid_t> getContainerIds(
|
||||
rocksdb::Slice const& key) {
|
||||
uint64_t objectId = RocksDBKey::collectionId(key);
|
||||
return mapObjectToCollection(objectId);
|
||||
}
|
||||
|
||||
bool fromEligibleCollection(rocksdb::Slice const& key) {
|
||||
auto mapping = getContainerIds(key);
|
||||
if (mapping.first == _vocbase->id()) {
|
||||
std::string const collectionName =
|
||||
_vocbase->collectionName(mapping.second);
|
||||
|
||||
if (collectionName.size() == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!_includeSystem && collectionName[0] == '_') {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private:
|
||||
TRI_vocbase_t* _vocbase;
|
||||
uint64_t _from;
|
||||
size_t& _limit;
|
||||
bool _includeSystem;
|
||||
VPackBuilder& _builder;
|
||||
};
|
||||
|
||||
RocksDBReplicationResult::RocksDBReplicationResult(int errorNumber,
|
||||
uint64_t maxTick)
|
||||
: Result(errorNumber), _maxTick(maxTick) {}
|
||||
|
||||
uint64_t RocksDBReplicationResult::maxTick() const { return _maxTick; }
|
||||
|
||||
RocksDBReplicationContext::RocksDBReplicationContext()
|
||||
: _id(TRI_NewTickServer()),
|
||||
_lastTick(0),
|
||||
|
@ -190,6 +41,10 @@ RocksDBReplicationContext::RocksDBReplicationContext()
|
|||
_collection(nullptr),
|
||||
_iter() {}
|
||||
|
||||
RocksDBReplicationContext::~RocksDBReplicationContext() {
|
||||
releaseDumpingResources();
|
||||
}
|
||||
|
||||
TRI_voc_tick_t RocksDBReplicationContext::id() const { return _id; }
|
||||
|
||||
uint64_t RocksDBReplicationContext::lastTick() const { return _lastTick; }
|
||||
|
@ -242,42 +97,6 @@ std::pair<RocksDBReplicationResult, bool> RocksDBReplicationContext::dump(
|
|||
hasMore);
|
||||
}
|
||||
|
||||
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
||||
// from the corresponding database
|
||||
RocksDBReplicationResult RocksDBReplicationContext::tail(
|
||||
TRI_vocbase_t* vocbase, uint64_t from, size_t limit, bool includeSystem,
|
||||
VPackBuilder& builder) {
|
||||
releaseDumpingResources();
|
||||
std::unique_ptr<WBReader> handler(
|
||||
new WBReader(vocbase, from, limit, includeSystem, builder));
|
||||
std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader();
|
||||
rocksdb::Status s = static_cast<rocksdb::DB*>(globalRocksDB())
|
||||
->GetUpdatesSince(from, &iterator);
|
||||
if (!s.ok()) { // TODO do something?
|
||||
auto converted = convertStatus(s);
|
||||
return {converted.errorNumber(), _lastTick};
|
||||
}
|
||||
|
||||
while (iterator->Valid() && limit > 0) {
|
||||
s = iterator->status();
|
||||
if (s.ok()) {
|
||||
rocksdb::BatchResult batch = iterator->GetBatch();
|
||||
_lastTick = batch.sequence;
|
||||
s = batch.writeBatchPtr->Iterate(handler.get());
|
||||
}
|
||||
if (!s.ok()) {
|
||||
LOG_TOPIC(ERR, Logger::ENGINES) << "Error during WAL scan";
|
||||
LOG_TOPIC(ERR, Logger::ENGINES) << iterator->status().getState();
|
||||
auto converted = convertStatus(s);
|
||||
return {converted.errorNumber(), _lastTick};
|
||||
}
|
||||
|
||||
iterator->Next();
|
||||
}
|
||||
|
||||
return {TRI_ERROR_NO_ERROR, _lastTick};
|
||||
}
|
||||
|
||||
double RocksDBReplicationContext::expires() const { return _expires; }
|
||||
|
||||
bool RocksDBReplicationContext::isDeleted() const { return _isDeleted; }
|
||||
|
|
|
@ -25,8 +25,8 @@
|
|||
#define ARANGO_ROCKSDB_ROCKSDB_REPLICATION_CONTEXT_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "Basics/Result.h"
|
||||
#include "Indexes/IndexIterator.h"
|
||||
#include "RocksDBEngine/RocksDBReplicationCommon.h"
|
||||
#include "RocksDBEngine/RocksDBToken.h"
|
||||
#include "Transaction/Methods.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
@ -36,15 +36,6 @@
|
|||
|
||||
namespace arangodb {
|
||||
|
||||
class RocksDBReplicationResult : public Result {
|
||||
public:
|
||||
RocksDBReplicationResult(int, uint64_t);
|
||||
uint64_t maxTick() const;
|
||||
|
||||
private:
|
||||
uint64_t _maxTick;
|
||||
};
|
||||
|
||||
class RocksDBReplicationContext {
|
||||
private:
|
||||
typedef std::function<void(DocumentIdentifierToken const& token)>
|
||||
|
@ -52,6 +43,7 @@ class RocksDBReplicationContext {
|
|||
|
||||
public:
|
||||
RocksDBReplicationContext();
|
||||
~RocksDBReplicationContext();
|
||||
|
||||
TRI_voc_tick_t id() const;
|
||||
uint64_t lastTick() const;
|
||||
|
@ -66,12 +58,6 @@ class RocksDBReplicationContext {
|
|||
TRI_vocbase_t* vocbase, std::string const& collectionName,
|
||||
TokenCallback cb, size_t limit);
|
||||
|
||||
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
||||
// from the corresponding database; releases dumping resources
|
||||
RocksDBReplicationResult tail(TRI_vocbase_t* vocbase, uint64_t from,
|
||||
size_t limit, bool includeSystem,
|
||||
VPackBuilder& builder);
|
||||
|
||||
double expires() const;
|
||||
bool isDeleted() const;
|
||||
void deleted();
|
||||
|
|
|
@ -0,0 +1,213 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Daniel H. Larkin
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RocksDBEngine/RocksDBReplicationTailing.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
#include "VocBase/replication-common.h"
|
||||
#include "VocBase/ticks.h"
|
||||
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
#include <rocksdb/utilities/write_batch_with_index.h>
|
||||
#include <rocksdb/write_batch.h>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::rocksutils;
|
||||
using namespace arangodb::velocypack;
|
||||
|
||||
/// WAL parser, no locking required here, because we have been locked from the
|
||||
/// outside
|
||||
class WBReader : public rocksdb::WriteBatch::Handler {
|
||||
public:
|
||||
explicit WBReader(TRI_vocbase_t* vocbase, uint64_t from, size_t& limit,
|
||||
bool includeSystem, VPackBuilder& builder)
|
||||
: _vocbase(vocbase),
|
||||
_from(from),
|
||||
_limit(limit),
|
||||
_includeSystem(includeSystem),
|
||||
_builder(builder) {}
|
||||
|
||||
void Put(rocksdb::Slice const& key, rocksdb::Slice const& value) override {
|
||||
if (shouldHandleKey(key)) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
_builder.openObject();
|
||||
switch (RocksDBKey::type(key)) {
|
||||
case RocksDBEntryType::Collection: {
|
||||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_COLLECTION_CREATE)));
|
||||
}
|
||||
case RocksDBEntryType::Document: {
|
||||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_DOCUMENT)));
|
||||
// TODO: add transaction id?
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break; // shouldn't get here?
|
||||
}
|
||||
|
||||
auto containers = getContainerIds(key);
|
||||
_builder.add("database", VPackValue(containers.first));
|
||||
_builder.add("cid", VPackValue(containers.second));
|
||||
_builder.add("data", RocksDBValue::data(value));
|
||||
|
||||
_builder.close();
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
_limit--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Delete(rocksdb::Slice const& key) override { handleDeletion(key); }
|
||||
|
||||
void SingleDelete(rocksdb::Slice const& key) override { handleDeletion(key); }
|
||||
|
||||
private:
|
||||
bool shouldHandleKey(rocksdb::Slice const& key) {
|
||||
if (_limit == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (RocksDBKey::type(key)) {
|
||||
case RocksDBEntryType::Collection:
|
||||
case RocksDBEntryType::Document: {
|
||||
return fromEligibleCollection(key);
|
||||
}
|
||||
case RocksDBEntryType::View: // should handle these eventually?
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void handleDeletion(rocksdb::Slice const& key) {
|
||||
if (shouldHandleKey(key)) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
_builder.openObject();
|
||||
switch (RocksDBKey::type(key)) {
|
||||
case RocksDBEntryType::Collection: {
|
||||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_COLLECTION_DROP)));
|
||||
auto containers = getContainerIds(key);
|
||||
_builder.add("database", VPackValue(containers.first));
|
||||
_builder.add("cid", VPackValue(containers.second));
|
||||
break;
|
||||
}
|
||||
case RocksDBEntryType::Document: {
|
||||
uint64_t revisionId = RocksDBKey::revisionId(key);
|
||||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
|
||||
// TODO: add transaction id?
|
||||
auto containers = getContainerIds(key);
|
||||
_builder.add("database", VPackValue(containers.first));
|
||||
_builder.add("cid", VPackValue(containers.second));
|
||||
_builder.add("data", VPackValue(VPackValueType::Object));
|
||||
_builder.add(StaticStrings::RevString,
|
||||
VPackValue(std::to_string(revisionId)));
|
||||
_builder.close();
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break; // shouldn't get here?
|
||||
}
|
||||
_builder.close();
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
_limit--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<TRI_voc_tick_t, TRI_voc_cid_t> getContainerIds(
|
||||
rocksdb::Slice const& key) {
|
||||
uint64_t objectId = RocksDBKey::collectionId(key);
|
||||
return mapObjectToCollection(objectId);
|
||||
}
|
||||
|
||||
bool fromEligibleCollection(rocksdb::Slice const& key) {
|
||||
auto mapping = getContainerIds(key);
|
||||
if (mapping.first == _vocbase->id()) {
|
||||
std::string const collectionName =
|
||||
_vocbase->collectionName(mapping.second);
|
||||
|
||||
if (collectionName.size() == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!_includeSystem && collectionName[0] == '_') {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private:
|
||||
TRI_vocbase_t* _vocbase;
|
||||
uint64_t _from;
|
||||
size_t& _limit;
|
||||
bool _includeSystem;
|
||||
VPackBuilder& _builder;
|
||||
};
|
||||
|
||||
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
||||
// from the corresponding database
|
||||
RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase,
|
||||
uint64_t from, size_t limit,
|
||||
bool includeSystem,
|
||||
VPackBuilder& builder) {
|
||||
uint64_t lastTick = from;
|
||||
std::unique_ptr<WBReader> handler(
|
||||
new WBReader(vocbase, from, limit, includeSystem, builder));
|
||||
std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader();
|
||||
rocksdb::Status s = static_cast<rocksdb::DB*>(globalRocksDB())
|
||||
->GetUpdatesSince(from, &iterator);
|
||||
if (!s.ok()) { // TODO do something?
|
||||
auto converted = convertStatus(s);
|
||||
return {converted.errorNumber(), lastTick};
|
||||
}
|
||||
|
||||
while (iterator->Valid() && limit > 0) {
|
||||
s = iterator->status();
|
||||
if (s.ok()) {
|
||||
rocksdb::BatchResult batch = iterator->GetBatch();
|
||||
lastTick = batch.sequence;
|
||||
s = batch.writeBatchPtr->Iterate(handler.get());
|
||||
}
|
||||
if (!s.ok()) {
|
||||
LOG_TOPIC(ERR, Logger::ENGINES) << "Error during WAL scan";
|
||||
LOG_TOPIC(ERR, Logger::ENGINES) << iterator->status().getState();
|
||||
auto converted = convertStatus(s);
|
||||
return {converted.errorNumber(), lastTick};
|
||||
}
|
||||
|
||||
iterator->Next();
|
||||
}
|
||||
|
||||
return {TRI_ERROR_NO_ERROR, lastTick};
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Daniel H. Larkin
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGO_ROCKSDB_ROCKSDB_REPLICATION_TAILING_H
|
||||
#define ARANGO_ROCKSDB_ROCKSDB_REPLICATION_TAILING_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "RocksDBEngine/RocksDBReplicationCommon.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
||||
#include <velocypack/Slice.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
namespace arangodb {
|
||||
namespace rocksutils {
|
||||
|
||||
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
||||
// from the corresponding database; releases dumping resources
|
||||
RocksDBReplicationResult tailWal(TRI_vocbase_t* vocbase, uint64_t from,
|
||||
size_t limit, bool includeSystem,
|
||||
VPackBuilder& builder);
|
||||
|
||||
} // namespace rocksutils
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue