1
0
Fork 0

move engine specific syncer code into engines

This commit is contained in:
Jan Christoph Uhde 2017-05-02 16:36:27 +02:00
parent 3a19370978
commit da02fd36c6
10 changed files with 1616 additions and 1444 deletions

View File

@ -38,6 +38,7 @@
#include "MMFiles/MMFilesDatafile.h"
#include "MMFiles/MMFilesDatafileHelper.h"
#include "MMFiles/MMFilesIndexFactory.h"
#include "MMFiles/MMFilesInitialSync.h"
#include "MMFiles/MMFilesLogfileManager.h"
#include "MMFiles/MMFilesOptimizerRules.h"
#include "MMFiles/MMFilesPersistentIndex.h"
@ -3340,3 +3341,13 @@ int MMFilesEngine::saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase, a
return TRI_ERROR_NO_ERROR;
}
/// @brief engine-specific hook for the incremental replication key sync.
/// Delegates straight to the MMFiles implementation (handleSyncKeysMMFiles,
/// declared in MMFiles/MMFilesInitialSync.h) so that the engine-agnostic
/// replication code stays free of engine details.
/// @param syncer          the initial syncer driving the replication
/// @param col             the local collection to synchronize
/// @param keysId          id of the remote keys resource on the master
/// @param cid             collection id (string form) on the master
/// @param collectionName  name of the collection being synchronized
/// @param maxTick         upper tick bound for the sync
/// @param errorMsg        receives a descriptive message on failure
/// @return TRI_ERROR_NO_ERROR on success, an error code otherwise
int MMFilesEngine::handleSyncKeys(arangodb::InitialSyncer& syncer,
                                  arangodb::LogicalCollection* col,
                                  std::string const& keysId,
                                  std::string const& cid,
                                  std::string const& collectionName,
                                  TRI_voc_tick_t maxTick,
                                  std::string& errorMsg) {
  return handleSyncKeysMMFiles(syncer, col, keysId, cid, collectionName,
                               maxTick, errorMsg);
}

View File

@ -27,8 +27,8 @@
#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "MMFiles/MMFilesDatafile.h"
#include "MMFiles/MMFilesCollectorCache.h"
#include "MMFiles/MMFilesDatafile.h"
#include "StorageEngine/StorageEngine.h"
#include "VocBase/AccessMode.h"
@ -60,7 +60,6 @@ struct MMFilesEngineCollectionFiles {
class MMFilesEngine final : public StorageEngine {
public:
// create the storage engine
explicit MMFilesEngine(application_features::ApplicationServer*);
@ -84,18 +83,30 @@ class MMFilesEngine final : public StorageEngine {
// flush wal wait for collector
void stop() override;
std::shared_ptr<arangodb::velocypack::Builder> getReplicationApplierConfiguration(TRI_vocbase_t* vocbase, int& status) override;
std::shared_ptr<arangodb::velocypack::Builder>
getReplicationApplierConfiguration(TRI_vocbase_t* vocbase,
int& status) override;
int removeReplicationApplierConfiguration(TRI_vocbase_t* vocbase) override;
int saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase, arangodb::velocypack::Slice slice, bool doSync) override;
int saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase,
arangodb::velocypack::Slice slice,
bool doSync) override;
int handleSyncKeys(arangodb::InitialSyncer& syncer,
arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg) override;
transaction::ContextData* createTransactionContextData() override;
TransactionState* createTransactionState(TRI_vocbase_t*) override;
TransactionCollection* createTransactionCollection(TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel) override;
TransactionCollection* createTransactionCollection(
TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType,
int nestingLevel) override;
// create storage-engine specific collection
PhysicalCollection* createPhysicalCollection(LogicalCollection*, VPackSlice const&) override;
PhysicalCollection* createPhysicalCollection(LogicalCollection*,
VPackSlice const&) override;
// create storage-engine specific view
PhysicalView* createPhysicalView(LogicalView*, VPackSlice const&) override;
@ -107,22 +118,27 @@ class MMFilesEngine final : public StorageEngine {
// fully created (see "createDatabase" below). called at server start only
void getDatabases(arangodb::velocypack::Builder& result) override;
// fills the provided builder with information about the collection
void getCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t cid,
arangodb::velocypack::Builder& result,
// fills the provided builder with information about the collection
void getCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t cid,
arangodb::velocypack::Builder& result,
bool includeIndexes, TRI_voc_tick_t maxTick) override;
// fill the Builder object with an array of collections (and their corresponding
// indexes) that were detected by the storage engine. called at server start separately
// fill the Builder object with an array of collections (and their
// corresponding
// indexes) that were detected by the storage engine. called at server start
// separately
// for each database
int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result,
int getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
arangodb::velocypack::Builder& result,
bool wasCleanShutdown, bool isUpgrade) override;
int getViews(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result) override;
int getViews(TRI_vocbase_t* vocbase,
arangodb::velocypack::Builder& result) override;
// return the path for a collection
std::string collectionPath(TRI_vocbase_t const* vocbase, TRI_voc_cid_t id) const override {
return collectionDirectory(vocbase->id(), id);
std::string collectionPath(TRI_vocbase_t const* vocbase,
TRI_voc_cid_t id) const override {
return collectionDirectory(vocbase->id(), id);
}
// database, collection and index management
@ -134,17 +150,23 @@ class MMFilesEngine final : public StorageEngine {
}
std::string versionFilename(TRI_voc_tick_t id) const override;
void waitForSync(TRI_voc_tick_t tick) override;
virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade, int&) override;
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) override {
virtual TRI_vocbase_t* openDatabase(
arangodb::velocypack::Slice const& parameters, bool isUpgrade,
int&) override;
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id,
arangodb::velocypack::Slice const& args,
int& status) override {
status = TRI_ERROR_NO_ERROR;
return createDatabaseMMFiles(id, args);
}
int writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) override;
int writeCreateDatabaseMarker(TRI_voc_tick_t id,
VPackSlice const& slice) override;
void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) override;
void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker,
int& status) override;
Result dropDatabase(TRI_vocbase_t* database) override;
void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) override;
@ -154,45 +176,51 @@ class MMFilesEngine final : public StorageEngine {
// start compactor thread and delete files form collections marked as deleted
void recoveryDone(TRI_vocbase_t* vocbase) override;
private:
private:
int dropDatabaseMMFiles(TRI_vocbase_t* vocbase);
TRI_vocbase_t* createDatabaseMMFiles(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data);
public:
TRI_vocbase_t* createDatabaseMMFiles(TRI_voc_tick_t id,
arangodb::velocypack::Slice const& data);
public:
// asks the storage engine to create a collection as specified in the VPack
// Slice object and persist the creation info. It is guaranteed by the server
// that no other active collection with the same name and id exists in the same
// database when this function is called. If this operation fails somewhere in
// the middle, the storage engine is required to fully clean up the creation
// and throw only then, so that subsequent collection creation requests will not fail.
// Slice object and persist the creation info. It is guaranteed by the server
// that no other active collection with the same name and id exists in the
// same
// database when this function is called. If this operation fails somewhere in
// the middle, the storage engine is required to fully clean up the creation
// and throw only then, so that subsequent collection creation requests will
// not fail.
// the WAL entry for the collection creation will be written *after* the call
// to "createCollection" returns
std::string createCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
arangodb::LogicalCollection const*) override;
// asks the storage engine to persist the collection.
// After this call the collection is persisted over recovery.
// This call will write wal markers.
arangodb::Result persistCollection(
TRI_vocbase_t* vocbase,
arangodb::LogicalCollection const*) override;
TRI_vocbase_t* vocbase, arangodb::LogicalCollection const*) override;
// asks the storage engine to drop the specified collection and persist the
// deletion info. Note that physical deletion of the collection data must not
// asks the storage engine to drop the specified collection and persist the
// deletion info. Note that physical deletion of the collection data must not
// be carried out by this call, as there may
// still be readers of the collection's data.
// This call will write the WAL entry for collection deletion
arangodb::Result dropCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection*) override;
arangodb::Result dropCollection(TRI_vocbase_t* vocbase,
arangodb::LogicalCollection*) override;
// perform a physical deletion of the collection
// After this call data of this collection is corrupted, only perform if
// assured that no one is using the collection anymore
void destroyCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection*) override;
// asks the storage engine to change properties of the collection as specified in
// the VPack Slice object and persist them. If this operation fails
// somewhere in the middle, the storage engine is required to fully revert the
// property changes and throw only then, so that subsequent operations will not fail.
void destroyCollection(TRI_vocbase_t* vocbase,
arangodb::LogicalCollection*) override;
// asks the storage engine to change properties of the collection as specified
// in
// the VPack Slice object and persist them. If this operation fails
// somewhere in the middle, the storage engine is required to fully revert the
// property changes and throw only then, so that subsequent operations will
// not fail.
// the WAL entry for the property change will be written *after* the call
// to "changeCollection" returns
void changeCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
@ -201,82 +229,94 @@ public:
// asks the storage engine to persist renaming of a collection
// This will write a renameMarker if not in recovery
arangodb::Result renameCollection(
TRI_vocbase_t* vocbase, arangodb::LogicalCollection const*,
std::string const& oldName) override;
arangodb::Result renameCollection(TRI_vocbase_t* vocbase,
arangodb::LogicalCollection const*,
std::string const& oldName) override;
// asks the storage engine to create an index as specified in the VPack
// Slice object and persist the creation info. The database id, collection id
// Slice object and persist the creation info. The database id, collection id
// and index data are passed in the Slice object. Note that this function
// is not responsible for inserting the individual documents into the index.
// If this operation fails somewhere in the middle, the storage engine is required
// to fully clean up the creation and throw only then, so that subsequent index
// If this operation fails somewhere in the middle, the storage engine is
// required
// to fully clean up the creation and throw only then, so that subsequent
// index
// creation requests will not fail.
// the WAL entry for the index creation will be written *after* the call
// to "createIndex" returns
void createIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
TRI_idx_iid_t id, arangodb::velocypack::Slice const& data) override;
// asks the storage engine to drop the specified index and persist the deletion
// info. Note that physical deletion of the index must not be carried out by this call,
// as there may still be users of the index. It is recommended that this operation
TRI_idx_iid_t id,
arangodb::velocypack::Slice const& data) override;
// asks the storage engine to drop the specified index and persist the
// deletion
// info. Note that physical deletion of the index must not be carried out by
// this call,
// as there may still be users of the index. It is recommended that this
// operation
// only sets a deletion flag for the index but lets an async task perform
// the actual deletion.
// the WAL entry for index deletion will be written *after* the call
// to "dropIndex" returns
void dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
TRI_idx_iid_t id) override;
void dropIndexWalMarker(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
arangodb::velocypack::Slice const& data, bool writeMarker, int&) override;
arangodb::velocypack::Slice const& data,
bool writeMarker, int&) override;
void unloadCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) override;
void createView(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, arangodb::LogicalView const*) override;
arangodb::Result persistView(
TRI_vocbase_t* vocbase,
arangodb::LogicalView const*) override;
void unloadCollection(TRI_vocbase_t* vocbase,
arangodb::LogicalCollection* collection) override;
void createView(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
arangodb::LogicalView const*) override;
arangodb::Result persistView(TRI_vocbase_t* vocbase,
arangodb::LogicalView const*) override;
arangodb::Result dropView(TRI_vocbase_t* vocbase,
arangodb::LogicalView*) override;
arangodb::Result dropView(TRI_vocbase_t* vocbase, arangodb::LogicalView*) override;
void destroyView(TRI_vocbase_t* vocbase, arangodb::LogicalView*) override;
void changeView(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
arangodb::LogicalView const*, bool doSync) override;
std::string createViewDirectoryName(std::string const& basePath, TRI_voc_cid_t id);
void saveViewInfo(TRI_vocbase_t* vocbase,
TRI_voc_cid_t id,
arangodb::LogicalView const*,
bool forceSync) const;
std::string createViewDirectoryName(std::string const& basePath,
TRI_voc_cid_t id);
void saveViewInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
arangodb::LogicalView const*, bool forceSync) const;
void signalCleanup(TRI_vocbase_t* vocbase) override;
// document operations
// -------------------
// iterate all documents of the underlying collection
// this is called when a collection is opened, and all its documents need to be added to
// this is called when a collection is opened, and all its documents need to
// be added to
// indexes etc.
void iterateDocuments(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
std::function<void(arangodb::velocypack::Slice const&)> const& cb) override;
void iterateDocuments(
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
std::function<void(arangodb::velocypack::Slice const&)> const& cb)
override;
// adds a document to the storage engine
// this will be called by the WAL collector when surviving documents are being moved
// this will be called by the WAL collector when surviving documents are being
// moved
// into the storage engine's realm
void addDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
arangodb::velocypack::Slice const& document) override;
void addDocumentRevision(
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
arangodb::velocypack::Slice const& document) override;
// removes a document from the storage engine
// this will be called by the WAL collector when non-surviving documents are being removed
// this will be called by the WAL collector when non-surviving documents are
// being removed
// from the storage engine's realm
void removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
arangodb::velocypack::Slice const& document) override;
void removeDocumentRevision(
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
arangodb::velocypack::Slice const& document) override;
/// @brief scans a collection and locates all files
MMFilesEngineCollectionFiles scanCollectionDirectory(std::string const& path);
@ -285,94 +325,113 @@ public:
bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) override;
/// @brief insert a compaction blocker
int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) override;
int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl,
TRI_voc_tick_t& id) override;
/// @brief touch an existing compaction blocker
int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) override;
int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id,
double ttl) override;
/// @brief remove an existing compaction blocker
int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) override;
int removeCompactionBlocker(TRI_vocbase_t* vocbase,
TRI_voc_tick_t id) override;
/// @brief a callback function that is run while it is guaranteed that there
/// is no compaction ongoing
void preventCompaction(
TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback) override;
/// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing
void preventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback) override;
/// @brief a callback function that is run there is no compaction ongoing
bool tryPreventCompaction(TRI_vocbase_t* vocbase,
std::function<void(TRI_vocbase_t*)> const& callback,
bool checkForActiveBlockers) override;
int shutdownDatabase(TRI_vocbase_t* vocbase) override;
int openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection, bool ignoreErrors) override;
int openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection,
bool ignoreErrors) override;
/// @brief Add engine-specific AQL functions.
void addAqlFunctions() override;
/// @brief Add engine-specific optimizer rules
void addOptimizerRules() override;
/// @brief Add engine-specific V8 functions
void addV8Functions() override;
/// @brief Add engine-specific REST handlers
void addRestHandlers(rest::RestHandlerFactory*) override;
/// @brief transfer markers into a collection
int transferMarkers(LogicalCollection* collection, MMFilesCollectorCache*,
MMFilesOperationsType const&);
std::string viewDirectory(TRI_voc_tick_t databaseId, TRI_voc_cid_t viewId) const;
std::string viewDirectory(TRI_voc_tick_t databaseId,
TRI_voc_cid_t viewId) const;
private:
/// @brief: check the initial markers in a datafile
bool checkDatafileHeader(MMFilesDatafile* datafile, std::string const& filename) const;
bool checkDatafileHeader(MMFilesDatafile* datafile,
std::string const& filename) const;
/// @brief transfer markers into a collection, worker function
int transferMarkersWorker(LogicalCollection* collection, MMFilesCollectorCache*,
int transferMarkersWorker(LogicalCollection* collection,
MMFilesCollectorCache*,
MMFilesOperationsType const&);
/// @brief sync the active journal of a collection
int syncJournalCollection(LogicalCollection* collection);
/// @brief get the next free position for a new marker of the specified size
char* nextFreeMarkerPosition(LogicalCollection* collection,
TRI_voc_tick_t, MMFilesMarkerType,
TRI_voc_size_t, MMFilesCollectorCache*);
char* nextFreeMarkerPosition(LogicalCollection* collection, TRI_voc_tick_t,
MMFilesMarkerType, TRI_voc_size_t,
MMFilesCollectorCache*);
/// @brief set the tick of a marker and calculate its CRC value
void finishMarker(char const*, char*, LogicalCollection* collection,
TRI_voc_tick_t, MMFilesCollectorCache*);
void verifyDirectories();
void verifyDirectories();
std::vector<std::string> getDatabaseNames() const;
/// @brief create a new database directory
/// @brief create a new database directory
int createDatabaseDirectory(TRI_voc_tick_t id, std::string const& name);
/// @brief save a parameter.json file for a database
int saveDatabaseParameters(TRI_voc_tick_t id, std::string const& name, bool deleted);
arangodb::velocypack::Builder databaseToVelocyPack(TRI_voc_tick_t databaseId,
std::string const& name,
int saveDatabaseParameters(TRI_voc_tick_t id, std::string const& name,
bool deleted);
arangodb::velocypack::Builder databaseToVelocyPack(TRI_voc_tick_t databaseId,
std::string const& name,
bool deleted) const;
std::string databaseDirectory(TRI_voc_tick_t databaseId) const;
std::string databaseParametersFilename(TRI_voc_tick_t databaseId) const;
std::string collectionDirectory(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) const;
std::string collectionParametersFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) const;
std::string viewParametersFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t viewId) const;
std::string indexFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, TRI_idx_iid_t indexId) const;
std::string collectionDirectory(TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId) const;
std::string collectionParametersFilename(TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId) const;
std::string viewParametersFilename(TRI_voc_tick_t databaseId,
TRI_voc_cid_t viewId) const;
std::string indexFilename(TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId,
TRI_idx_iid_t indexId) const;
std::string indexFilename(TRI_idx_iid_t indexId) const;
int openDatabases();
/// @brief open an existing database. internal function
TRI_vocbase_t* openExistingDatabase(TRI_voc_tick_t id, std::string const& name, bool wasCleanShutdown, bool isUpgrade);
TRI_vocbase_t* openExistingDatabase(TRI_voc_tick_t id,
std::string const& name,
bool wasCleanShutdown, bool isUpgrade);
/// @brief note the maximum local tick
void noteTick(TRI_voc_tick_t tick) {
if (tick > _maxTick) { _maxTick = tick; }
if (tick > _maxTick) {
_maxTick = tick;
}
}
/// @brief physically erases the database directory
@ -386,33 +445,37 @@ public:
bool findMaxTickInJournals(std::string const& path);
/// @brief create a full directory name for a collection
std::string createCollectionDirectoryName(std::string const& basePath, TRI_voc_cid_t cid);
std::string createCollectionDirectoryName(std::string const& basePath,
TRI_voc_cid_t cid);
void registerCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, std::string const& path);
void registerCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id,
std::string const& path);
void unregisterCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id);
void registerViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, std::string const& path);
void registerViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id,
std::string const& path);
void unregisterViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id);
void saveCollectionInfo(TRI_vocbase_t* vocbase,
TRI_voc_cid_t id,
void saveCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
arangodb::LogicalCollection const* parameters,
bool forceSync) const;
arangodb::velocypack::Builder loadCollectionInfo(TRI_vocbase_t* vocbase, std::string const& path);
arangodb::velocypack::Builder loadViewInfo(TRI_vocbase_t* vocbase, std::string const& path);
// start the cleanup thread for the database
arangodb::velocypack::Builder loadCollectionInfo(TRI_vocbase_t* vocbase,
std::string const& path);
arangodb::velocypack::Builder loadViewInfo(TRI_vocbase_t* vocbase,
std::string const& path);
// start the cleanup thread for the database
int startCleanup(TRI_vocbase_t* vocbase);
// stop and delete the cleanup thread for the database
// stop and delete the cleanup thread for the database
int stopCleanup(TRI_vocbase_t* vocbase);
// start the compactor thread for the database
// start the compactor thread for the database
int startCompactor(TRI_vocbase_t* vocbase);
// signal the compactor thread to stop
int beginShutdownCompactor(TRI_vocbase_t* vocbase);
// stop and delete the compactor thread for the database
int stopCompactor(TRI_vocbase_t* vocbase);
/// @brief writes a drop-database marker into the log
int writeDropMarker(TRI_voc_tick_t id);
@ -428,11 +491,16 @@ public:
std::vector<std::pair<std::string, std::string>> _deleted;
arangodb::basics::ReadWriteLock mutable _pathsLock;
std::unordered_map<TRI_voc_tick_t, std::unordered_map<TRI_voc_cid_t, std::string>> _collectionPaths;
std::unordered_map<TRI_voc_tick_t, std::unordered_map<TRI_voc_cid_t, std::string>> _viewPaths;
std::unordered_map<TRI_voc_tick_t,
std::unordered_map<TRI_voc_cid_t, std::string>>
_collectionPaths;
std::unordered_map<TRI_voc_tick_t,
std::unordered_map<TRI_voc_cid_t, std::string>>
_viewPaths;
struct CompactionBlocker {
CompactionBlocker(TRI_voc_tick_t id, double expires) : _id(id), _expires(expires) {}
CompactionBlocker(TRI_voc_tick_t id, double expires)
: _id(id), _expires(expires) {}
CompactionBlocker() = delete;
TRI_voc_tick_t _id;
@ -441,9 +509,11 @@ public:
// lock for compaction blockers
arangodb::basics::ReadWriteLock mutable _compactionBlockersLock;
// cross-database map of compaction blockers, protected by _compactionBlockersLock
std::unordered_map<TRI_vocbase_t*, std::vector<CompactionBlocker>> _compactionBlockers;
// cross-database map of compaction blockers, protected by
// _compactionBlockersLock
std::unordered_map<TRI_vocbase_t*, std::vector<CompactionBlocker>>
_compactionBlockers;
// lock for threads
arangodb::Mutex _threadsLock;
// per-database compactor threads, protected by _threadsLock
@ -451,7 +521,6 @@ public:
// per-database cleanup threads, protected by _threadsLock
std::unordered_map<TRI_vocbase_t*, MMFilesCleanupThread*> _cleanupThreads;
};
}
#endif

View File

@ -0,0 +1,712 @@
#include "Replication/InitialSyncer.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "MMFiles/MMFilesCollection.h"
#include "MMFiles/MMFilesDatafileHelper.h"
#include "MMFiles/MMFilesDitch.h"
#include "MMFiles/MMFilesIndexElement.h"
#include "MMFiles/MMFilesPrimaryIndex.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "Transaction/Helpers.h"
#include "Utils/OperationOptions.h"
#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
////////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
////////////////////////////////////////////////////////////////////////////////
namespace arangodb {
////////////////////////////////////////////////////////////////////////////////
/// @brief performs a binary search for the given key in the markers vector
////////////////////////////////////////////////////////////////////////////////
static bool BinarySearch(std::vector<uint8_t const*> const& markers,
std::string const& key, size_t& position) {
TRI_ASSERT(!markers.empty());
size_t l = 0;
size_t r = markers.size() - 1;
while (true) {
// determine midpoint
position = l + ((r - l) / 2);
TRI_ASSERT(position < markers.size());
VPackSlice const otherSlice(markers.at(position));
VPackSlice const otherKey = otherSlice.get(StaticStrings::KeyString);
int res = key.compare(otherKey.copyString());
if (res == 0) {
return true;
}
if (res < 0) {
if (position == 0) {
return false;
}
r = position - 1;
} else {
l = position + 1;
}
if (r < l) {
return false;
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief finds a key range in the markers vector
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief locates the positions of a lower and an upper key in the sorted
/// markers vector; returns true only if both keys were found
////////////////////////////////////////////////////////////////////////////////

static bool FindRange(std::vector<uint8_t const*> const& markers,
                      std::string const& lower, std::string const& upper,
                      size_t& lowerPos, size_t& upperPos) {
  // an empty vector cannot contain either bound
  if (markers.empty()) {
    return false;
  }

  // resolve the lower bound first; the upper bound is only looked up
  // when the lower bound exists
  if (!BinarySearch(markers, lower, lowerPos)) {
    return false;
  }

  return BinarySearch(markers, upper, upperPos);
}
int handleSyncKeysMMFiles(arangodb::InitialSyncer& syncer,
arangodb::LogicalCollection* col,
std::string const& keysId,
std::string const& cid,
std::string const& collectionName,
TRI_voc_tick_t maxTick,
std::string& errorMsg) {
std::string progress =
"collecting local keys for collection '" + collectionName + "'";
syncer.setProgress(progress);
// fetch all local keys from primary index
std::vector<uint8_t const*> markers;
MMFilesDocumentDitch* ditch = nullptr;
// acquire a replication ditch so no datafiles are thrown away from now on
// note: the ditch also protects against unloading the collection
{
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(syncer.vocbase()), col->cid(),
AccessMode::Type::READ);
Result res = trx.begin();
if (!res.ok()) {
errorMsg =
std::string("unable to start transaction: ") + res.errorMessage();
res.reset(res.errorNumber(), errorMsg);
return res.errorNumber();
}
ditch = arangodb::MMFilesCollection::toMMFilesCollection(col)
->ditches()
->createMMFilesDocumentDitch(false, __FILE__, __LINE__);
if (ditch == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}
}
TRI_ASSERT(ditch != nullptr);
TRI_DEFER(arangodb::MMFilesCollection::toMMFilesCollection(col)
->ditches()
->freeDitch(ditch));
{
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(syncer.vocbase()), col->cid(),
AccessMode::Type::READ);
Result res = trx.begin();
if (!res.ok()) {
errorMsg =
std::string("unable to start transaction: ") + res.errorMessage();
res.reset(res.errorNumber(), errorMsg);
return res.errorNumber();
}
// We do not take responsibility for the index.
// The LogicalCollection is protected by trx.
// Neither it nor it's indexes can be invalidated
markers.reserve(trx.documentCollection()->numberDocuments(&trx));
uint64_t iterations = 0;
ManagedDocumentResult mmdr;
trx.invokeOnAllElements(
trx.name(), [&syncer, &trx, &mmdr, &markers,
&iterations](DocumentIdentifierToken const& token) {
if (trx.documentCollection()->readDocument(&trx, token, mmdr)) {
markers.emplace_back(mmdr.vpack());
if (++iterations % 10000 == 0) {
if (syncer.checkAborted()) {
return false;
}
}
}
return true;
});
if (syncer.checkAborted()) {
return TRI_ERROR_REPLICATION_APPLIER_STOPPED;
}
syncer.sendExtendBatch();
syncer.sendExtendBarrier();
std::string progress = "sorting " + std::to_string(markers.size()) +
" local key(s) for collection '" + collectionName +
"'";
syncer.setProgress(progress);
// sort all our local keys
std::sort(
markers.begin(), markers.end(),
[](uint8_t const* lhs, uint8_t const* rhs) -> bool {
VPackSlice const l(lhs);
VPackSlice const r(rhs);
VPackValueLength lLength, rLength;
char const* lKey = l.get(StaticStrings::KeyString).getString(lLength);
char const* rKey = r.get(StaticStrings::KeyString).getString(rLength);
size_t const length =
static_cast<size_t>(lLength < rLength ? lLength : rLength);
int res = memcmp(lKey, rKey, length);
if (res < 0) {
// left is smaller than right
return true;
}
if (res == 0 && lLength < rLength) {
// left is equal to right, but of shorter length
return true;
}
return false;
});
}
if (syncer.checkAborted()) {
return TRI_ERROR_REPLICATION_APPLIER_STOPPED;
}
syncer.sendExtendBatch();
syncer.sendExtendBarrier();
std::vector<size_t> toFetch;
TRI_voc_tick_t const chunkSize = 5000;
std::string const baseUrl = syncer.BaseUrl + "/keys";
std::string url =
baseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize);
progress = "fetching remote keys chunks for collection '" + collectionName +
"' from " + url;
syncer.setProgress(progress);
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
if (response == nullptr || !response->isComplete()) {
errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint +
": " + syncer._client->getErrorMessage();
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
": HTTP " + basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
auto builder = std::make_shared<VPackBuilder>();
int res = syncer.parseResponse(builder, response.get());
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "got invalid response from master at " +
std::string(syncer._masterInfo._endpoint) +
": invalid response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
VPackSlice const slice = builder->slice();
if (!slice.isArray()) {
errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
OperationOptions options;
options.silent = true;
options.ignoreRevs = true;
options.isRestore = true;
VPackBuilder keyBuilder;
size_t const n = static_cast<size_t>(slice.length());
// remove all keys that are below first remote key or beyond last remote key
if (n > 0) {
// first chunk
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(syncer._vocbase), col->cid(),
AccessMode::Type::WRITE);
Result res = trx.begin();
if (!res.ok()) {
errorMsg =
std::string("unable to start transaction: ") + res.errorMessage();
res.reset(res.errorNumber(), errorMsg);
return res.errorNumber();
}
VPackSlice chunk = slice.at(0);
TRI_ASSERT(chunk.isObject());
auto lowSlice = chunk.get("low");
TRI_ASSERT(lowSlice.isString());
std::string const lowKey(lowSlice.copyString());
for (size_t i = 0; i < markers.size(); ++i) {
VPackSlice const k(markers[i]);
std::string const key(k.get(StaticStrings::KeyString).copyString());
if (key.compare(lowKey) >= 0) {
break;
}
keyBuilder.clear();
keyBuilder.openObject();
keyBuilder.add(StaticStrings::KeyString, VPackValue(key));
keyBuilder.close();
trx.remove(collectionName, keyBuilder.slice(), options);
}
// last high
chunk = slice.at(n - 1);
TRI_ASSERT(chunk.isObject());
auto highSlice = chunk.get("high");
TRI_ASSERT(highSlice.isString());
std::string const highKey(highSlice.copyString());
for (size_t i = markers.size(); i >= 1; --i) {
VPackSlice const k(markers[i - 1]);
std::string const key(k.get(StaticStrings::KeyString).copyString());
if (key.compare(highKey) <= 0) {
break;
}
keyBuilder.clear();
keyBuilder.openObject();
keyBuilder.add(StaticStrings::KeyString, VPackValue(key));
keyBuilder.close();
trx.remove(collectionName, keyBuilder.slice(), options);
}
trx.commit();
}
size_t nextStart = 0;
// now process each chunk
for (size_t i = 0; i < n; ++i) {
if (syncer.checkAborted()) {
return TRI_ERROR_REPLICATION_APPLIER_STOPPED;
}
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(syncer._vocbase), col->cid(),
AccessMode::Type::WRITE);
Result res = trx.begin();
if (!res.ok()) {
errorMsg =
std::string("unable to start transaction: ") + res.errorMessage();
res.reset(res.errorNumber(), res.errorMessage());
return res.errorNumber();
}
trx.pinData(col->cid()); // will throw when it fails
// We do not take responsibility for the index.
// The LogicalCollection is protected by trx.
// Neither it nor it's indexes can be invalidated
// TODO Move to MMFiles
auto physical = static_cast<MMFilesCollection*>(
trx.documentCollection()->getPhysical());
auto idx = physical->primaryIndex();
size_t const currentChunkId = i;
progress = "processing keys chunk " + std::to_string(currentChunkId) +
" for collection '" + collectionName + "'";
syncer.setProgress(progress);
syncer.sendExtendBatch();
syncer.sendExtendBarrier();
// read remote chunk
VPackSlice chunk = slice.at(i);
if (!chunk.isObject()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": chunk is no object";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
VPackSlice const lowSlice = chunk.get("low");
VPackSlice const highSlice = chunk.get("high");
VPackSlice const hashSlice = chunk.get("hash");
if (!lowSlice.isString() || !highSlice.isString() ||
!hashSlice.isString()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint +
": chunks in response have an invalid format";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
std::string const lowString = lowSlice.copyString();
std::string const highString = highSlice.copyString();
size_t localFrom;
size_t localTo;
bool match = FindRange(markers, lowString, highString, localFrom, localTo);
if (match) {
// now must hash the range
uint64_t hash = 0x012345678;
for (size_t i = localFrom; i <= localTo; ++i) {
TRI_ASSERT(i < markers.size());
VPackSlice const current(markers.at(i));
hash ^= current.get(StaticStrings::KeyString).hashString();
hash ^= current.get(StaticStrings::RevString).hash();
}
if (std::to_string(hash) != hashSlice.copyString()) {
match = false;
}
}
if (match) {
// match
nextStart = localTo + 1;
} else {
// no match
// must transfer keys for non-matching range
std::string url = baseUrl + "/" + keysId +
"?type=keys&chunk=" + std::to_string(i) +
"&chunkSize=" + std::to_string(chunkSize);
progress = "fetching keys chunk " + std::to_string(currentChunkId) +
" for collection '" + collectionName + "' from " + url;
syncer.setProgress(progress);
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._client->retryRequest(rest::RequestType::PUT, url, nullptr, 0));
if (response == nullptr || !response->isComplete()) {
errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint +
": " + syncer._client->getErrorMessage();
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " +
response->getHttpReturnMessage();
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
auto builder = std::make_shared<VPackBuilder>();
int res = syncer.parseResponse(builder, response.get());
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
VPackSlice const slice = builder->slice();
if (!slice.isArray()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// delete all keys at start of the range
while (nextStart < markers.size()) {
VPackSlice const keySlice(markers[nextStart]);
std::string const localKey(
keySlice.get(StaticStrings::KeyString).copyString());
if (localKey.compare(lowString) < 0) {
// we have a local key that is not present remotely
keyBuilder.clear();
keyBuilder.openObject();
keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey));
keyBuilder.close();
trx.remove(collectionName, keyBuilder.slice(), options);
++nextStart;
} else {
break;
}
}
toFetch.clear();
size_t const n = static_cast<size_t>(slice.length());
TRI_ASSERT(n > 0);
for (size_t i = 0; i < n; ++i) {
VPackSlice const pair = slice.at(i);
if (!pair.isArray() || pair.length() != 2) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint +
": response key pair is no valid array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// key
VPackSlice const keySlice = pair.at(0);
if (!keySlice.isString()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": response key is no string";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// rid
if (markers.empty()) {
// no local markers
toFetch.emplace_back(i);
continue;
}
std::string const keyString = keySlice.copyString();
while (nextStart < markers.size()) {
VPackSlice const localKeySlice(markers[nextStart]);
std::string const localKey(
localKeySlice.get(StaticStrings::KeyString).copyString());
int res = localKey.compare(keyString);
if (res != 0) {
// we have a local key that is not present remotely
keyBuilder.clear();
keyBuilder.openObject();
keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey));
keyBuilder.close();
trx.remove(collectionName, keyBuilder.slice(), options);
++nextStart;
} else {
// key match
break;
}
}
MMFilesSimpleIndexElement element = idx->lookupKey(&trx, keySlice);
if (!element) {
// key not found locally
toFetch.emplace_back(i);
} else if (TRI_RidToString(element.revisionId()) !=
pair.at(1).copyString()) {
// key found, but revision id differs
toFetch.emplace_back(i);
++nextStart;
} else {
// a match - nothing to do!
++nextStart;
}
}
// calculate next starting point
if (!markers.empty()) {
BinarySearch(markers, highString, nextStart);
while (nextStart < markers.size()) {
VPackSlice const localKeySlice(markers[nextStart]);
std::string const localKey(
localKeySlice.get(StaticStrings::KeyString).copyString());
int res = localKey.compare(highString);
if (res <= 0) {
++nextStart;
} else {
break;
}
}
}
if (!toFetch.empty()) {
VPackBuilder keysBuilder;
keysBuilder.openArray();
for (auto& it : toFetch) {
keysBuilder.add(VPackValue(it));
}
keysBuilder.close();
std::string url = baseUrl + "/" + keysId +
"?type=docs&chunk=" + std::to_string(currentChunkId) +
"&chunkSize=" + std::to_string(chunkSize);
progress = "fetching documents chunk " +
std::to_string(currentChunkId) + " for collection '" +
collectionName + "' from " + url;
syncer.setProgress(progress);
std::string const keyJsonString(keysBuilder.slice().toJson());
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._client->retryRequest(rest::RequestType::PUT, url,
keyJsonString.c_str(), keyJsonString.size()));
if (response == nullptr || !response->isComplete()) {
errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint +
": " + syncer._client->getErrorMessage();
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " +
response->getHttpReturnMessage();
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
auto builder = std::make_shared<VPackBuilder>();
int res = syncer.parseResponse(builder, response.get());
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "got invalid response from master at " +
std::string(syncer._masterInfo._endpoint) +
": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
VPackSlice const slice = builder->slice();
if (!slice.isArray()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
for (auto const& it : VPackArrayIterator(slice)) {
if (!it.isObject()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": document is no object";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
VPackSlice const keySlice = it.get(StaticStrings::KeyString);
if (!keySlice.isString()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": document key is invalid";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
VPackSlice const revSlice = it.get(StaticStrings::RevString);
if (!revSlice.isString()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": document revision is invalid";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
MMFilesSimpleIndexElement element = idx->lookupKey(&trx, keySlice);
if (!element) {
// INSERT
OperationResult opRes = trx.insert(collectionName, it, options);
res = opRes.code;
} else {
// UPDATE
OperationResult opRes = trx.update(collectionName, it, options);
res = opRes.code;
}
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
}
}
res = trx.commit();
if (!res.ok()) {
return res.errorNumber();
}
}
return res;
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -37,12 +37,50 @@ struct TRI_vocbase_t;
namespace arangodb {
class LogicalCollection;
class InitialSyncer;
/// @brief engine-specific incremental key synchronization for the MMFiles
/// storage engine; called via MMFilesEngine::handleSyncKeys.
/// On failure, errorMsg is populated and an error code is returned.
int handleSyncKeysMMFiles(arangodb::InitialSyncer& syncer,
                          arangodb::LogicalCollection* col,
                          std::string const& keysId, std::string const& cid,
                          std::string const& collectionName,
                          TRI_voc_tick_t maxTick, std::string& errorMsg);

/// @brief engine-specific incremental key synchronization for the RocksDB
/// storage engine; called via RocksDBEngine::handleSyncKeys.
/// On failure, errorMsg is populated and an error code is returned.
int handleSyncKeysRocksDB(InitialSyncer& syncer,
                          arangodb::LogicalCollection* col,
                          std::string const& keysId, std::string const& cid,
                          std::string const& collectionName,
                          TRI_voc_tick_t maxTick, std::string& errorMsg);

/// @brief synchronizes a single non-matching key chunk with the master
/// (RocksDB engine); performs removals/inserts/updates inside the given
/// already-running write transaction `trx`.
int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
                     std::string const& keysId, uint64_t chunkId,
                     std::string const& lowString,
                     std::string const& highString,
                     std::vector<std::pair<std::string, uint64_t>> markers,
                     std::string& errorMsg);
namespace httpclient {
class SimpleHttpResult;
}
class InitialSyncer : public Syncer {
friend int ::arangodb::handleSyncKeysMMFiles(
arangodb::InitialSyncer& syncer, arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg);
friend int ::arangodb::handleSyncKeysRocksDB(
InitialSyncer& syncer, arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg);
friend int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
std::string const& keysId, uint64_t chunkId,
std::string const& lowString,
std::string const& highString,
std::vector<std::pair<std::string, uint64_t>> markers,
std::string& errorMsg);
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief apply phases
@ -64,7 +102,6 @@ class InitialSyncer : public Syncer {
~InitialSyncer();
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief run method, performs a full synchronization
//////////////////////////////////////////////////////////////////////////////
@ -117,8 +154,7 @@ class InitialSyncer : public Syncer {
if (_verbose) {
LOG_TOPIC(INFO, Logger::REPLICATION) << msg;
}
else {
} else {
LOG_TOPIC(DEBUG, Logger::REPLICATION) << msg;
}
@ -161,8 +197,7 @@ class InitialSyncer : public Syncer {
/// @brief apply the data from a collection dump
//////////////////////////////////////////////////////////////////////////////
int applyCollectionDump(transaction::Methods&,
std::string const&,
int applyCollectionDump(transaction::Methods&, std::string const&,
httpclient::SimpleHttpResult*, uint64_t&,
std::string&);
@ -185,22 +220,21 @@ class InitialSyncer : public Syncer {
int handleCollectionSync(arangodb::LogicalCollection*, std::string const&,
std::string const&, TRI_voc_tick_t, std::string&);
//////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
//////////////////////////////////////////////////////////////////////////////
int handleSyncKeysRocksDB(arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg);
std::string const& collectionName,
TRI_voc_tick_t maxTick, std::string& errorMsg);
//////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch chunk data from a collection
//////////////////////////////////////////////////////////////////////////////
int syncChunkRocksDB(SingleCollectionTransaction* trx,
std::string const& keysId,
uint64_t chunkId,
std::string const& keysId, uint64_t chunkId,
std::string const& lowKey, std::string const& highKey,
std::vector<std::pair<std::string, uint64_t>> markers,
std::string& errorMsg);
@ -211,8 +245,8 @@ class InitialSyncer : public Syncer {
int handleSyncKeysMMFiles(arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg);
std::string const& collectionName,
TRI_voc_tick_t maxTick, std::string& errorMsg);
//////////////////////////////////////////////////////////////////////////////
/// @brief changes the properties of a collection, based on the VelocyPack
@ -226,24 +260,24 @@ class InitialSyncer : public Syncer {
/// @brief handle the information about a collection
//////////////////////////////////////////////////////////////////////////////
int handleCollection(arangodb::velocypack::Slice const&,
arangodb::velocypack::Slice const&, bool,
std::string&, sync_phase_e);
int handleCollection(arangodb::velocypack::Slice const&,
arangodb::velocypack::Slice const&, bool, std::string&,
sync_phase_e);
//////////////////////////////////////////////////////////////////////////////
/// @brief handle the inventory response of the master
//////////////////////////////////////////////////////////////////////////////
int handleInventoryResponse(arangodb::velocypack::Slice const&,
bool, std::string&);
int handleInventoryResponse(arangodb::velocypack::Slice const&, bool,
std::string&);
//////////////////////////////////////////////////////////////////////////////
/// @brief iterate over all collections from an array and apply an action
//////////////////////////////////////////////////////////////////////////////
int iterateCollections(
std::vector<
std::pair<arangodb::velocypack::Slice, arangodb::velocypack::Slice>> const&,
std::vector<std::pair<arangodb::velocypack::Slice,
arangodb::velocypack::Slice>> const&,
bool, std::string&, sync_phase_e);
private:

View File

@ -58,6 +58,8 @@ class Syncer {
Syncer(TRI_vocbase_t*, TRI_replication_applier_configuration_t const*);
virtual ~Syncer();
TRI_vocbase_t* vocbase() { return _vocbase; }
//////////////////////////////////////////////////////////////////////////////
/// @brief sleeps (nanoseconds)

View File

@ -57,6 +57,7 @@
#include "RocksDBEngine/RocksDBV8Functions.h"
#include "RocksDBEngine/RocksDBValue.h"
#include "RocksDBEngine/RocksDBView.h"
#include "RocksDBEngine/RocksDBInitialSync.h"
#include "VocBase/replication-applier.h"
#include "VocBase/ticks.h"
@ -1210,4 +1211,14 @@ RocksDBReplicationManager* RocksDBEngine::replicationManager() const {
return _replicationManager.get();
}
/// @brief incrementally fetch data for a collection during replication.
/// Thin dispatch point of the StorageEngine interface: forwards all
/// arguments unchanged to the RocksDB-specific implementation.
/// @return TRI_ERROR_NO_ERROR on success, otherwise a replication error
///         code; errorMsg carries details on failure.
int RocksDBEngine::handleSyncKeys(arangodb::InitialSyncer& syncer,
                                  arangodb::LogicalCollection* col,
                                  std::string const& keysId,
                                  std::string const& cid,
                                  std::string const& collectionName,
                                  TRI_voc_tick_t maxTick,
                                  std::string& errorMsg) {
  // delegate to the free function implementing the RocksDB sync protocol
  int const result = handleSyncKeysRocksDB(syncer, col, keysId, cid,
                                           collectionName, maxTick, errorMsg);
  return result;
}
} // namespace

View File

@ -115,7 +115,11 @@ class RocksDBEngine final : public StorageEngine {
int saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase,
arangodb::velocypack::Slice slice,
bool doSync) override;
int handleSyncKeys(arangodb::InitialSyncer& syncer,
arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg);
// database, collection and index management
// -----------------------------------------

View File

@ -0,0 +1,595 @@
#include "Replication/InitialSyncer.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Indexes/IndexIterator.h"
#include "SimpleHttpClient/SimpleHttpClient.h"
#include "SimpleHttpClient/SimpleHttpResult.h"
#include "Transaction/Helpers.h"
#include "Utils/OperationOptions.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/PhysicalCollection.h"
#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
/// @brief incrementally synchronize one key chunk of a collection with the
/// master (RocksDB engine variant).
///
/// The caller has already determined that the local hash of the key range
/// [lowString, highString] does not match the master's hash for chunk
/// `chunkId`. This function then:
///   1. fetches the master's (key, revision-id) pairs for the chunk,
///   2. removes local documents whose keys are not present remotely,
///   3. collects the positions of remote documents that are missing locally
///      or whose revision id differs, and
///   4. fetches exactly those documents from the master and inserts/updates
///      them inside the already-running write transaction `trx`.
///
/// @param syncer     provides the HTTP client, master info and progress API
/// @param trx        open WRITE transaction on the target collection; all
///                   removals/inserts/updates happen inside it
/// @param keysId     id of the server-side keys iterator on the master
/// @param chunkId    index of the chunk to synchronize
/// @param lowString  lowest key of the chunk
/// @param highString highest key of the chunk
/// @param markers    local (key, revisionId) pairs for this chunk.
///                   NOTE(review): assumed sorted by key, as produced by the
///                   caller's in-order collection scan — confirm with caller
/// @param errorMsg   populated with details when an error code is returned
/// @return TRI_ERROR_NO_ERROR on success, otherwise a replication error code
int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx,
                     std::string const& keysId, uint64_t chunkId,
                     std::string const& lowString,
                     std::string const& highString,
                     std::vector<std::pair<std::string, uint64_t>> markers,
                     std::string& errorMsg) {
  std::string const baseUrl = syncer.BaseUrl + "/keys";
  TRI_voc_tick_t const chunkSize = 5000;
  std::string const& collectionName = trx->documentCollection()->name();
  PhysicalCollection* physical = trx->documentCollection()->getPhysical();
  // removals/writes below are silent, ignore revision conflicts and run in
  // restore mode so the master's revision ids are preserved locally
  OperationOptions options;
  options.silent = true;
  options.ignoreRevs = true;
  options.isRestore = true;

  // this chunk did not match the master's hash:
  // transfer the keys for the non-matching range
  std::string url = baseUrl + "/" + keysId + "?type=keys&chunk=" +
                    std::to_string(chunkId) + "&chunkSize=" +
                    std::to_string(chunkSize) + "&low=" + lowString;

  std::string progress =
      "fetching keys chunk '" + std::to_string(chunkId) + "' from " + url;
  syncer.setProgress(progress);

  std::unique_ptr<httpclient::SimpleHttpResult> response(
      syncer._client->retryRequest(rest::RequestType::PUT, url, nullptr, 0));

  if (response == nullptr || !response->isComplete()) {
    errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint +
               ": " + syncer._client->getErrorMessage();
    return TRI_ERROR_REPLICATION_NO_RESPONSE;
  }

  TRI_ASSERT(response != nullptr);

  if (response->wasHttpError()) {
    errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
               ": HTTP " + basics::StringUtils::itoa(response->getHttpReturnCode()) +
               ": " + response->getHttpReturnMessage();
    return TRI_ERROR_REPLICATION_MASTER_ERROR;
  }

  auto builder = std::make_shared<VPackBuilder>();
  int res = syncer.parseResponse(builder, response.get());

  if (res != TRI_ERROR_NO_ERROR) {
    errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
               ": response is no array";
    return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
  }

  // expected response: array of [key, rid] pairs, sorted by key
  VPackSlice const responseBody = builder->slice();
  if (!responseBody.isArray()) {
    errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
               ": response is no array";
    return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
  }

  transaction::BuilderLeaser keyBuilder(trx);
  /*size_t nextStart = 0;
  // delete all keys at start of the range
  while (nextStart < markers.size()) {
    std::string const& localKey = markers[nextStart].first;

    if ( localKey.compare(lowString) < 0) {
      // we have a local key that is not present remotely
      keyBuilder.clear();
      keyBuilder.openObject();
      keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey));
      keyBuilder.close();

      trx.remove(collectionName, keyBuilder.slice(), options);
      ++nextStart;
    } else {
      break;
    }
  }*/

  // positions (into the remote response array) of documents we must fetch
  std::vector<size_t> toFetch;

  size_t const numKeys = static_cast<size_t>(responseBody.length());
  if (numKeys == 0) {
    errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint +
               ": response contains an empty chunk. Collection: " +
               collectionName + " Chunk: " + std::to_string(chunkId);
    return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
  }
  TRI_ASSERT(numKeys > 0);

  // i:         position in the remote key/rid array
  // nextStart: cursor into the local `markers` vector
  size_t i = 0;
  size_t nextStart = 0;

  for (VPackSlice const& pair : VPackArrayIterator(responseBody)) {
    if (!pair.isArray() || pair.length() != 2) {
      errorMsg = "got invalid response from master at " +
                 syncer._masterInfo._endpoint +
                 ": response key pair is no valid array";
      return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
    }

    // key
    VPackSlice const keySlice = pair.at(0);
    if (!keySlice.isString()) {
      errorMsg = "got invalid response from master at " +
                 syncer._masterInfo._endpoint + ": response key is no string";
      return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
    }

    // rid
    if (markers.empty()) {
      // no local markers: everything must be fetched from the master
      toFetch.emplace_back(i);
      i++;
      continue;
    }

    // remove local keys not present remotely anymore; advance the local
    // cursor until it matches the current remote key
    while (nextStart < markers.size()) {
      std::string const& localKey = markers[nextStart].first;

      int res = keySlice.compareString(localKey);
      if (res != 0) {
        // we have a local key that is not present remotely
        keyBuilder->clear();
        keyBuilder->openObject();
        keyBuilder->add(StaticStrings::KeyString, VPackValue(localKey));
        keyBuilder->close();

        trx->remove(collectionName, keyBuilder->slice(), options);
        ++nextStart;
      } else {
        // key match
        break;
      }
    }

    // see if key exists locally
    DocumentIdentifierToken token = physical->lookupKey(trx, keySlice);
    if (token._data == 0) {
      // key not found locally
      toFetch.emplace_back(i);
    } else if (TRI_RidToString(token._data) != pair.at(1).copyString()) {
      // key found, but revision id differs
      toFetch.emplace_back(i);
      ++nextStart;
    } else {
      // a match - nothing to do!
      ++nextStart;
    }

    i++;
  }

  // delete all remaining local keys at the end of the range; they were not
  // reported by the master and so no longer exist remotely
  while (nextStart < markers.size()) {
    std::string const& localKey = markers[nextStart].first;
    TRI_ASSERT(localKey.compare(highString) > 0);
    // if (localKey.compare(highString) > 0) {
    // we have a local key that is not present remotely
    keyBuilder->clear();
    keyBuilder->openObject();
    keyBuilder->add(StaticStrings::KeyString, VPackValue(localKey));
    keyBuilder->close();

    trx->remove(collectionName, keyBuilder->slice(), options);
    //}
    ++nextStart;
  }

  if (!toFetch.empty()) {
    // request the full documents for all missing/outdated keys; the body
    // is the JSON array of positions collected above
    VPackBuilder keysBuilder;
    keysBuilder.openArray();
    for (auto& it : toFetch) {
      keysBuilder.add(VPackValue(it));
    }
    keysBuilder.close();

    std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" +
                      std::to_string(chunkId) + "&chunkSize=" +
                      std::to_string(chunkSize) + "&low=" + lowString;

    progress = "fetching documents chunk " + std::to_string(chunkId) +
               " for collection '" + collectionName + "' from " + url;
    syncer.setProgress(progress);

    std::string const keyJsonString(keysBuilder.slice().toJson());

    std::unique_ptr<httpclient::SimpleHttpResult> response(
        syncer._client->retryRequest(rest::RequestType::PUT, url,
                                     keyJsonString.c_str(), keyJsonString.size()));

    if (response == nullptr || !response->isComplete()) {
      errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint +
                 ": " + syncer._client->getErrorMessage();
      return TRI_ERROR_REPLICATION_NO_RESPONSE;
    }

    TRI_ASSERT(response != nullptr);

    if (response->wasHttpError()) {
      errorMsg = "got invalid response from master at " +
                 syncer._masterInfo._endpoint + ": HTTP " +
                 basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " +
                 response->getHttpReturnMessage();
      return TRI_ERROR_REPLICATION_MASTER_ERROR;
    }

    auto builder = std::make_shared<VPackBuilder>();
    int res = syncer.parseResponse(builder, response.get());

    if (res != TRI_ERROR_NO_ERROR) {
      errorMsg = "got invalid response from master at " +
                 std::string(syncer._masterInfo._endpoint) + ": response is no array";
      return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
    }

    VPackSlice const slice = builder->slice();
    if (!slice.isArray()) {
      errorMsg = "got invalid response from master at " +
                 syncer._masterInfo._endpoint + ": response is no array";
      return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
    }

    // apply each received document: insert if the key is new locally,
    // update otherwise
    for (auto const& it : VPackArrayIterator(slice)) {
      if (!it.isObject()) {
        errorMsg = "got invalid response from master at " +
                   syncer._masterInfo._endpoint + ": document is no object";
        return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
      }

      VPackSlice const keySlice = it.get(StaticStrings::KeyString);

      if (!keySlice.isString()) {
        errorMsg = "got invalid response from master at " +
                   syncer._masterInfo._endpoint + ": document key is invalid";
        return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
      }

      VPackSlice const revSlice = it.get(StaticStrings::RevString);

      if (!revSlice.isString()) {
        errorMsg = "got invalid response from master at " +
                   syncer._masterInfo._endpoint + ": document revision is invalid";
        return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
      }

      DocumentIdentifierToken token = physical->lookupKey(trx, keySlice);

      if (!token._data) {
        // INSERT
        OperationResult opRes = trx->insert(collectionName, it, options);
        res = opRes.code;
      } else {
        // UPDATE
        OperationResult opRes = trx->update(collectionName, it, options);
        res = opRes.code;
      }

      if (res != TRI_ERROR_NO_ERROR) {
        return res;
      }
    }
  }

  return TRI_ERROR_NO_ERROR;
}
int handleSyncKeysRocksDB(InitialSyncer& syncer,
arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName,
TRI_voc_tick_t maxTick, std::string& errorMsg) {
std::string progress =
"collecting local keys for collection '" + collectionName + "'";
syncer.setProgress(progress);
if (syncer.checkAborted()) {
return TRI_ERROR_REPLICATION_APPLIER_STOPPED;
}
syncer.sendExtendBatch();
syncer.sendExtendBarrier();
TRI_voc_tick_t const chunkSize = 5000;
std::string const baseUrl = syncer.BaseUrl + "/keys";
std::string url =
baseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize);
progress = "fetching remote keys chunks for collection '" + collectionName +
"' from " + url;
syncer.setProgress(progress);
std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
if (response == nullptr || !response->isComplete()) {
errorMsg = "could not connect to master at " +
syncer._masterInfo._endpoint + ": " +
syncer._client->getErrorMessage();
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " +
response->getHttpReturnMessage();
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
auto builder = std::make_shared<VPackBuilder>();
int res = syncer.parseResponse(builder, response.get());
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "got invalid response from master at " +
std::string(syncer._masterInfo._endpoint) +
": invalid response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
VPackSlice const chunkSlice = builder->slice();
if (!chunkSlice.isArray()) {
errorMsg = "got invalid response from master at " +
syncer._masterInfo._endpoint + ": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
ManagedDocumentResult mmdr;
OperationOptions options;
options.silent = true;
options.ignoreRevs = true;
options.isRestore = true;
VPackBuilder keyBuilder;
size_t const numChunks = static_cast<size_t>(chunkSlice.length());
// remove all keys that are below first remote key or beyond last remote key
if (numChunks > 0) {
// first chunk
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(syncer._vocbase), col->cid(),
AccessMode::Type::WRITE);
Result res = trx.begin();
if (!res.ok()) {
errorMsg =
std::string("unable to start transaction: ") + res.errorMessage();
res.reset(res.errorNumber(), errorMsg);
return res.errorNumber();
}
VPackSlice chunk = chunkSlice.at(0);
TRI_ASSERT(chunk.isObject());
auto lowSlice = chunk.get("low");
TRI_ASSERT(lowSlice.isString());
// last high
chunk = chunkSlice.at(numChunks - 1);
TRI_ASSERT(chunk.isObject());
auto highSlice = chunk.get("high");
TRI_ASSERT(highSlice.isString());
std::string const lowKey(lowSlice.copyString());
std::string const highKey(highSlice.copyString());
LogicalCollection* coll = trx.documentCollection();
std::unique_ptr<IndexIterator> iterator =
coll->getAllIterator(&trx, &mmdr, false);
iterator->next(
[&](DocumentIdentifierToken const& token) {
if (coll->readDocument(&trx, token, mmdr) == false) {
return;
}
VPackSlice doc(mmdr.vpack());
VPackSlice key = doc.get(StaticStrings::KeyString);
if (key.compareString(lowKey.data(), lowKey.length()) < 0) {
trx.remove(collectionName, key, options);
} else if (key.compareString(highKey.data(), highKey.length()) > 0) {
trx.remove(collectionName, key, options);
}
},
UINT64_MAX);
trx.commit();
}
{
  // Incremental sync: walk all local documents in key order and compare them
  // against the master's key chunks; only ranges whose hash differs from the
  // remote hash are re-fetched via syncChunkRocksDB().
  if (syncer.checkAborted()) {
    return TRI_ERROR_REPLICATION_APPLIER_STOPPED;
  }
  // single write transaction spanning the whole local pass
  SingleCollectionTransaction trx(
      transaction::StandaloneContext::Create(syncer._vocbase), col->cid(),
      AccessMode::Type::WRITE);
  Result res = trx.begin();
  if (!res.ok()) {
    errorMsg =
        std::string("unable to start transaction: ") + res.errorMessage();
    res.reset(res.errorNumber(), res.errorMessage());
    return res.errorNumber();
  }
  // We do not take responsibility for the index.
  // The LogicalCollection is protected by trx.
  // Neither it nor its indexes can be invalidated
  size_t currentChunkId = 0;
  // bounds and expected hash of the chunk currently being compared
  std::string lowKey;
  std::string highKey;
  std::string hashString;
  uint64_t localHash = 0x012345678;
  // chunk keys + revisionId
  std::vector<std::pair<std::string, uint64_t>> markers;
  bool foundLowKey = false;
  // (re)initialize per-chunk state from the remote chunk description at
  // index currentChunkId; throws on a malformed master response
  auto resetChunk = [&]() -> void {
    syncer.sendExtendBatch();
    syncer.sendExtendBarrier();
    progress = "processing keys chunk " + std::to_string(currentChunkId) +
               " for collection '" + collectionName + "'";
    syncer.setProgress(progress);
    // read remote chunk
    VPackSlice chunk = chunkSlice.at(currentChunkId);
    if (!chunk.isObject()) {
      errorMsg = "got invalid response from master at " +
                 syncer._masterInfo._endpoint + ": chunk is no object";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_REPLICATION_INVALID_RESPONSE);
    }
    // each chunk carries its key range ["low", "high"] plus a "hash" over
    // the keys/revisions in that range
    VPackSlice const lowSlice = chunk.get("low");
    VPackSlice const highSlice = chunk.get("high");
    VPackSlice const hashSlice = chunk.get("hash");
    if (!lowSlice.isString() || !highSlice.isString() ||
        !hashSlice.isString()) {
      errorMsg = "got invalid response from master at " +
                 syncer._masterInfo._endpoint +
                 ": chunks in response have an invalid format";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_REPLICATION_INVALID_RESPONSE);
    }
    // now reset chunk information
    markers.clear();
    lowKey = lowSlice.copyString();
    highKey = highSlice.copyString();
    hashString = hashSlice.copyString();
    localHash = 0x012345678;  // same seed as above so hashes are comparable
    foundLowKey = false;
  };
  // set to first chunk
  resetChunk();
  // classify one local document against the current chunk [lowKey, highKey]:
  // delete keys below the range, hash keys inside it, and advance to the
  // next chunk (recursing for the same document) once the range is exceeded.
  // declared as std::function (not auto) because it calls itself recursively
  std::function<void(VPackSlice, VPackSlice)> parseDoc = [&](VPackSlice doc,
                                                             VPackSlice key) {
    bool rangeUnequal = false;
    bool nextChunk = false;
    int cmp1 = key.compareString(lowKey.data(), lowKey.length());
    int cmp2 = key.compareString(highKey.data(), highKey.length());
    if (cmp1 < 0) {
      // smaller values than lowKey mean they don't exist remotely
      trx.remove(collectionName, key, options);
      return;
    } else if (cmp1 >= 0 && cmp2 <= 0) {
      // we only need to hash if we are in the range
      if (cmp1 == 0) {
        foundLowKey = true;
      }
      // remember every in-range key with its revision in case the chunk
      // hash mismatches and the range must be re-synced
      markers.emplace_back(key.copyString(), TRI_ExtractRevisionId(doc));
      // don't bother hashing if we haven't found the lower key
      if (foundLowKey) {
        VPackSlice revision = doc.get(StaticStrings::RevString);
        localHash ^= key.hashString();
        localHash ^= revision.hash();
        if (cmp2 == 0) {  // found highKey
          rangeUnequal = std::to_string(localHash) != hashString;
          nextChunk = true;
        }
      } else if (cmp2 == 0) {  // found high key, but not low key
        // low key missing locally => range cannot match; force a re-sync
        rangeUnequal = true;
        nextChunk = true;
      }
    } else if (cmp2 > 0) {  // higher than highKey
      // current range was unequal and we did not find the
      // high key. Load range and skip to next
      rangeUnequal = true;
      nextChunk = true;
    }
    TRI_ASSERT(!rangeUnequal || nextChunk);  // A => B
    if (nextChunk) {  // we are out of range, see next chunk
      if (rangeUnequal && currentChunkId < numChunks) {
        // fetch the mismatching range from the master and apply it locally
        int res = syncChunkRocksDB(syncer, &trx, keysId, currentChunkId, lowKey,
                                   highKey, markers, errorMsg);
        if (res != TRI_ERROR_NO_ERROR) {
          THROW_ARANGO_EXCEPTION(res);
        }
      }
      currentChunkId++;
      if (currentChunkId < numChunks) {
        resetChunk();
        // key is higher than upper bound, recheck the current document
        if (cmp2 > 0) {
          parseDoc(doc, key);
        }
      }
    }
  };
  // iterate over all local documents in the collection and feed them,
  // one by one, into the chunk comparison above
  std::unique_ptr<IndexIterator> iterator =
      col->getAllIterator(&trx, &mmdr, false);
  iterator->next(
      [&](DocumentIdentifierToken const& token) {
        if (col->readDocument(&trx, token, mmdr) == false) {
          return;
        }
        VPackSlice doc(mmdr.vpack());
        VPackSlice key = doc.get(StaticStrings::KeyString);
        parseDoc(doc, key);
      },
      UINT64_MAX);
  // we might have missed chunks, if the keys don't exist at all locally;
  // sync every remaining remote chunk wholesale
  while (currentChunkId < numChunks) {
    int res = syncChunkRocksDB(syncer, &trx, keysId, currentChunkId, lowKey, highKey,
                               markers, errorMsg);
    if (res != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(res);
    }
    currentChunkId++;
    if (currentChunkId < numChunks) {
      resetChunk();
    }
  }
  res = trx.commit();
  if (!res.ok()) {
    return res.errorNumber();
  }
}
return res;
}
}

View File

@ -48,6 +48,7 @@ class PhysicalView;
class Result;
class TransactionCollection;
class TransactionState;
class InitialSyncer;
namespace rest {
class RestHandlerFactory;
@ -405,6 +406,14 @@ class StorageEngine : public application_features::ApplicationFeature {
virtual std::shared_ptr<arangodb::velocypack::Builder> getReplicationApplierConfiguration(TRI_vocbase_t*, int& status) = 0;
virtual int removeReplicationApplierConfiguration(TRI_vocbase_t* vocbase) = 0;
virtual int saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase, arangodb::velocypack::Slice slice, bool doSync) = 0;
// engine-specific hook for the replication initial syncer's incremental
// key-based sync; implementations (e.g. MMFilesEngine) delegate to their
// engine's sync routine. Returns a TRI_ERROR_* code; on failure, errorMsg
// is filled with a human-readable description.
// NOTE(review): maxTick presumably bounds the data snapshot to sync —
// confirm against the engine implementations
virtual int handleSyncKeys(arangodb::InitialSyncer& syncer,
                           arangodb::LogicalCollection* col,
                           std::string const& keysId,
                           std::string const& cid,
                           std::string const& collectionName,
                           TRI_voc_tick_t maxTick,
                           std::string& errorMsg) = 0;
void getCapabilities(VPackBuilder& builder) const {
builder.openObject();