mirror of https://gitee.com/bigwinds/arangodb
cleanup replication contexts (#8630)
This commit is contained in:
parent
ef7e6ec85c
commit
b6df220821
|
@ -109,6 +109,8 @@ class ClusterEngine final : public StorageEngine {
|
||||||
return std::string(); // no path to be returned here
|
return std::string(); // no path to be returned here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cleanupReplicationContexts() override {}
|
||||||
|
|
||||||
velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
||||||
int& status) override;
|
int& status) override;
|
||||||
velocypack::Builder getReplicationApplierConfiguration(int& status) override;
|
velocypack::Builder getReplicationApplierConfiguration(int& status) override;
|
||||||
|
|
|
@ -21,22 +21,6 @@
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#include "ClusterRestReplicationHandler.h"
|
#include "ClusterRestReplicationHandler.h"
|
||||||
#include "Basics/StaticStrings.h"
|
|
||||||
#include "Basics/VPackStringBufferAdapter.h"
|
|
||||||
#include "Basics/VelocyPackHelper.h"
|
|
||||||
#include "Logger/Logger.h"
|
|
||||||
#include "Replication/InitialSyncer.h"
|
|
||||||
#include "RestServer/DatabaseFeature.h"
|
|
||||||
#include "StorageEngine/EngineSelectorFeature.h"
|
|
||||||
#include "StorageEngine/StorageEngine.h"
|
|
||||||
#include "Transaction/StandaloneContext.h"
|
|
||||||
#include "VocBase/LogicalCollection.h"
|
|
||||||
#include "VocBase/ticks.h"
|
|
||||||
|
|
||||||
#include <velocypack/Builder.h>
|
|
||||||
#include <velocypack/Iterator.h>
|
|
||||||
#include <velocypack/Slice.h>
|
|
||||||
#include <velocypack/velocypack-aliases.h>
|
|
||||||
|
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
using namespace arangodb::basics;
|
using namespace arangodb::basics;
|
||||||
|
|
|
@ -26,8 +26,6 @@
|
||||||
|
|
||||||
#include "RestHandler/RestReplicationHandler.h"
|
#include "RestHandler/RestReplicationHandler.h"
|
||||||
|
|
||||||
#include "RocksDBEngine/RocksDBReplicationManager.h"
|
|
||||||
|
|
||||||
namespace arangodb {
|
namespace arangodb {
|
||||||
|
|
||||||
/// @brief replication request handler
|
/// @brief replication request handler
|
||||||
|
|
|
@ -49,7 +49,7 @@ MMFilesCollectionExport::MMFilesCollectionExport(TRI_vocbase_t& vocbase,
|
||||||
_restrictions(restrictions) {
|
_restrictions(restrictions) {
|
||||||
// prevent the collection from being unloaded while the export is ongoing
|
// prevent the collection from being unloaded while the export is ongoing
|
||||||
// this may throw
|
// this may throw
|
||||||
_guard.reset(new arangodb::CollectionGuard(&vocbase, _name.c_str(), false));
|
_guard.reset(new arangodb::CollectionGuard(&vocbase, _name));
|
||||||
|
|
||||||
_collection = _guard->collection();
|
_collection = _guard->collection();
|
||||||
TRI_ASSERT(_collection != nullptr);
|
TRI_ASSERT(_collection != nullptr);
|
||||||
|
|
|
@ -86,6 +86,8 @@ class MMFilesEngine final : public StorageEngine {
|
||||||
bool supportsDfdb() const override { return true; }
|
bool supportsDfdb() const override { return true; }
|
||||||
|
|
||||||
bool useRawDocumentPointers() override { return true; }
|
bool useRawDocumentPointers() override { return true; }
|
||||||
|
|
||||||
|
void cleanupReplicationContexts() override {}
|
||||||
|
|
||||||
velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
||||||
int& status) override;
|
int& status) override;
|
||||||
|
|
|
@ -38,7 +38,6 @@
|
||||||
#include "StorageEngine/EngineSelectorFeature.h"
|
#include "StorageEngine/EngineSelectorFeature.h"
|
||||||
#include "StorageEngine/StorageEngine.h"
|
#include "StorageEngine/StorageEngine.h"
|
||||||
#include "Transaction/Hints.h"
|
#include "Transaction/Hints.h"
|
||||||
#include "Utils/CollectionGuard.h"
|
|
||||||
#include "VocBase/LogicalCollection.h"
|
#include "VocBase/LogicalCollection.h"
|
||||||
#include "VocBase/voc-types.h"
|
#include "VocBase/voc-types.h"
|
||||||
#include "VocBase/vocbase.h"
|
#include "VocBase/vocbase.h"
|
||||||
|
|
|
@ -396,10 +396,12 @@ void DatabaseFeature::stop() {
|
||||||
p.maxResultsSize = 0;
|
p.maxResultsSize = 0;
|
||||||
p.includeSystem = false;
|
p.includeSystem = false;
|
||||||
p.showBindVars = false;
|
p.showBindVars = false;
|
||||||
|
|
||||||
|
|
||||||
arangodb::aql::QueryCache::instance()->properties(p);
|
arangodb::aql::QueryCache::instance()->properties(p);
|
||||||
arangodb::aql::QueryCache::instance()->invalidate();
|
arangodb::aql::QueryCache::instance()->invalidate();
|
||||||
|
|
||||||
|
StorageEngine* engine = EngineSelectorFeature::ENGINE;
|
||||||
|
engine->cleanupReplicationContexts();
|
||||||
|
|
||||||
auto unuser(_databasesProtector.use());
|
auto unuser(_databasesProtector.use());
|
||||||
auto theLists = _databasesLists.load();
|
auto theLists = _databasesLists.load();
|
||||||
|
|
|
@ -51,7 +51,6 @@
|
||||||
#include "StorageEngine/EngineSelectorFeature.h"
|
#include "StorageEngine/EngineSelectorFeature.h"
|
||||||
#include "StorageEngine/StorageEngine.h"
|
#include "StorageEngine/StorageEngine.h"
|
||||||
#include "Transaction/Helpers.h"
|
#include "Transaction/Helpers.h"
|
||||||
#include "Utils/CollectionGuard.h"
|
|
||||||
#include "Utils/CollectionNameResolver.h"
|
#include "Utils/CollectionNameResolver.h"
|
||||||
#include "Utils/Events.h"
|
#include "Utils/Events.h"
|
||||||
#include "Utils/OperationOptions.h"
|
#include "Utils/OperationOptions.h"
|
||||||
|
|
|
@ -769,7 +769,6 @@ void RocksDBEngine::stop() {
|
||||||
|
|
||||||
// in case we missed the beginShutdown somehow, call it again
|
// in case we missed the beginShutdown somehow, call it again
|
||||||
replicationManager()->beginShutdown();
|
replicationManager()->beginShutdown();
|
||||||
|
|
||||||
replicationManager()->dropAll();
|
replicationManager()->dropAll();
|
||||||
|
|
||||||
if (_backgroundThread) {
|
if (_backgroundThread) {
|
||||||
|
@ -1002,6 +1001,12 @@ int RocksDBEngine::getViews(TRI_vocbase_t& vocbase, arangodb::velocypack::Builde
|
||||||
std::string RocksDBEngine::versionFilename(TRI_voc_tick_t id) const {
|
std::string RocksDBEngine::versionFilename(TRI_voc_tick_t id) const {
|
||||||
return _basePath + TRI_DIR_SEPARATOR_CHAR + "VERSION-" + std::to_string(id);
|
return _basePath + TRI_DIR_SEPARATOR_CHAR + "VERSION-" + std::to_string(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RocksDBEngine::cleanupReplicationContexts() {
|
||||||
|
if (_replicationManager != nullptr) {
|
||||||
|
_replicationManager->dropAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
VPackBuilder RocksDBEngine::getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
VPackBuilder RocksDBEngine::getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
||||||
int& status) {
|
int& status) {
|
||||||
|
|
|
@ -185,6 +185,8 @@ class RocksDBEngine final : public StorageEngine {
|
||||||
) const override {
|
) const override {
|
||||||
return std::string(); // no path to be returned here
|
return std::string(); // no path to be returned here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cleanupReplicationContexts() override;
|
||||||
|
|
||||||
velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
||||||
int& status) override;
|
int& status) override;
|
||||||
|
|
|
@ -797,6 +797,10 @@ RocksDBReplicationContext::CollectionIterator::CollectionIterator(
|
||||||
_readOptions.verify_checksums = false;
|
_readOptions.verify_checksums = false;
|
||||||
_readOptions.fill_cache = false;
|
_readOptions.fill_cache = false;
|
||||||
_readOptions.prefix_same_as_start = true;
|
_readOptions.prefix_same_as_start = true;
|
||||||
|
|
||||||
|
_cTypeHandler.reset(transaction::Context::createCustomTypeHandler(vocbase, _resolver));
|
||||||
|
vpackOptions.customTypeHandler = _cTypeHandler.get();
|
||||||
|
setSorted(sorted);
|
||||||
|
|
||||||
if (!vocbase.use()) { // false if vobase was deleted
|
if (!vocbase.use()) { // false if vobase was deleted
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||||
|
@ -806,10 +810,6 @@ RocksDBReplicationContext::CollectionIterator::CollectionIterator(
|
||||||
if (res != TRI_ERROR_NO_ERROR) { // collection was deleted
|
if (res != TRI_ERROR_NO_ERROR) { // collection was deleted
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
||||||
}
|
}
|
||||||
|
|
||||||
_cTypeHandler.reset(transaction::Context::createCustomTypeHandler(vocbase, _resolver));
|
|
||||||
vpackOptions.customTypeHandler = _cTypeHandler.get();
|
|
||||||
setSorted(sorted);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RocksDBReplicationContext::CollectionIterator::~CollectionIterator() {
|
RocksDBReplicationContext::CollectionIterator::~CollectionIterator() {
|
||||||
|
|
|
@ -383,6 +383,9 @@ bool RocksDBReplicationManager::garbageCollect(bool force) {
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void RocksDBReplicationManager::beginShutdown() {
|
void RocksDBReplicationManager::beginShutdown() {
|
||||||
MUTEX_LOCKER(mutexLocker, _lock);
|
{
|
||||||
_isShuttingDown = true;
|
MUTEX_LOCKER(mutexLocker, _lock);
|
||||||
|
_isShuttingDown = true;
|
||||||
|
}
|
||||||
|
garbageCollect(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,7 +161,7 @@ class StorageEngine : public application_features::ApplicationFeature {
|
||||||
|
|
||||||
virtual void waitForEstimatorSync(std::chrono::milliseconds maxWaitTime) = 0;
|
virtual void waitForEstimatorSync(std::chrono::milliseconds maxWaitTime) = 0;
|
||||||
|
|
||||||
//// operations on databasea
|
//// operations on databases
|
||||||
|
|
||||||
/// @brief opens a database
|
/// @brief opens a database
|
||||||
virtual std::unique_ptr<TRI_vocbase_t> openDatabase(arangodb::velocypack::Slice const& args,
|
virtual std::unique_ptr<TRI_vocbase_t> openDatabase(arangodb::velocypack::Slice const& args,
|
||||||
|
@ -337,6 +337,8 @@ class StorageEngine : public application_features::ApplicationFeature {
|
||||||
virtual void addRestHandlers(rest::RestHandlerFactory& handlerFactory) {}
|
virtual void addRestHandlers(rest::RestHandlerFactory& handlerFactory) {}
|
||||||
|
|
||||||
// replication
|
// replication
|
||||||
|
virtual void cleanupReplicationContexts() = 0;
|
||||||
|
|
||||||
virtual velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
virtual velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
|
||||||
int& status) = 0;
|
int& status) = 0;
|
||||||
virtual arangodb::velocypack::Builder getReplicationApplierConfiguration(int&) = 0;
|
virtual arangodb::velocypack::Builder getReplicationApplierConfiguration(int&) = 0;
|
||||||
|
|
|
@ -39,17 +39,16 @@ class CollectionGuard {
|
||||||
|
|
||||||
CollectionGuard(CollectionGuard&& other)
|
CollectionGuard(CollectionGuard&& other)
|
||||||
: _vocbase(other._vocbase),
|
: _vocbase(other._vocbase),
|
||||||
_collection(other._collection),
|
_collection(std::move(other._collection)),
|
||||||
_originalStatus(other._originalStatus),
|
_originalStatus(other._originalStatus),
|
||||||
_restoreOriginalStatus(other._restoreOriginalStatus) {
|
_restoreOriginalStatus(other._restoreOriginalStatus) {
|
||||||
other._collection = nullptr;
|
other._collection.reset();
|
||||||
other._vocbase = nullptr;
|
other._vocbase = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief create the guard, using a collection id
|
/// @brief create the guard, using a collection id
|
||||||
CollectionGuard(TRI_vocbase_t* vocbase, TRI_voc_cid_t cid, bool restoreOriginalStatus = false)
|
CollectionGuard(TRI_vocbase_t* vocbase, TRI_voc_cid_t cid, bool restoreOriginalStatus = false)
|
||||||
: _vocbase(vocbase),
|
: _vocbase(vocbase),
|
||||||
_collection(nullptr),
|
|
||||||
_originalStatus(TRI_VOC_COL_STATUS_CORRUPTED),
|
_originalStatus(TRI_VOC_COL_STATUS_CORRUPTED),
|
||||||
_restoreOriginalStatus(restoreOriginalStatus) {
|
_restoreOriginalStatus(restoreOriginalStatus) {
|
||||||
_collection = _vocbase->useCollection(cid, _originalStatus);
|
_collection = _vocbase->useCollection(cid, _originalStatus);
|
||||||
|
@ -58,28 +57,12 @@ class CollectionGuard {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
CollectionGuard(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, std::string const& name)
|
/// @brief create the guard, using a collection name
|
||||||
|
CollectionGuard(TRI_vocbase_t* vocbase, std::string const& name)
|
||||||
: _vocbase(vocbase),
|
: _vocbase(vocbase),
|
||||||
_collection(nullptr),
|
|
||||||
_originalStatus(TRI_VOC_COL_STATUS_CORRUPTED),
|
_originalStatus(TRI_VOC_COL_STATUS_CORRUPTED),
|
||||||
_restoreOriginalStatus(false) {
|
_restoreOriginalStatus(false) {
|
||||||
_collection = _vocbase->useCollection(id, _originalStatus);
|
|
||||||
if (!_collection && !name.empty()) {
|
|
||||||
_collection = _vocbase->useCollection(name, _originalStatus);
|
|
||||||
}
|
|
||||||
if (_collection == nullptr) {
|
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// @brief create the guard, using a collection name
|
|
||||||
CollectionGuard(TRI_vocbase_t* vocbase, std::string const& name,
|
|
||||||
bool restoreOriginalStatus = false)
|
|
||||||
: _vocbase(vocbase),
|
|
||||||
_collection(nullptr),
|
|
||||||
_originalStatus(TRI_VOC_COL_STATUS_CORRUPTED),
|
|
||||||
_restoreOriginalStatus(restoreOriginalStatus) {
|
|
||||||
if (!name.empty() && name[0] >= '0' && name[0] <= '9') {
|
if (!name.empty() && name[0] >= '0' && name[0] <= '9') {
|
||||||
TRI_voc_cid_t id =
|
TRI_voc_cid_t id =
|
||||||
NumberUtils::atoi_zero<TRI_voc_cid_t>(name.data(), name.data() + name.size());
|
NumberUtils::atoi_zero<TRI_voc_cid_t>(name.data(), name.data() + name.size());
|
||||||
|
@ -92,7 +75,7 @@ class CollectionGuard {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
CollectionGuard(TRI_vocbase_t* vocbase, std::shared_ptr<LogicalCollection> const& collection)
|
CollectionGuard(TRI_vocbase_t* vocbase, std::shared_ptr<LogicalCollection> const& collection)
|
||||||
: _vocbase(vocbase),
|
: _vocbase(vocbase),
|
||||||
_collection(collection),
|
_collection(collection),
|
||||||
|
@ -118,14 +101,6 @@ class CollectionGuard {
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/// @brief prematurely release the usage lock
|
|
||||||
void release() {
|
|
||||||
if (_collection != nullptr) {
|
|
||||||
_vocbase->releaseCollection(_collection.get());
|
|
||||||
_collection = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// @brief return the collection pointer
|
/// @brief return the collection pointer
|
||||||
inline arangodb::LogicalCollection* collection() const {
|
inline arangodb::LogicalCollection* collection() const {
|
||||||
return _collection.get();
|
return _collection.get();
|
||||||
|
|
|
@ -1175,6 +1175,10 @@ void StorageEngineMock::getDatabases(arangodb::velocypack::Builder& result) {
|
||||||
result.add(system.slice());
|
result.add(system.slice());
|
||||||
result.close();
|
result.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void StorageEngineMock::cleanupReplicationContexts() {
|
||||||
|
// nothing to do here
|
||||||
|
}
|
||||||
|
|
||||||
arangodb::velocypack::Builder StorageEngineMock::getReplicationApplierConfiguration(
|
arangodb::velocypack::Builder StorageEngineMock::getReplicationApplierConfiguration(
|
||||||
TRI_vocbase_t& vocbase, int& result) {
|
TRI_vocbase_t& vocbase, int& result) {
|
||||||
|
|
|
@ -192,6 +192,7 @@ class StorageEngineMock: public arangodb::StorageEngine {
|
||||||
virtual void getCollectionInfo(TRI_vocbase_t& vocbase, TRI_voc_cid_t cid, arangodb::velocypack::Builder& result, bool includeIndexes, TRI_voc_tick_t maxTick) override;
|
virtual void getCollectionInfo(TRI_vocbase_t& vocbase, TRI_voc_cid_t cid, arangodb::velocypack::Builder& result, bool includeIndexes, TRI_voc_tick_t maxTick) override;
|
||||||
virtual int getCollectionsAndIndexes(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result, bool wasCleanShutdown, bool isUpgrade) override;
|
virtual int getCollectionsAndIndexes(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result, bool wasCleanShutdown, bool isUpgrade) override;
|
||||||
virtual void getDatabases(arangodb::velocypack::Builder& result) override;
|
virtual void getDatabases(arangodb::velocypack::Builder& result) override;
|
||||||
|
virtual void cleanupReplicationContexts() override;
|
||||||
virtual arangodb::velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase, int& result) override;
|
virtual arangodb::velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase, int& result) override;
|
||||||
virtual arangodb::velocypack::Builder getReplicationApplierConfiguration(int& result) override;
|
virtual arangodb::velocypack::Builder getReplicationApplierConfiguration(int& result) override;
|
||||||
virtual int getViews(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result) override;
|
virtual int getViews(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result) override;
|
||||||
|
|
Loading…
Reference in New Issue