diff --git a/CHANGELOG b/CHANGELOG index 36601a65de..a3ce260804 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -6,7 +6,7 @@ devel * `--server.maximal-queue-size` is now an absolute maximum. If the queue is full, then 503 is returned. Setting it to 0 means "no limit". -* (Enterprise only) added authentication against a LDAP server +* (Enterprise only) added authentication against an LDAP server v3.2.alpha4 (2017-04-25) diff --git a/arangod/MMFiles/MMFilesEngine.cpp b/arangod/MMFiles/MMFilesEngine.cpp index 37c897b126..e9ad960992 100644 --- a/arangod/MMFiles/MMFilesEngine.cpp +++ b/arangod/MMFiles/MMFilesEngine.cpp @@ -38,6 +38,7 @@ #include "MMFiles/MMFilesDatafile.h" #include "MMFiles/MMFilesDatafileHelper.h" #include "MMFiles/MMFilesIndexFactory.h" +#include "MMFiles/MMFilesInitialSync.h" #include "MMFiles/MMFilesLogfileManager.h" #include "MMFiles/MMFilesOptimizerRules.h" #include "MMFiles/MMFilesPersistentIndex.h" @@ -3340,3 +3341,13 @@ int MMFilesEngine::saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase, a return TRI_ERROR_NO_ERROR; } + +int MMFilesEngine::handleSyncKeys(arangodb::InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, + std::string const& cid, + std::string const& collectionName, + TRI_voc_tick_t maxTick, + std::string& errorMsg) { + return handleSyncKeysMMFiles(syncer, col, keysId, cid, collectionName,maxTick, errorMsg); +} diff --git a/arangod/MMFiles/MMFilesEngine.h b/arangod/MMFiles/MMFilesEngine.h index c2f845cf07..0679012d32 100644 --- a/arangod/MMFiles/MMFilesEngine.h +++ b/arangod/MMFiles/MMFilesEngine.h @@ -27,8 +27,8 @@ #include "Basics/Common.h" #include "Basics/Mutex.h" -#include "MMFiles/MMFilesDatafile.h" #include "MMFiles/MMFilesCollectorCache.h" +#include "MMFiles/MMFilesDatafile.h" #include "StorageEngine/StorageEngine.h" #include "VocBase/AccessMode.h" @@ -60,7 +60,6 @@ struct MMFilesEngineCollectionFiles { class MMFilesEngine final : public StorageEngine { public: - // create the storage engine explicit MMFilesEngine(application_features::ApplicationServer*); @@ -84,18 +83,30 @@ class MMFilesEngine final : public StorageEngine { // flush wal wait for collector void stop() override; - - std::shared_ptr getReplicationApplierConfiguration(TRI_vocbase_t* vocbase, int& status) override; + + std::shared_ptr + getReplicationApplierConfiguration(TRI_vocbase_t* vocbase, + int& status) override; int removeReplicationApplierConfiguration(TRI_vocbase_t* vocbase) override; - int saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase, arangodb::velocypack::Slice slice, bool doSync) override; + int saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase, + arangodb::velocypack::Slice slice, + bool doSync) override; + int handleSyncKeys(arangodb::InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, std::string const& cid, + std::string const& collectionName, TRI_voc_tick_t maxTick, + std::string& errorMsg) override; transaction::ContextData* createTransactionContextData() override; TransactionState* createTransactionState(TRI_vocbase_t*) override; - TransactionCollection* createTransactionCollection(TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel) override; + TransactionCollection* createTransactionCollection( + TransactionState* state, TRI_voc_cid_t cid, AccessMode::Type accessType, + int nestingLevel) override; // create storage-engine specific collection - PhysicalCollection* createPhysicalCollection(LogicalCollection*, VPackSlice const&) override; - + PhysicalCollection* createPhysicalCollection(LogicalCollection*, + VPackSlice const&) override; + // create storage-engine specific view PhysicalView* createPhysicalView(LogicalView*, VPackSlice const&) override; @@ -107,22 +118,27 @@ class MMFilesEngine final : public StorageEngine { // fully created (see "createDatabase" below). called at server start only void getDatabases(arangodb::velocypack::Builder& result) override; - // fills the provided builder with information about the collection - void getCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t cid, - arangodb::velocypack::Builder& result, + // fills the provided builder with information about the collection + void getCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t cid, + arangodb::velocypack::Builder& result, bool includeIndexes, TRI_voc_tick_t maxTick) override; - // fill the Builder object with an array of collections (and their corresponding - // indexes) that were detected by the storage engine. called at server start separately + // fill the Builder object with an array of collections (and their + // corresponding + // indexes) that were detected by the storage engine. called at server start + // separately // for each database - int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result, + int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, + arangodb::velocypack::Builder& result, bool wasCleanShutdown, bool isUpgrade) override; - - int getViews(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result) override; - + + int getViews(TRI_vocbase_t* vocbase, + arangodb::velocypack::Builder& result) override; + // return the path for a collection - std::string collectionPath(TRI_vocbase_t const* vocbase, TRI_voc_cid_t id) const override { - return collectionDirectory(vocbase->id(), id); + std::string collectionPath(TRI_vocbase_t const* vocbase, + TRI_voc_cid_t id) const override { + return collectionDirectory(vocbase->id(), id); } // database, collection and index management @@ -134,17 +150,23 @@ class MMFilesEngine final : public StorageEngine { } std::string versionFilename(TRI_voc_tick_t id) const override; - + void waitForSync(TRI_voc_tick_t tick) override; - virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade, int&) override; - TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) override { + virtual TRI_vocbase_t* openDatabase( + arangodb::velocypack::Slice const& parameters, bool isUpgrade, + int&) override; + TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, + arangodb::velocypack::Slice const& args, + int& status) override { status = TRI_ERROR_NO_ERROR; return createDatabaseMMFiles(id, args); } - int writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) override; + int writeCreateDatabaseMarker(TRI_voc_tick_t id, + VPackSlice const& slice) override; - void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) override; + void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, + int& status) override; Result dropDatabase(TRI_vocbase_t* database) override; void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) override; @@ -154,45 +176,51 @@ class MMFilesEngine final : public StorageEngine { // start compactor thread and delete files form collections marked as deleted void recoveryDone(TRI_vocbase_t* vocbase) override; -private: + private: int dropDatabaseMMFiles(TRI_vocbase_t* vocbase); - TRI_vocbase_t* createDatabaseMMFiles(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data); - -public: + TRI_vocbase_t* createDatabaseMMFiles(TRI_voc_tick_t id, + arangodb::velocypack::Slice const& data); + + public: // asks the storage engine to create a collection as specified in the VPack - // Slice object and persist the creation info. It is guaranteed by the server - // that no other active collection with the same name and id exists in the same - // database when this function is called. If this operation fails somewhere in - // the middle, the storage engine is required to fully clean up the creation - // and throw only then, so that subsequent collection creation requests will not fail. + // Slice object and persist the creation info. It is guaranteed by the server + // that no other active collection with the same name and id exists in the + // same + // database when this function is called. If this operation fails somewhere in + // the middle, the storage engine is required to fully clean up the creation + // and throw only then, so that subsequent collection creation requests will + // not fail. // the WAL entry for the collection creation will be written *after* the call // to "createCollection" returns std::string createCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, arangodb::LogicalCollection const*) override; - + // asks the storage engine to persist the collection. // After this call the collection is persisted over recovery. // This call will write wal markers. arangodb::Result persistCollection( - TRI_vocbase_t* vocbase, - arangodb::LogicalCollection const*) override; + TRI_vocbase_t* vocbase, arangodb::LogicalCollection const*) override; - // asks the storage engine to drop the specified collection and persist the - // deletion info. Note that physical deletion of the collection data must not + // asks the storage engine to drop the specified collection and persist the + // deletion info. Note that physical deletion of the collection data must not // be carried out by this call, as there may // still be readers of the collection's data. // This call will write the WAL entry for collection deletion - arangodb::Result dropCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection*) override; - + arangodb::Result dropCollection(TRI_vocbase_t* vocbase, + arangodb::LogicalCollection*) override; + // perform a physical deletion of the collection // After this call data of this collection is corrupted, only perform if // assured that no one is using the collection anymore - void destroyCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection*) override; - - // asks the storage engine to change properties of the collection as specified in - // the VPack Slice object and persist them. If this operation fails - // somewhere in the middle, the storage engine is required to fully revert the - // property changes and throw only then, so that subsequent operations will not fail. + void destroyCollection(TRI_vocbase_t* vocbase, + arangodb::LogicalCollection*) override; + + // asks the storage engine to change properties of the collection as specified + // in + // the VPack Slice object and persist them. If this operation fails + // somewhere in the middle, the storage engine is required to fully revert the + // property changes and throw only then, so that subsequent operations will + // not fail. // the WAL entry for the propery change will be written *after* the call // to "changeCollection" returns void changeCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, @@ -201,82 +229,94 @@ public: // asks the storage engine to persist renaming of a collection // This will write a renameMarker if not in recovery - arangodb::Result renameCollection( - TRI_vocbase_t* vocbase, arangodb::LogicalCollection const*, - std::string const& oldName) override; + arangodb::Result renameCollection(TRI_vocbase_t* vocbase, + arangodb::LogicalCollection const*, + std::string const& oldName) override; // asks the storage engine to create an index as specified in the VPack - // Slice object and persist the creation info. The database id, collection id + // Slice object and persist the creation info. The database id, collection id // and index data are passed in the Slice object. Note that this function // is not responsible for inserting the individual documents into the index. - // If this operation fails somewhere in the middle, the storage engine is required - // to fully clean up the creation and throw only then, so that subsequent index + // If this operation fails somewhere in the middle, the storage engine is + // required + // to fully clean up the creation and throw only then, so that subsequent + // index // creation requests will not fail. // the WAL entry for the index creation will be written *after* the call // to "createIndex" returns void createIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, - TRI_idx_iid_t id, arangodb::velocypack::Slice const& data) override; - - // asks the storage engine to drop the specified index and persist the deletion - // info. Note that physical deletion of the index must not be carried out by this call, - // as there may still be users of the index. It is recommended that this operation + TRI_idx_iid_t id, + arangodb::velocypack::Slice const& data) override; + + // asks the storage engine to drop the specified index and persist the + // deletion + // info. Note that physical deletion of the index must not be carried out by + // this call, + // as there may still be users of the index. It is recommended that this + // operation // only sets a deletion flag for the index but let's an async task perform // the actual deletion. // the WAL entry for index deletion will be written *after* the call // to "dropIndex" returns void dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, TRI_idx_iid_t id) override; - + void dropIndexWalMarker(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, - arangodb::velocypack::Slice const& data, bool writeMarker, int&) override; + arangodb::velocypack::Slice const& data, + bool writeMarker, int&) override; - void unloadCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) override; - - - void createView(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, arangodb::LogicalView const*) override; - - arangodb::Result persistView( - TRI_vocbase_t* vocbase, - arangodb::LogicalView const*) override; + void unloadCollection(TRI_vocbase_t* vocbase, + arangodb::LogicalCollection* collection) override; + + void createView(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, + arangodb::LogicalView const*) override; + + arangodb::Result persistView(TRI_vocbase_t* vocbase, + arangodb::LogicalView const*) override; + + arangodb::Result dropView(TRI_vocbase_t* vocbase, + arangodb::LogicalView*) override; - arangodb::Result dropView(TRI_vocbase_t* vocbase, arangodb::LogicalView*) override; - void destroyView(TRI_vocbase_t* vocbase, arangodb::LogicalView*) override; - + void changeView(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, arangodb::LogicalView const*, bool doSync) override; - - std::string createViewDirectoryName(std::string const& basePath, TRI_voc_cid_t id); - - void saveViewInfo(TRI_vocbase_t* vocbase, - TRI_voc_cid_t id, - arangodb::LogicalView const*, - bool forceSync) const; - + std::string createViewDirectoryName(std::string const& basePath, + TRI_voc_cid_t id); + + void saveViewInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, + arangodb::LogicalView const*, bool forceSync) const; + void signalCleanup(TRI_vocbase_t* vocbase) override; // document operations // ------------------- // iterate all documents of the underlying collection - // this is called when a collection is openend, and all its documents need to be added to + // this is called when a collection is openend, and all its documents need to + // be added to // indexes etc. - void iterateDocuments(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, - std::function const& cb) override; - + void iterateDocuments( + TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + std::function const& cb) + override; // adds a document to the storage engine - // this will be called by the WAL collector when surviving documents are being moved + // this will be called by the WAL collector when surviving documents are being + // moved // into the storage engine's realm - void addDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, - arangodb::velocypack::Slice const& document) override; - + void addDocumentRevision( + TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + arangodb::velocypack::Slice const& document) override; + // removes a document from the storage engine - // this will be called by the WAL collector when non-surviving documents are being removed + // this will be called by the WAL collector when non-surviving documents are + // being removed // from the storage engine's realm - void removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, - arangodb::velocypack::Slice const& document) override; + void removeDocumentRevision( + TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + arangodb::velocypack::Slice const& document) override; /// @brief scans a collection and locates all files MMFilesEngineCollectionFiles scanCollectionDirectory(std::string const& path); @@ -285,94 +325,113 @@ public: bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) override; /// @brief insert a compaction blocker - int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) override; + int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, + TRI_voc_tick_t& id) override; /// @brief touch an existing compaction blocker - int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) override; + int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, + double ttl) override; /// @brief remove an existing compaction blocker - int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) override; + int removeCompactionBlocker(TRI_vocbase_t* vocbase, + TRI_voc_tick_t id) override; + + /// @brief a callback function that is run while it is guaranteed that there + /// is no compaction ongoing + void preventCompaction( + TRI_vocbase_t* vocbase, + std::function const& callback) override; - /// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing - void preventCompaction(TRI_vocbase_t* vocbase, - std::function const& callback) override; - /// @brief a callback function that is run there is no compaction ongoing bool tryPreventCompaction(TRI_vocbase_t* vocbase, std::function const& callback, bool checkForActiveBlockers) override; - + int shutdownDatabase(TRI_vocbase_t* vocbase) override; - - int openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection, bool ignoreErrors) override; - + + int openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection, + bool ignoreErrors) override; + /// @brief Add engine-specific AQL functions. void addAqlFunctions() override; - + /// @brief Add engine-specific optimizer rules void addOptimizerRules() override; - + /// @brief Add engine-specific V8 functions void addV8Functions() override; - + /// @brief Add engine-specific REST handlers void addRestHandlers(rest::RestHandlerFactory*) override; - + /// @brief transfer markers into a collection int transferMarkers(LogicalCollection* collection, MMFilesCollectorCache*, MMFilesOperationsType const&); - - std::string viewDirectory(TRI_voc_tick_t databaseId, TRI_voc_cid_t viewId) const; + + std::string viewDirectory(TRI_voc_tick_t databaseId, + TRI_voc_cid_t viewId) const; private: /// @brief: check the initial markers in a datafile - bool checkDatafileHeader(MMFilesDatafile* datafile, std::string const& filename) const; + bool checkDatafileHeader(MMFilesDatafile* datafile, + std::string const& filename) const; /// @brief transfer markers into a collection, worker function - int transferMarkersWorker(LogicalCollection* collection, MMFilesCollectorCache*, + int transferMarkersWorker(LogicalCollection* collection, + MMFilesCollectorCache*, MMFilesOperationsType const&); /// @brief sync the active journal of a collection int syncJournalCollection(LogicalCollection* collection); - + /// @brief get the next free position for a new marker of the specified size - char* nextFreeMarkerPosition(LogicalCollection* collection, - TRI_voc_tick_t, MMFilesMarkerType, - TRI_voc_size_t, MMFilesCollectorCache*); + char* nextFreeMarkerPosition(LogicalCollection* collection, TRI_voc_tick_t, + MMFilesMarkerType, TRI_voc_size_t, + MMFilesCollectorCache*); /// @brief set the tick of a marker and calculate its CRC value void finishMarker(char const*, char*, LogicalCollection* collection, TRI_voc_tick_t, MMFilesCollectorCache*); - void verifyDirectories(); + void verifyDirectories(); std::vector getDatabaseNames() const; - /// @brief create a new database directory + /// @brief create a new database directory int createDatabaseDirectory(TRI_voc_tick_t id, std::string const& name); - + /// @brief save a parameter.json file for a database - int saveDatabaseParameters(TRI_voc_tick_t id, std::string const& name, bool deleted); - - arangodb::velocypack::Builder databaseToVelocyPack(TRI_voc_tick_t databaseId, - std::string const& name, + int saveDatabaseParameters(TRI_voc_tick_t id, std::string const& name, + bool deleted); + + arangodb::velocypack::Builder databaseToVelocyPack(TRI_voc_tick_t databaseId, + std::string const& name, bool deleted) const; std::string databaseDirectory(TRI_voc_tick_t databaseId) const; std::string databaseParametersFilename(TRI_voc_tick_t databaseId) const; - std::string collectionDirectory(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) const; - std::string collectionParametersFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) const; - std::string viewParametersFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t viewId) const; - std::string indexFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, TRI_idx_iid_t indexId) const; + std::string collectionDirectory(TRI_voc_tick_t databaseId, + TRI_voc_cid_t collectionId) const; + std::string collectionParametersFilename(TRI_voc_tick_t databaseId, + TRI_voc_cid_t collectionId) const; + std::string viewParametersFilename(TRI_voc_tick_t databaseId, + TRI_voc_cid_t viewId) const; + std::string indexFilename(TRI_voc_tick_t databaseId, + TRI_voc_cid_t collectionId, + TRI_idx_iid_t indexId) const; std::string indexFilename(TRI_idx_iid_t indexId) const; int openDatabases(); /// @brief open an existing database. internal function - TRI_vocbase_t* openExistingDatabase(TRI_voc_tick_t id, std::string const& name, bool wasCleanShutdown, bool isUpgrade); + TRI_vocbase_t* openExistingDatabase(TRI_voc_tick_t id, + std::string const& name, + bool wasCleanShutdown, bool isUpgrade); /// @brief note the maximum local tick void noteTick(TRI_voc_tick_t tick) { - if (tick > _maxTick) { _maxTick = tick; } + if (tick > _maxTick) { + _maxTick = tick; + } } /// @brief physically erases the database directory @@ -386,33 +445,37 @@ public: bool findMaxTickInJournals(std::string const& path); /// @brief create a full directory name for a collection - std::string createCollectionDirectoryName(std::string const& basePath, TRI_voc_cid_t cid); + std::string createCollectionDirectoryName(std::string const& basePath, + TRI_voc_cid_t cid); - void registerCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, std::string const& path); + void registerCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, + std::string const& path); void unregisterCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id); - void registerViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, std::string const& path); + void registerViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, + std::string const& path); void unregisterViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id); - void saveCollectionInfo(TRI_vocbase_t* vocbase, - TRI_voc_cid_t id, + void saveCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, arangodb::LogicalCollection const* parameters, bool forceSync) const; - arangodb::velocypack::Builder loadCollectionInfo(TRI_vocbase_t* vocbase, std::string const& path); - arangodb::velocypack::Builder loadViewInfo(TRI_vocbase_t* vocbase, std::string const& path); - - // start the cleanup thread for the database + arangodb::velocypack::Builder loadCollectionInfo(TRI_vocbase_t* vocbase, + std::string const& path); + arangodb::velocypack::Builder loadViewInfo(TRI_vocbase_t* vocbase, + std::string const& path); + + // start the cleanup thread for the database int startCleanup(TRI_vocbase_t* vocbase); - // stop and delete the cleanup thread for the database + // stop and delete the cleanup thread for the database int stopCleanup(TRI_vocbase_t* vocbase); - - // start the compactor thread for the database + + // start the compactor thread for the database int startCompactor(TRI_vocbase_t* vocbase); // signal the compactor thread to stop int beginShutdownCompactor(TRI_vocbase_t* vocbase); // stop and delete the compactor thread for the database int stopCompactor(TRI_vocbase_t* vocbase); - + /// @brief writes a drop-database marker into the log int writeDropMarker(TRI_voc_tick_t id); @@ -428,11 +491,16 @@ public: std::vector> _deleted; arangodb::basics::ReadWriteLock mutable _pathsLock; - std::unordered_map> _collectionPaths; - std::unordered_map> _viewPaths; + std::unordered_map> + _collectionPaths; + std::unordered_map> + _viewPaths; struct CompactionBlocker { - CompactionBlocker(TRI_voc_tick_t id, double expires) : _id(id), _expires(expires) {} + CompactionBlocker(TRI_voc_tick_t id, double expires) + : _id(id), _expires(expires) {} CompactionBlocker() = delete; TRI_voc_tick_t _id; @@ -441,9 +509,11 @@ public: // lock for compaction blockers arangodb::basics::ReadWriteLock mutable _compactionBlockersLock; - // cross-database map of compaction blockers, protected by _compactionBlockersLock - std::unordered_map> _compactionBlockers; - + // cross-database map of compaction blockers, protected by + // _compactionBlockersLock + std::unordered_map> + _compactionBlockers; + // lock for threads arangodb::Mutex _threadsLock; // per-database compactor threads, protected by _threadsLock @@ -451,7 +521,6 @@ public: // per-database cleanup threads, protected by _threadsLock std::unordered_map _cleanupThreads; }; - } #endif diff --git a/arangod/MMFiles/MMFilesInitialSync.h b/arangod/MMFiles/MMFilesInitialSync.h new file mode 100644 index 0000000000..51358b6ddb --- /dev/null +++ b/arangod/MMFiles/MMFilesInitialSync.h @@ -0,0 +1,712 @@ +#include "Replication/InitialSyncer.h" + +#include "Basics/StaticStrings.h" +#include "Basics/StringUtils.h" +#include "MMFiles/MMFilesCollection.h" +#include "MMFiles/MMFilesDatafileHelper.h" +#include "MMFiles/MMFilesDitch.h" +#include "MMFiles/MMFilesIndexElement.h" +#include "MMFiles/MMFilesPrimaryIndex.h" +#include "SimpleHttpClient/SimpleHttpClient.h" +#include "SimpleHttpClient/SimpleHttpResult.h" +#include "Transaction/Helpers.h" +#include "Utils/OperationOptions.h" + +#include +#include +#include +#include + +//////////////////////////////////////////////////////////////////////////////// +/// @brief incrementally fetch data from a collection +//////////////////////////////////////////////////////////////////////////////// + +namespace arangodb { +//////////////////////////////////////////////////////////////////////////////// +/// @brief performs a binary search for the given key in the markers vector +//////////////////////////////////////////////////////////////////////////////// + +static bool BinarySearch(std::vector const& markers, + std::string const& key, size_t& position) { + TRI_ASSERT(!markers.empty()); + + size_t l = 0; + size_t r = markers.size() - 1; + + while (true) { + // determine midpoint + position = l + ((r - l) / 2); + + TRI_ASSERT(position < markers.size()); + VPackSlice const otherSlice(markers.at(position)); + VPackSlice const otherKey = otherSlice.get(StaticStrings::KeyString); + + int res = key.compare(otherKey.copyString()); + + if (res == 0) { + return true; + } + + if (res < 0) { + if (position == 0) { + return false; + } + r = position - 1; + } else { + l = position + 1; + } + + if (r < l) { + return false; + } + } +} + + +//////////////////////////////////////////////////////////////////////////////// +/// @brief finds a key range in the markers vector +//////////////////////////////////////////////////////////////////////////////// + +static bool FindRange(std::vector const& markers, + std::string const& lower, std::string const& upper, + size_t& lowerPos, size_t& upperPos) { + bool found = false; + + if (!markers.empty()) { + found = BinarySearch(markers, lower, lowerPos); + + if (found) { + found = BinarySearch(markers, upper, upperPos); + } + } + + return found; +} + +int handleSyncKeysMMFiles(arangodb::InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, + std::string const& cid, + std::string const& collectionName, + TRI_voc_tick_t maxTick, + std::string& errorMsg) { + + std::string progress = + "collecting local keys for collection '" + collectionName + "'"; + syncer.setProgress(progress); + + // fetch all local keys from primary index + std::vector markers; + + MMFilesDocumentDitch* ditch = nullptr; + + // acquire a replication ditch so no datafiles are thrown away from now on + // note: the ditch also protects against unloading the collection + { + SingleCollectionTransaction trx( + transaction::StandaloneContext::Create(syncer.vocbase()), col->cid(), + AccessMode::Type::READ); + + Result res = trx.begin(); + + if (!res.ok()) { + errorMsg = + std::string("unable to start transaction: ") + res.errorMessage(); + res.reset(res.errorNumber(), errorMsg); + return res.errorNumber(); + } + + ditch = arangodb::MMFilesCollection::toMMFilesCollection(col) + ->ditches() + ->createMMFilesDocumentDitch(false, __FILE__, __LINE__); + + if (ditch == nullptr) { + return TRI_ERROR_OUT_OF_MEMORY; + } + } + + TRI_ASSERT(ditch != nullptr); + + TRI_DEFER(arangodb::MMFilesCollection::toMMFilesCollection(col) + ->ditches() + ->freeDitch(ditch)); + + { + SingleCollectionTransaction trx( + transaction::StandaloneContext::Create(syncer.vocbase()), col->cid(), + AccessMode::Type::READ); + + Result res = trx.begin(); + + if (!res.ok()) { + errorMsg = + std::string("unable to start transaction: ") + res.errorMessage(); + res.reset(res.errorNumber(), errorMsg); + return res.errorNumber(); + } + + // We do not take responsibility for the index. + // The LogicalCollection is protected by trx. + // Neither it nor it's indexes can be invalidated + + markers.reserve(trx.documentCollection()->numberDocuments(&trx)); + + uint64_t iterations = 0; + ManagedDocumentResult mmdr; + trx.invokeOnAllElements( + trx.name(), [&syncer, &trx, &mmdr, &markers, + &iterations](DocumentIdentifierToken const& token) { + if (trx.documentCollection()->readDocument(&trx, token, mmdr)) { + markers.emplace_back(mmdr.vpack()); + + if (++iterations % 10000 == 0) { + if (syncer.checkAborted()) { + return false; + } + } + } + return true; + }); + + if (syncer.checkAborted()) { + return TRI_ERROR_REPLICATION_APPLIER_STOPPED; + } + + syncer.sendExtendBatch(); + syncer.sendExtendBarrier(); + + + + std::string progress = "sorting " + std::to_string(markers.size()) + + " local key(s) for collection '" + collectionName + + "'"; + syncer.setProgress(progress); + + // sort all our local keys + std::sort( + markers.begin(), markers.end(), + [](uint8_t const* lhs, uint8_t const* rhs) -> bool { + VPackSlice const l(lhs); + VPackSlice const r(rhs); + + VPackValueLength lLength, rLength; + char const* lKey = l.get(StaticStrings::KeyString).getString(lLength); + char const* rKey = r.get(StaticStrings::KeyString).getString(rLength); + + size_t const length = + static_cast(lLength < rLength ? lLength : rLength); + int res = memcmp(lKey, rKey, length); + + if (res < 0) { + // left is smaller than right + return true; + } + if (res == 0 && lLength < rLength) { + // left is equal to right, but of shorter length + return true; + } + + return false; + }); + } + + if (syncer.checkAborted()) { + return TRI_ERROR_REPLICATION_APPLIER_STOPPED; + } + + syncer.sendExtendBatch(); + syncer.sendExtendBarrier(); + + std::vector toFetch; + + TRI_voc_tick_t const chunkSize = 5000; + std::string const baseUrl = syncer.BaseUrl + "/keys"; + + std::string url = + baseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize); + progress = "fetching remote keys chunks for collection '" + collectionName + + "' from " + url; + syncer.setProgress(progress); + + std::unique_ptr response( + syncer._client->retryRequest(rest::RequestType::GET, url, nullptr, 0)); + + if (response == nullptr || !response->isComplete()) { + errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint + + ": " + syncer._client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint + + ": HTTP " + basics::StringUtils::itoa(response->getHttpReturnCode()) + + ": " + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + auto builder = std::make_shared(); + int res = syncer.parseResponse(builder, response.get()); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "got invalid response from master at " + + std::string(syncer._masterInfo._endpoint) + + ": invalid response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const slice = builder->slice(); + + if (!slice.isArray()) { + errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint + + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + OperationOptions options; + options.silent = true; + options.ignoreRevs = true; + options.isRestore = true; + + VPackBuilder keyBuilder; + size_t const n = static_cast(slice.length()); + + // remove all keys that are below first remote key or beyond last remote key + if (n > 0) { + // first chunk + SingleCollectionTransaction trx( + transaction::StandaloneContext::Create(syncer._vocbase), col->cid(), + AccessMode::Type::WRITE); + + Result res = trx.begin(); + + if (!res.ok()) { + errorMsg = + std::string("unable to start transaction: ") + res.errorMessage(); + res.reset(res.errorNumber(), errorMsg); + return res.errorNumber(); + } + + VPackSlice chunk = slice.at(0); + + TRI_ASSERT(chunk.isObject()); + auto lowSlice = chunk.get("low"); + TRI_ASSERT(lowSlice.isString()); + + std::string const lowKey(lowSlice.copyString()); + + for (size_t i = 0; i < markers.size(); ++i) { + VPackSlice const k(markers[i]); + + std::string const key(k.get(StaticStrings::KeyString).copyString()); + + if (key.compare(lowKey) >= 0) { + break; + } + + keyBuilder.clear(); + keyBuilder.openObject(); + keyBuilder.add(StaticStrings::KeyString, VPackValue(key)); + keyBuilder.close(); + + trx.remove(collectionName, keyBuilder.slice(), options); + } + + // last high + chunk = slice.at(n - 1); + TRI_ASSERT(chunk.isObject()); + + auto highSlice = chunk.get("high"); + TRI_ASSERT(highSlice.isString()); + + std::string const highKey(highSlice.copyString()); + + for (size_t i = markers.size(); i >= 1; --i) { + VPackSlice const k(markers[i - 1]); + + std::string const key(k.get(StaticStrings::KeyString).copyString()); + + if (key.compare(highKey) <= 0) { + break; + } + + keyBuilder.clear(); + keyBuilder.openObject(); + keyBuilder.add(StaticStrings::KeyString, VPackValue(key)); + keyBuilder.close(); + + trx.remove(collectionName, keyBuilder.slice(), options); + } + + trx.commit(); + } + + size_t nextStart = 0; + + // now process each chunk + for (size_t i = 0; i < n; ++i) { + if (syncer.checkAborted()) { + return TRI_ERROR_REPLICATION_APPLIER_STOPPED; + } + + SingleCollectionTransaction trx( + transaction::StandaloneContext::Create(syncer._vocbase), col->cid(), + AccessMode::Type::WRITE); + + Result res = trx.begin(); + + if (!res.ok()) { + errorMsg = + std::string("unable to start transaction: ") + res.errorMessage(); + res.reset(res.errorNumber(), res.errorMessage()); + return res.errorNumber(); + } + + trx.pinData(col->cid()); // will throw when it fails + + // We do not take responsibility for the index. + // The LogicalCollection is protected by trx. + // Neither it nor it's indexes can be invalidated + + // TODO Move to MMFiles + auto physical = static_cast( + trx.documentCollection()->getPhysical()); + auto idx = physical->primaryIndex(); + + size_t const currentChunkId = i; + progress = "processing keys chunk " + std::to_string(currentChunkId) + + " for collection '" + collectionName + "'"; + syncer.setProgress(progress); + + syncer.sendExtendBatch(); + syncer.sendExtendBarrier(); + + // read remote chunk + VPackSlice chunk = slice.at(i); + + if (!chunk.isObject()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": chunk is no object"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const lowSlice = chunk.get("low"); + VPackSlice const highSlice = chunk.get("high"); + VPackSlice const hashSlice = chunk.get("hash"); + + if (!lowSlice.isString() || !highSlice.isString() || + !hashSlice.isString()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + + ": chunks in response have an invalid format"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + std::string const lowString = lowSlice.copyString(); + std::string const highString = highSlice.copyString(); + + size_t localFrom; + size_t localTo; + bool match = FindRange(markers, lowString, highString, localFrom, localTo); + + if (match) { + // now must hash the range + uint64_t hash = 0x012345678; + + for (size_t i = localFrom; i <= localTo; ++i) { + TRI_ASSERT(i < markers.size()); + VPackSlice const current(markers.at(i)); + hash ^= current.get(StaticStrings::KeyString).hashString(); + hash ^= current.get(StaticStrings::RevString).hash(); + } + + if (std::to_string(hash) != hashSlice.copyString()) { + match = false; + } + } + + if (match) { + // match + nextStart = localTo + 1; + } else { + // no match + // must transfer keys for non-matching range + std::string url = baseUrl + "/" + keysId + + "?type=keys&chunk=" + std::to_string(i) + + "&chunkSize=" + std::to_string(chunkSize); + progress = "fetching keys chunk " + std::to_string(currentChunkId) + + " for collection '" + collectionName + "' from " + url; + syncer.setProgress(progress); + + std::unique_ptr response( + syncer._client->retryRequest(rest::RequestType::PUT, url, nullptr, 0)); + + if (response == nullptr || !response->isComplete()) { + errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint + + ": " + syncer._client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": HTTP " + + basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " + + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + auto builder = std::make_shared(); + int res = syncer.parseResponse(builder, response.get()); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const slice = builder->slice(); + if (!slice.isArray()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + // delete all keys at start of the range + while (nextStart < markers.size()) { + VPackSlice const keySlice(markers[nextStart]); + std::string const localKey( + keySlice.get(StaticStrings::KeyString).copyString()); + + if (localKey.compare(lowString) < 0) { + // we have a local key that is not present remotely + keyBuilder.clear(); + keyBuilder.openObject(); + keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey)); + keyBuilder.close(); + + trx.remove(collectionName, keyBuilder.slice(), options); + ++nextStart; + } else { + break; + } + } + + toFetch.clear(); + + size_t const n = static_cast(slice.length()); + TRI_ASSERT(n > 0); + + for (size_t i = 0; i < n; ++i) { + VPackSlice const pair = slice.at(i); + + if (!pair.isArray() || pair.length() != 2) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + + ": response key pair is no valid array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + // key + VPackSlice const keySlice = pair.at(0); + + if (!keySlice.isString()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": response key is no string"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + // rid + if (markers.empty()) { + // no local markers + toFetch.emplace_back(i); + continue; + } + + std::string const keyString = keySlice.copyString(); + + while (nextStart < markers.size()) { + VPackSlice const localKeySlice(markers[nextStart]); + std::string const localKey( + localKeySlice.get(StaticStrings::KeyString).copyString()); + + int res = localKey.compare(keyString); + + if (res != 0) { + // we have a local key that is not present remotely + keyBuilder.clear(); + keyBuilder.openObject(); + keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey)); + keyBuilder.close(); + + trx.remove(collectionName, keyBuilder.slice(), options); + ++nextStart; + } else { + // key match + break; + } + } + + MMFilesSimpleIndexElement element = idx->lookupKey(&trx, keySlice); + + if (!element) { + // key not found locally + toFetch.emplace_back(i); + } else if (TRI_RidToString(element.revisionId()) != + pair.at(1).copyString()) { + // key found, but revision id differs + toFetch.emplace_back(i); + ++nextStart; + } else { + // a match - nothing to do! + ++nextStart; + } + } + + // calculate next starting point + if (!markers.empty()) { + BinarySearch(markers, highString, nextStart); + + while (nextStart < markers.size()) { + VPackSlice const localKeySlice(markers[nextStart]); + std::string const localKey( + localKeySlice.get(StaticStrings::KeyString).copyString()); + + int res = localKey.compare(highString); + + if (res <= 0) { + ++nextStart; + } else { + break; + } + } + } + + if (!toFetch.empty()) { + VPackBuilder keysBuilder; + keysBuilder.openArray(); + for (auto& it : toFetch) { + keysBuilder.add(VPackValue(it)); + } + keysBuilder.close(); + + std::string url = baseUrl + "/" + keysId + + "?type=docs&chunk=" + std::to_string(currentChunkId) + + "&chunkSize=" + std::to_string(chunkSize); + progress = "fetching documents chunk " + + std::to_string(currentChunkId) + " for collection '" + + collectionName + "' from " + url; + syncer.setProgress(progress); + + std::string const keyJsonString(keysBuilder.slice().toJson()); + + std::unique_ptr response( + syncer._client->retryRequest(rest::RequestType::PUT, url, + keyJsonString.c_str(), keyJsonString.size())); + + if (response == nullptr || !response->isComplete()) { + errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint + + ": " + syncer._client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": HTTP " + + basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " + + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + auto builder = std::make_shared(); + int res = syncer.parseResponse(builder, response.get()); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "got invalid response from master at " + + std::string(syncer._masterInfo._endpoint) + + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const slice = builder->slice(); + if (!slice.isArray()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + for (auto const& it : VPackArrayIterator(slice)) { + if (!it.isObject()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": document is no object"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const keySlice = it.get(StaticStrings::KeyString); + + if (!keySlice.isString()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": document key is invalid"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const revSlice = it.get(StaticStrings::RevString); + + if (!revSlice.isString()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": document revision is invalid"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + MMFilesSimpleIndexElement element = idx->lookupKey(&trx, keySlice); + + if (!element) { + // INSERT + OperationResult opRes = trx.insert(collectionName, it, options); + res = opRes.code; + } else { + // UPDATE + OperationResult opRes = trx.update(collectionName, it, options); + res = opRes.code; + } + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + } + } + } + + res = trx.commit(); + + if (!res.ok()) { + return res.errorNumber(); + } + } + + return res; +} +} diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 3accaa1700..de09ecd775 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -32,11 +32,6 @@ #include "Indexes/Index.h" #include "Indexes/IndexIterator.h" #include "Logger/Logger.h" -#include "MMFiles/MMFilesCollection.h" //TODO -- Remove -- ditches -#include "MMFiles/MMFilesDatafileHelper.h" -#include "MMFiles/MMFilesDitch.h" -#include "MMFiles/MMFilesIndexElement.h" -#include "MMFiles/MMFilesPrimaryIndex.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCommon.h" #include "SimpleHttpClient/SimpleHttpClient.h" @@ -64,66 +59,6 @@ using namespace arangodb::httpclient; using namespace arangodb::rest; using namespace arangodb::rocksutils; -//////////////////////////////////////////////////////////////////////////////// -/// @brief performs a binary search for the given key in the markers vector -//////////////////////////////////////////////////////////////////////////////// - -static bool BinarySearch(std::vector const& markers, - std::string const& key, size_t& position) { - TRI_ASSERT(!markers.empty()); - - size_t l = 0; - size_t r = markers.size() - 1; - - while (true) { - // determine midpoint - position = l + ((r - l) / 2); - - TRI_ASSERT(position < markers.size()); - VPackSlice const otherSlice(markers.at(position)); - VPackSlice const otherKey = otherSlice.get(StaticStrings::KeyString); - - int res = key.compare(otherKey.copyString()); - - if (res == 0) { - return true; - } - - if (res < 0) { - if (position == 0) { - return false; - } - r = position - 1; - } else { - l = position + 1; - } - - if (r < l) { - return false; - } - } -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief finds a key range in the markers vector -//////////////////////////////////////////////////////////////////////////////// - -static bool FindRange(std::vector const& markers, - std::string const& lower, std::string const& upper, - size_t& lowerPos, size_t& upperPos) { - bool found = false; - - if (!markers.empty()) { - found = BinarySearch(markers, lower, lowerPos); - - if (found) { - found = BinarySearch(markers, upper, upperPos); - } - } - - return found; -} - size_t const InitialSyncer::MaxChunkSize = 10 * 1024 * 1024; InitialSyncer::InitialSyncer( @@ -1026,16 +961,8 @@ int InitialSyncer::handleCollectionSync(arangodb::LogicalCollection* col, // now we can fetch the complete chunk information from the master try { - if (std::strcmp("mmfiles", EngineSelectorFeature::engineName()) == 0) { - res = handleSyncKeysMMFiles(col, id.copyString(), cid, collectionName, - maxTick, errorMsg); - } else if (std::strcmp("rocksdb", EngineSelectorFeature::engineName()) == - 0) { - res = handleSyncKeysRocksDB(col, id.copyString(), cid, collectionName, - maxTick, errorMsg); - } else { - THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); - } + res = EngineSelectorFeature::ENGINE->handleSyncKeys( + *this, col, id.copyString(), cid, collectionName, maxTick, errorMsg); } catch (arangodb::basics::Exception const& ex) { res = ex.code(); } catch (std::exception const& ex) { @@ -1052,1210 +979,6 @@ int InitialSyncer::handleCollectionSync(arangodb::LogicalCollection* col, /// @brief incrementally fetch data from a collection //////////////////////////////////////////////////////////////////////////////// -int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col, - std::string const& keysId, - std::string const& cid, - std::string const& collectionName, - TRI_voc_tick_t maxTick, - std::string& errorMsg) { - std::string progress = - "collecting local keys for collection '" + collectionName + "'"; - setProgress(progress); - - if (checkAborted()) { - return TRI_ERROR_REPLICATION_APPLIER_STOPPED; - } - - sendExtendBatch(); - sendExtendBarrier(); - - TRI_voc_tick_t const chunkSize = 5000; - std::string const baseUrl = BaseUrl + "/keys"; - - std::string url = - baseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize); - progress = "fetching remote keys chunks for collection '" + collectionName + - "' from " + url; - setProgress(progress); - - std::unique_ptr response( - _client->retryRequest(rest::RequestType::GET, url, nullptr, 0)); - - if (response == nullptr || !response->isComplete()) { - errorMsg = "could not connect to master at " + _masterInfo._endpoint + - ": " + _client->getErrorMessage(); - - return TRI_ERROR_REPLICATION_NO_RESPONSE; - } - - TRI_ASSERT(response != nullptr); - - if (response->wasHttpError()) { - errorMsg = "got invalid response from master at " + _masterInfo._endpoint + - ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + - ": " + response->getHttpReturnMessage(); - - return TRI_ERROR_REPLICATION_MASTER_ERROR; - } - - auto builder = std::make_shared(); - int res = parseResponse(builder, response.get()); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "got invalid response from master at " + - std::string(_masterInfo._endpoint) + - ": invalid response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const chunkSlice = builder->slice(); - - if (!chunkSlice.isArray()) { - errorMsg = "got invalid response from master at " + _masterInfo._endpoint + - ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - ManagedDocumentResult mmdr; - OperationOptions options; - options.silent = true; - options.ignoreRevs = true; - options.isRestore = true; - - VPackBuilder keyBuilder; - size_t const numChunks = static_cast(chunkSlice.length()); - - // remove all keys that are below first remote key or beyond last remote key - if (numChunks > 0) { - // first chunk - SingleCollectionTransaction trx( - transaction::StandaloneContext::Create(_vocbase), col->cid(), - AccessMode::Type::WRITE); - - Result res = trx.begin(); - - if (!res.ok()) { - errorMsg = - std::string("unable to start transaction: ") + res.errorMessage(); - res.reset(res.errorNumber(), errorMsg); - return res.errorNumber(); - } - - VPackSlice chunk = chunkSlice.at(0); - - TRI_ASSERT(chunk.isObject()); - auto lowSlice = chunk.get("low"); - TRI_ASSERT(lowSlice.isString()); - - // last high - chunk = chunkSlice.at(numChunks - 1); - TRI_ASSERT(chunk.isObject()); - - auto highSlice = chunk.get("high"); - TRI_ASSERT(highSlice.isString()); - - std::string const lowKey(lowSlice.copyString()); - std::string const highKey(highSlice.copyString()); - - LogicalCollection* coll = trx.documentCollection(); - std::unique_ptr iterator = - coll->getAllIterator(&trx, &mmdr, false); - iterator->next( - [&](DocumentIdentifierToken const& token) { - if (coll->readDocument(&trx, token, mmdr) == false) { - return; - } - VPackSlice doc(mmdr.vpack()); - VPackSlice key = doc.get(StaticStrings::KeyString); - if (key.compareString(lowKey.data(), lowKey.length()) < 0) { - trx.remove(collectionName, key, options); - } else if (key.compareString(highKey.data(), highKey.length()) > 0) { - trx.remove(collectionName, key, options); - } - }, - UINT64_MAX); - - trx.commit(); - } - - { - if (checkAborted()) { - return TRI_ERROR_REPLICATION_APPLIER_STOPPED; - } - - SingleCollectionTransaction trx( - transaction::StandaloneContext::Create(_vocbase), col->cid(), - AccessMode::Type::WRITE); - - Result res = trx.begin(); - - if (!res.ok()) { - errorMsg = - std::string("unable to start transaction: ") + res.errorMessage(); - res.reset(res.errorNumber(), res.errorMessage()); - return res.errorNumber(); - } - - // We do not take responsibility for the index. - // The LogicalCollection is protected by trx. - // Neither it nor it's indexes can be invalidated - - size_t currentChunkId = 0; - - std::string lowKey; - std::string highKey; - std::string hashString; - uint64_t localHash = 0x012345678; - // chunk keys + revisionId - std::vector> markers; - bool foundLowKey = false; - - auto resetChunk = [&]() -> void { - sendExtendBatch(); - sendExtendBarrier(); - - progress = "processing keys chunk " + std::to_string(currentChunkId) + - " for collection '" + collectionName + "'"; - setProgress(progress); - - // read remote chunk - VPackSlice chunk = chunkSlice.at(currentChunkId); - if (!chunk.isObject()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": chunk is no object"; - THROW_ARANGO_EXCEPTION(TRI_ERROR_REPLICATION_INVALID_RESPONSE); - } - - VPackSlice const lowSlice = chunk.get("low"); - VPackSlice const highSlice = chunk.get("high"); - VPackSlice const hashSlice = chunk.get("hash"); - if (!lowSlice.isString() || !highSlice.isString() || - !hashSlice.isString()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + - ": chunks in response have an invalid format"; - THROW_ARANGO_EXCEPTION(TRI_ERROR_REPLICATION_INVALID_RESPONSE); - } - - // now reset chunk information - markers.clear(); - lowKey = lowSlice.copyString(); - highKey = highSlice.copyString(); - hashString = hashSlice.copyString(); - localHash = 0x012345678; - foundLowKey = false; - }; - // set to first chunk - resetChunk(); - - std::function parseDoc = [&](VPackSlice doc, - VPackSlice key) { - - bool rangeUnequal = false; - bool nextChunk = false; - - int cmp1 = key.compareString(lowKey.data(), lowKey.length()); - int cmp2 = key.compareString(highKey.data(), highKey.length()); - - if (cmp1 < 0) { - // smaller values than lowKey mean they don't exist remotely - trx.remove(collectionName, key, options); - return; - } else if (cmp1 >= 0 && cmp2 <= 0) { - // we only need to hash we are in the range - if (cmp1 == 0) { - foundLowKey = true; - } - - markers.emplace_back(key.copyString(), TRI_ExtractRevisionId(doc)); - // don't bother hashing if we have't found lower key - if (foundLowKey) { - VPackSlice revision = doc.get(StaticStrings::RevString); - localHash ^= key.hashString(); - localHash ^= revision.hash(); - - if (cmp2 == 0) { // found highKey - rangeUnequal = std::to_string(localHash) != hashString; - nextChunk = true; - } - } else if (cmp2 == 0) { // found high key, but not low key - rangeUnequal = true; - nextChunk = true; - } - } else if (cmp2 > 0) { // higher than highKey - // current range was unequal and we did not find the - // high key. Load range and skip to next - rangeUnequal = true; - nextChunk = true; - } - - - TRI_ASSERT(!rangeUnequal || nextChunk); // A => B - if (nextChunk) {// we are out of range, see next chunk - if (rangeUnequal && currentChunkId < numChunks) { - int res = syncChunkRocksDB(&trx, keysId, currentChunkId, lowKey, - highKey, markers, errorMsg); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION(res); - } - } - currentChunkId++; - if (currentChunkId < numChunks) { - resetChunk(); - // key is higher than upper bound, recheck the current document - if (cmp2 > 0) { - parseDoc(doc, key); - } - } - } - }; - - std::unique_ptr iterator = - col->getAllIterator(&trx, &mmdr, false); - iterator->next( - [&](DocumentIdentifierToken const& token) { - if (col->readDocument(&trx, token, mmdr) == false) { - return; - } - VPackSlice doc(mmdr.vpack()); - VPackSlice key = doc.get(StaticStrings::KeyString); - parseDoc(doc, key); - }, - UINT64_MAX); - - // we might have missed chunks, if the keys don't exist at all locally - while (currentChunkId < numChunks) { - int res = syncChunkRocksDB(&trx, keysId, currentChunkId, lowKey, - highKey, markers, errorMsg); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION(res); - } - currentChunkId++; - if (currentChunkId < numChunks) { - resetChunk(); - } - } - - res = trx.commit(); - if (!res.ok()) { - return res.errorNumber(); - } - } - - return res; -} - -int InitialSyncer::syncChunkRocksDB( - SingleCollectionTransaction* trx, std::string const& keysId, - uint64_t chunkId, std::string const& lowString, - std::string const& highString, - std::vector> markers, - std::string& errorMsg) { - std::string const baseUrl = BaseUrl + "/keys"; - TRI_voc_tick_t const chunkSize = 5000; - std::string const& collectionName = trx->documentCollection()->name(); - PhysicalCollection* physical = trx->documentCollection()->getPhysical(); - OperationOptions options; - options.silent = true; - options.ignoreRevs = true; - options.isRestore = true; - - // no match - // must transfer keys for non-matching range - std::string url = baseUrl + "/" + keysId + - "?type=keys&chunk=" + std::to_string(chunkId) + - "&chunkSize=" + std::to_string(chunkSize) + - "&low=" + lowString;// send over low chunk for seeking - - std::string progress = - "fetching keys chunk '" + std::to_string(chunkId) + "' from " + url; - setProgress(progress); - - std::unique_ptr response( - _client->retryRequest(rest::RequestType::PUT, url, nullptr, 0)); - - if (response == nullptr || !response->isComplete()) { - errorMsg = "could not connect to master at " + _masterInfo._endpoint + - ": " + _client->getErrorMessage(); - - return TRI_ERROR_REPLICATION_NO_RESPONSE; - } - - TRI_ASSERT(response != nullptr); - - if (response->wasHttpError()) { - errorMsg = "got invalid response from master at " + _masterInfo._endpoint + - ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + - ": " + response->getHttpReturnMessage(); - - return TRI_ERROR_REPLICATION_MASTER_ERROR; - } - - auto builder = std::make_shared(); - int res = parseResponse(builder, response.get()); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "got invalid response from master at " + _masterInfo._endpoint + - ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const responseBody = builder->slice(); - if (!responseBody.isArray()) { - errorMsg = "got invalid response from master at " + _masterInfo._endpoint + - ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - transaction::BuilderLeaser keyBuilder(trx); - /*size_t nextStart = 0; - // delete all keys at start of the range - while (nextStart < markers.size()) { - std::string const& localKey = markers[nextStart].first; - - if ( localKey.compare(lowString) < 0) { - // we have a local key that is not present remotely - keyBuilder.clear(); - keyBuilder.openObject(); - keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey)); - keyBuilder.close(); - - trx.remove(collectionName, keyBuilder.slice(), options); - ++nextStart; - } else { - break; - } - }*/ - - std::vector toFetch; - - size_t const numKeys = static_cast(responseBody.length()); - if (numKeys == 0) { - errorMsg = "got invalid response from master at " + _masterInfo._endpoint + - ": response contains an empty chunk. Collection: " + collectionName + - " Chunk: " + std::to_string(chunkId); - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - TRI_ASSERT(numKeys > 0); - - size_t i = 0; - size_t nextStart = 0; - - for (VPackSlice const& pair : VPackArrayIterator(responseBody)) { - if (!pair.isArray() || pair.length() != 2) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + - ": response key pair is no valid array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - // key - VPackSlice const keySlice = pair.at(0); - if (!keySlice.isString()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": response key is no string"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - // rid - if (markers.empty()) { - // no local markers - toFetch.emplace_back(i); - i++; - continue; - } - - // remove keys not present anymore - while (nextStart < markers.size()) { - std::string const& localKey = markers[nextStart].first; - - int res = keySlice.compareString(localKey); - if (res != 0) { - // we have a local key that is not present remotely - keyBuilder->clear(); - keyBuilder->openObject(); - keyBuilder->add(StaticStrings::KeyString, VPackValue(localKey)); - keyBuilder->close(); - - trx->remove(collectionName, keyBuilder->slice(), options); - ++nextStart; - } else { - // key match - break; - } - } - - // see if key exists - DocumentIdentifierToken token = physical->lookupKey(trx, keySlice); - if (token._data == 0) { - // key not found locally - toFetch.emplace_back(i); - } else if (TRI_RidToString(token._data) != pair.at(1).copyString()) { - // key found, but revision id differs - toFetch.emplace_back(i); - ++nextStart; - } else { - // a match - nothing to do! - ++nextStart; - } - - i++; - } - - // delete all keys at end of the range - while (nextStart < markers.size()) { - std::string const& localKey = markers[nextStart].first; - - TRI_ASSERT(localKey.compare(highString) > 0); - //if (localKey.compare(highString) > 0) { - // we have a local key that is not present remotely - keyBuilder->clear(); - keyBuilder->openObject(); - keyBuilder->add(StaticStrings::KeyString, VPackValue(localKey)); - keyBuilder->close(); - - trx->remove(collectionName, keyBuilder->slice(), options); - //} - ++nextStart; - } - - if (!toFetch.empty()) { - VPackBuilder keysBuilder; - keysBuilder.openArray(); - for (auto& it : toFetch) { - keysBuilder.add(VPackValue(it)); - } - keysBuilder.close(); - - std::string url = baseUrl + "/" + keysId + - "?type=docs&chunk=" + std::to_string(chunkId) + - "&chunkSize=" + std::to_string(chunkSize) + - "&low=" + lowString;// send over low chunk for seeking - progress = "fetching documents chunk " + std::to_string(chunkId) + - " for collection '" + collectionName + "' from " + url; - setProgress(progress); - - std::string const keyJsonString(keysBuilder.slice().toJson()); - - std::unique_ptr response( - _client->retryRequest(rest::RequestType::PUT, url, - keyJsonString.c_str(), keyJsonString.size())); - - if (response == nullptr || !response->isComplete()) { - errorMsg = "could not connect to master at " + _masterInfo._endpoint + - ": " + _client->getErrorMessage(); - - return TRI_ERROR_REPLICATION_NO_RESPONSE; - } - - TRI_ASSERT(response != nullptr); - - if (response->wasHttpError()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": HTTP " + - StringUtils::itoa(response->getHttpReturnCode()) + ": " + - response->getHttpReturnMessage(); - - return TRI_ERROR_REPLICATION_MASTER_ERROR; - } - - auto builder = std::make_shared(); - int res = parseResponse(builder, response.get()); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "got invalid response from master at " + - std::string(_masterInfo._endpoint) + ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const slice = builder->slice(); - if (!slice.isArray()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - for (auto const& it : VPackArrayIterator(slice)) { - if (!it.isObject()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": document is no object"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const keySlice = it.get(StaticStrings::KeyString); - - if (!keySlice.isString()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": document key is invalid"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const revSlice = it.get(StaticStrings::RevString); - - if (!revSlice.isString()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": document revision is invalid"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - DocumentIdentifierToken token = physical->lookupKey(trx, keySlice); - - if (!token._data) { - // INSERT - OperationResult opRes = trx->insert(collectionName, it, options); - res = opRes.code; - } else { - // UPDATE - OperationResult opRes = trx->update(collectionName, it, options); - res = opRes.code; - } - - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - } - } - return TRI_ERROR_NO_ERROR; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief incrementally fetch data from a collection -//////////////////////////////////////////////////////////////////////////////// - -int InitialSyncer::handleSyncKeysMMFiles(arangodb::LogicalCollection* col, - std::string const& keysId, - std::string const& cid, - std::string const& collectionName, - TRI_voc_tick_t maxTick, - std::string& errorMsg) { - std::string progress = - "collecting local keys for collection '" + collectionName + "'"; - setProgress(progress); - - // fetch all local keys from primary index - std::vector markers; - - MMFilesDocumentDitch* ditch = nullptr; - - // acquire a replication ditch so no datafiles are thrown away from now on - // note: the ditch also protects against unloading the collection - { - SingleCollectionTransaction trx( - transaction::StandaloneContext::Create(_vocbase), col->cid(), - AccessMode::Type::READ); - - Result res = trx.begin(); - - if (!res.ok()) { - errorMsg = - std::string("unable to start transaction: ") + res.errorMessage(); - res.reset(res.errorNumber(), errorMsg); - return res.errorNumber(); - } - - ditch = arangodb::MMFilesCollection::toMMFilesCollection(col) - ->ditches() - ->createMMFilesDocumentDitch(false, __FILE__, __LINE__); - - if (ditch == nullptr) { - return TRI_ERROR_OUT_OF_MEMORY; - } - } - - TRI_ASSERT(ditch != nullptr); - - TRI_DEFER(arangodb::MMFilesCollection::toMMFilesCollection(col) - ->ditches() - ->freeDitch(ditch)); - - { - SingleCollectionTransaction trx( - transaction::StandaloneContext::Create(_vocbase), col->cid(), - AccessMode::Type::READ); - - Result res = trx.begin(); - - if (!res.ok()) { - errorMsg = - std::string("unable to start transaction: ") + res.errorMessage(); - res.reset(res.errorNumber(), errorMsg); - return res.errorNumber(); - } - - // We do not take responsibility for the index. - // The LogicalCollection is protected by trx. - // Neither it nor it's indexes can be invalidated - - markers.reserve(trx.documentCollection()->numberDocuments(&trx)); - - uint64_t iterations = 0; - ManagedDocumentResult mmdr; - trx.invokeOnAllElements( - trx.name(), [this, &trx, &mmdr, &markers, - &iterations](DocumentIdentifierToken const& token) { - if (trx.documentCollection()->readDocument(&trx, token, mmdr)) { - markers.emplace_back(mmdr.vpack()); - - if (++iterations % 10000 == 0) { - if (checkAborted()) { - return false; - } - } - } - return true; - }); - - if (checkAborted()) { - return TRI_ERROR_REPLICATION_APPLIER_STOPPED; - } - - sendExtendBatch(); - sendExtendBarrier(); - - std::string progress = "sorting " + std::to_string(markers.size()) + - " local key(s) for collection '" + collectionName + - "'"; - setProgress(progress); - - // sort all our local keys - std::sort( - markers.begin(), markers.end(), - [](uint8_t const* lhs, uint8_t const* rhs) -> bool { - VPackSlice const l(lhs); - VPackSlice const r(rhs); - - VPackValueLength lLength, rLength; - char const* lKey = l.get(StaticStrings::KeyString).getString(lLength); - char const* rKey = r.get(StaticStrings::KeyString).getString(rLength); - - size_t const length = - static_cast(lLength < rLength ? lLength : rLength); - int res = memcmp(lKey, rKey, length); - - if (res < 0) { - // left is smaller than right - return true; - } - if (res == 0 && lLength < rLength) { - // left is equal to right, but of shorter length - return true; - } - - return false; - }); - } - - if (checkAborted()) { - return TRI_ERROR_REPLICATION_APPLIER_STOPPED; - } - - sendExtendBatch(); - sendExtendBarrier(); - - std::vector toFetch; - - TRI_voc_tick_t const chunkSize = 5000; - std::string const baseUrl = BaseUrl + "/keys"; - - std::string url = - baseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize); - progress = "fetching remote keys chunks for collection '" + collectionName + - "' from " + url; - setProgress(progress); - - std::unique_ptr response( - _client->retryRequest(rest::RequestType::GET, url, nullptr, 0)); - - if (response == nullptr || !response->isComplete()) { - errorMsg = "could not connect to master at " + _masterInfo._endpoint + - ": " + _client->getErrorMessage(); - - return TRI_ERROR_REPLICATION_NO_RESPONSE; - } - - TRI_ASSERT(response != nullptr); - - if (response->wasHttpError()) { - errorMsg = "got invalid response from master at " + _masterInfo._endpoint + - ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) + - ": " + response->getHttpReturnMessage(); - - return TRI_ERROR_REPLICATION_MASTER_ERROR; - } - - auto builder = std::make_shared(); - int res = parseResponse(builder, response.get()); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "got invalid response from master at " + - std::string(_masterInfo._endpoint) + - ": invalid response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const slice = builder->slice(); - - if (!slice.isArray()) { - errorMsg = "got invalid response from master at " + _masterInfo._endpoint + - ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - OperationOptions options; - options.silent = true; - options.ignoreRevs = true; - options.isRestore = true; - - VPackBuilder keyBuilder; - size_t const n = static_cast(slice.length()); - - // remove all keys that are below first remote key or beyond last remote key - if (n > 0) { - // first chunk - SingleCollectionTransaction trx( - transaction::StandaloneContext::Create(_vocbase), col->cid(), - AccessMode::Type::WRITE); - - Result res = trx.begin(); - - if (!res.ok()) { - errorMsg = - std::string("unable to start transaction: ") + res.errorMessage(); - res.reset(res.errorNumber(), errorMsg); - return res.errorNumber(); - } - - VPackSlice chunk = slice.at(0); - - TRI_ASSERT(chunk.isObject()); - auto lowSlice = chunk.get("low"); - TRI_ASSERT(lowSlice.isString()); - - std::string const lowKey(lowSlice.copyString()); - - for (size_t i = 0; i < markers.size(); ++i) { - VPackSlice const k(markers[i]); - - std::string const key(k.get(StaticStrings::KeyString).copyString()); - - if (key.compare(lowKey) >= 0) { - break; - } - - keyBuilder.clear(); - keyBuilder.openObject(); - keyBuilder.add(StaticStrings::KeyString, VPackValue(key)); - keyBuilder.close(); - - trx.remove(collectionName, keyBuilder.slice(), options); - } - - // last high - chunk = slice.at(n - 1); - TRI_ASSERT(chunk.isObject()); - - auto highSlice = chunk.get("high"); - TRI_ASSERT(highSlice.isString()); - - std::string const highKey(highSlice.copyString()); - - for (size_t i = markers.size(); i >= 1; --i) { - VPackSlice const k(markers[i - 1]); - - std::string const key(k.get(StaticStrings::KeyString).copyString()); - - if (key.compare(highKey) <= 0) { - break; - } - - keyBuilder.clear(); - keyBuilder.openObject(); - keyBuilder.add(StaticStrings::KeyString, VPackValue(key)); - keyBuilder.close(); - - trx.remove(collectionName, keyBuilder.slice(), options); - } - - trx.commit(); - } - - size_t nextStart = 0; - - // now process each chunk - for (size_t i = 0; i < n; ++i) { - if (checkAborted()) { - return TRI_ERROR_REPLICATION_APPLIER_STOPPED; - } - - SingleCollectionTransaction trx( - transaction::StandaloneContext::Create(_vocbase), col->cid(), - AccessMode::Type::WRITE); - - Result res = trx.begin(); - - if (!res.ok()) { - errorMsg = - std::string("unable to start transaction: ") + res.errorMessage(); - res.reset(res.errorNumber(), res.errorMessage()); - return res.errorNumber(); - } - - trx.pinData(col->cid()); // will throw when it fails - - // We do not take responsibility for the index. - // The LogicalCollection is protected by trx. - // Neither it nor it's indexes can be invalidated - - // TODO Move to MMFiles - auto physical = static_cast( - trx.documentCollection()->getPhysical()); - auto idx = physical->primaryIndex(); - - size_t const currentChunkId = i; - progress = "processing keys chunk " + std::to_string(currentChunkId) + - " for collection '" + collectionName + "'"; - setProgress(progress); - - sendExtendBatch(); - sendExtendBarrier(); - - // read remote chunk - VPackSlice chunk = slice.at(i); - - if (!chunk.isObject()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": chunk is no object"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const lowSlice = chunk.get("low"); - VPackSlice const highSlice = chunk.get("high"); - VPackSlice const hashSlice = chunk.get("hash"); - - if (!lowSlice.isString() || !highSlice.isString() || - !hashSlice.isString()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + - ": chunks in response have an invalid format"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - std::string const lowString = lowSlice.copyString(); - std::string const highString = highSlice.copyString(); - - size_t localFrom; - size_t localTo; - bool match = FindRange(markers, lowString, highString, localFrom, localTo); - - if (match) { - // now must hash the range - uint64_t hash = 0x012345678; - - for (size_t i = localFrom; i <= localTo; ++i) { - TRI_ASSERT(i < markers.size()); - VPackSlice const current(markers.at(i)); - hash ^= current.get(StaticStrings::KeyString).hashString(); - hash ^= current.get(StaticStrings::RevString).hash(); - } - - if (std::to_string(hash) != hashSlice.copyString()) { - match = false; - } - } - - if (match) { - // match - nextStart = localTo + 1; - } else { - // no match - // must transfer keys for non-matching range - std::string url = baseUrl + "/" + keysId + - "?type=keys&chunk=" + std::to_string(i) + - "&chunkSize=" + std::to_string(chunkSize); - progress = "fetching keys chunk " + std::to_string(currentChunkId) + - " for collection '" + collectionName + "' from " + url; - setProgress(progress); - - std::unique_ptr response( - _client->retryRequest(rest::RequestType::PUT, url, nullptr, 0)); - - if (response == nullptr || !response->isComplete()) { - errorMsg = "could not connect to master at " + _masterInfo._endpoint + - ": " + _client->getErrorMessage(); - - return TRI_ERROR_REPLICATION_NO_RESPONSE; - } - - TRI_ASSERT(response != nullptr); - - if (response->wasHttpError()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": HTTP " + - StringUtils::itoa(response->getHttpReturnCode()) + ": " + - response->getHttpReturnMessage(); - - return TRI_ERROR_REPLICATION_MASTER_ERROR; - } - - auto builder = std::make_shared(); - int res = parseResponse(builder, response.get()); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const slice = builder->slice(); - if (!slice.isArray()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - // delete all keys at start of the range - while (nextStart < markers.size()) { - VPackSlice const keySlice(markers[nextStart]); - std::string const localKey( - keySlice.get(StaticStrings::KeyString).copyString()); - - if (localKey.compare(lowString) < 0) { - // we have a local key that is not present remotely - keyBuilder.clear(); - keyBuilder.openObject(); - keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey)); - keyBuilder.close(); - - trx.remove(collectionName, keyBuilder.slice(), options); - ++nextStart; - } else { - break; - } - } - - toFetch.clear(); - - size_t const n = static_cast(slice.length()); - TRI_ASSERT(n > 0); - - for (size_t i = 0; i < n; ++i) { - VPackSlice const pair = slice.at(i); - - if (!pair.isArray() || pair.length() != 2) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + - ": response key pair is no valid array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - // key - VPackSlice const keySlice = pair.at(0); - - if (!keySlice.isString()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": response key is no string"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - // rid - if (markers.empty()) { - // no local markers - toFetch.emplace_back(i); - continue; - } - - std::string const keyString = keySlice.copyString(); - - while (nextStart < markers.size()) { - VPackSlice const localKeySlice(markers[nextStart]); - std::string const localKey( - localKeySlice.get(StaticStrings::KeyString).copyString()); - - int res = localKey.compare(keyString); - - if (res != 0) { - // we have a local key that is not present remotely - keyBuilder.clear(); - keyBuilder.openObject(); - keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey)); - keyBuilder.close(); - - trx.remove(collectionName, keyBuilder.slice(), options); - ++nextStart; - } else { - // key match - break; - } - } - - MMFilesSimpleIndexElement element = idx->lookupKey(&trx, keySlice); - - if (!element) { - // key not found locally - toFetch.emplace_back(i); - } else if (TRI_RidToString(element.revisionId()) != - pair.at(1).copyString()) { - // key found, but revision id differs - toFetch.emplace_back(i); - ++nextStart; - } else { - // a match - nothing to do! - ++nextStart; - } - } - - // calculate next starting point - if (!markers.empty()) { - BinarySearch(markers, highString, nextStart); - - while (nextStart < markers.size()) { - VPackSlice const localKeySlice(markers[nextStart]); - std::string const localKey( - localKeySlice.get(StaticStrings::KeyString).copyString()); - - int res = localKey.compare(highString); - - if (res <= 0) { - ++nextStart; - } else { - break; - } - } - } - - if (!toFetch.empty()) { - VPackBuilder keysBuilder; - keysBuilder.openArray(); - for (auto& it : toFetch) { - keysBuilder.add(VPackValue(it)); - } - keysBuilder.close(); - - std::string url = baseUrl + "/" + keysId + - "?type=docs&chunk=" + std::to_string(currentChunkId) + - "&chunkSize=" + std::to_string(chunkSize); - progress = "fetching documents chunk " + - std::to_string(currentChunkId) + " for collection '" + - collectionName + "' from " + url; - setProgress(progress); - - std::string const keyJsonString(keysBuilder.slice().toJson()); - - std::unique_ptr response( - _client->retryRequest(rest::RequestType::PUT, url, - keyJsonString.c_str(), keyJsonString.size())); - - if (response == nullptr || !response->isComplete()) { - errorMsg = "could not connect to master at " + _masterInfo._endpoint + - ": " + _client->getErrorMessage(); - - return TRI_ERROR_REPLICATION_NO_RESPONSE; - } - - TRI_ASSERT(response != nullptr); - - if (response->wasHttpError()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": HTTP " + - StringUtils::itoa(response->getHttpReturnCode()) + ": " + - response->getHttpReturnMessage(); - - return TRI_ERROR_REPLICATION_MASTER_ERROR; - } - - auto builder = std::make_shared(); - int res = parseResponse(builder, response.get()); - - if (res != TRI_ERROR_NO_ERROR) { - errorMsg = "got invalid response from master at " + - std::string(_masterInfo._endpoint) + - ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const slice = builder->slice(); - if (!slice.isArray()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": response is no array"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - for (auto const& it : VPackArrayIterator(slice)) { - if (!it.isObject()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": document is no object"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const keySlice = it.get(StaticStrings::KeyString); - - if (!keySlice.isString()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": document key is invalid"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - VPackSlice const revSlice = it.get(StaticStrings::RevString); - - if (!revSlice.isString()) { - errorMsg = "got invalid response from master at " + - _masterInfo._endpoint + ": document revision is invalid"; - - return TRI_ERROR_REPLICATION_INVALID_RESPONSE; - } - - MMFilesSimpleIndexElement element = idx->lookupKey(&trx, keySlice); - - if (!element) { - // INSERT - OperationResult opRes = trx.insert(collectionName, it, options); - res = opRes.code; - } else { - // UPDATE - OperationResult opRes = trx.update(collectionName, it, options); - res = opRes.code; - } - - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - } - } - } - - res = trx.commit(); - - if (!res.ok()) { - return res.errorNumber(); - } - } - - return res; -} - //////////////////////////////////////////////////////////////////////////////// /// @brief changes the properties of a collection, based on the VelocyPack /// provided diff --git a/arangod/Replication/InitialSyncer.h b/arangod/Replication/InitialSyncer.h index 0769ecc80f..c7b4751dd8 100644 --- a/arangod/Replication/InitialSyncer.h +++ b/arangod/Replication/InitialSyncer.h @@ -37,12 +37,50 @@ struct TRI_vocbase_t; namespace arangodb { class LogicalCollection; +class InitialSyncer; +int handleSyncKeysMMFiles(arangodb::InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, std::string const& cid, + std::string const& collectionName, + TRI_voc_tick_t maxTick, std::string& errorMsg); + +int handleSyncKeysRocksDB(InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, std::string const& cid, + std::string const& collectionName, + TRI_voc_tick_t maxTick, std::string& errorMsg); + +int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx, + std::string const& keysId, uint64_t chunkId, + std::string const& lowString, + std::string const& highString, + std::vector> markers, + std::string& errorMsg); namespace httpclient { class SimpleHttpResult; } class InitialSyncer : public Syncer { + friend int ::arangodb::handleSyncKeysMMFiles( + arangodb::InitialSyncer& syncer, arangodb::LogicalCollection* col, + std::string const& keysId, std::string const& cid, + std::string const& collectionName, TRI_voc_tick_t maxTick, + std::string& errorMsg); + + friend int ::arangodb::handleSyncKeysRocksDB( + InitialSyncer& syncer, arangodb::LogicalCollection* col, + std::string const& keysId, std::string const& cid, + std::string const& collectionName, TRI_voc_tick_t maxTick, + std::string& errorMsg); + + friend int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx, + std::string const& keysId, uint64_t chunkId, + std::string const& lowString, + std::string const& highString, + std::vector> markers, + std::string& errorMsg); + private: ////////////////////////////////////////////////////////////////////////////// /// @brief apply phases @@ -64,7 +102,6 @@ class InitialSyncer : public Syncer { ~InitialSyncer(); public: - ////////////////////////////////////////////////////////////////////////////// /// @brief run method, performs a full synchronization ////////////////////////////////////////////////////////////////////////////// @@ -117,8 +154,7 @@ class InitialSyncer : public Syncer { if (_verbose) { LOG_TOPIC(INFO, Logger::REPLICATION) << msg; - } - else { + } else { LOG_TOPIC(DEBUG, Logger::REPLICATION) << msg; } @@ -161,8 +197,7 @@ class InitialSyncer : public Syncer { /// @brief apply the data from a collection dump ////////////////////////////////////////////////////////////////////////////// - int applyCollectionDump(transaction::Methods&, - std::string const&, + int applyCollectionDump(transaction::Methods&, std::string const&, httpclient::SimpleHttpResult*, uint64_t&, std::string&); @@ -185,22 +220,21 @@ class InitialSyncer : public Syncer { int handleCollectionSync(arangodb::LogicalCollection*, std::string const&, std::string const&, TRI_voc_tick_t, std::string&); - + ////////////////////////////////////////////////////////////////////////////// /// @brief incrementally fetch data from a collection ////////////////////////////////////////////////////////////////////////////// - + int handleSyncKeysRocksDB(arangodb::LogicalCollection* col, std::string const& keysId, std::string const& cid, - std::string const& collectionName, TRI_voc_tick_t maxTick, - std::string& errorMsg); + std::string const& collectionName, + TRI_voc_tick_t maxTick, std::string& errorMsg); ////////////////////////////////////////////////////////////////////////////// /// @brief incrementally fetch chunk data from a collection ////////////////////////////////////////////////////////////////////////////// - + int syncChunkRocksDB(SingleCollectionTransaction* trx, - std::string const& keysId, - uint64_t chunkId, + std::string const& keysId, uint64_t chunkId, std::string const& lowKey, std::string const& highKey, std::vector> markers, std::string& errorMsg); @@ -211,8 +245,8 @@ class InitialSyncer : public Syncer { int handleSyncKeysMMFiles(arangodb::LogicalCollection* col, std::string const& keysId, std::string const& cid, - std::string const& collectionName, TRI_voc_tick_t maxTick, - std::string& errorMsg); + std::string const& collectionName, + TRI_voc_tick_t maxTick, std::string& errorMsg); ////////////////////////////////////////////////////////////////////////////// /// @brief changes the properties of a collection, based on the VelocyPack @@ -226,24 +260,24 @@ class InitialSyncer : public Syncer { /// @brief handle the information about a collection ////////////////////////////////////////////////////////////////////////////// - int handleCollection(arangodb::velocypack::Slice const&, - arangodb::velocypack::Slice const&, bool, - std::string&, sync_phase_e); + int handleCollection(arangodb::velocypack::Slice const&, + arangodb::velocypack::Slice const&, bool, std::string&, + sync_phase_e); ////////////////////////////////////////////////////////////////////////////// /// @brief handle the inventory response of the master ////////////////////////////////////////////////////////////////////////////// - int handleInventoryResponse(arangodb::velocypack::Slice const&, - bool, std::string&); + int handleInventoryResponse(arangodb::velocypack::Slice const&, bool, + std::string&); ////////////////////////////////////////////////////////////////////////////// /// @brief iterate over all collections from an array and apply an action ////////////////////////////////////////////////////////////////////////////// int iterateCollections( - std::vector< - std::pair> const&, + std::vector> const&, bool, std::string&, sync_phase_e); private: diff --git a/arangod/Replication/Syncer.h b/arangod/Replication/Syncer.h index 55e54db914..b40731f3fa 100644 --- a/arangod/Replication/Syncer.h +++ b/arangod/Replication/Syncer.h @@ -58,6 +58,8 @@ class Syncer { Syncer(TRI_vocbase_t*, TRI_replication_applier_configuration_t const*); virtual ~Syncer(); + + TRI_vocbase_t* vocbase() { return _vocbase; } ////////////////////////////////////////////////////////////////////////////// /// @brief sleeps (nanoseconds) diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 8e149a09d0..6441c7d3a9 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -1454,3 +1454,8 @@ int RocksDBCollection::unlockRead() { return TRI_ERROR_NO_ERROR; } + +uint64_t RocksDBCollection::recalculateCounts(){ + THROW_ARANGO_NOT_YET_IMPLEMENTED(); + return 0; +} diff --git a/arangod/RocksDBEngine/RocksDBCollection.h b/arangod/RocksDBEngine/RocksDBCollection.h index 1681d1dd67..b3c87c0df1 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.h +++ b/arangod/RocksDBEngine/RocksDBCollection.h @@ -39,7 +39,7 @@ class Result; class RocksDBPrimaryIndex; class RocksDBVPackIndex; struct RocksDBToken; - + class RocksDBCollection final : public PhysicalCollection { friend class RocksDBEngine; friend class RocksDBVPackIndex; @@ -47,19 +47,6 @@ class RocksDBCollection final : public PhysicalCollection { constexpr static double defaultLockTimeout = 10.0 * 60.0; public: - static inline RocksDBCollection* toRocksDBCollection( - PhysicalCollection* physical) { - auto rv = static_cast(physical); - TRI_ASSERT(rv != nullptr); - return rv; - } - - static inline RocksDBCollection* toRocksDBCollection( - LogicalCollection* logical) { - auto phys = logical->getPhysical(); - TRI_ASSERT(phys != nullptr); - return toRocksDBCollection(phys); - } public: explicit RocksDBCollection(LogicalCollection*, VPackSlice const& info); @@ -191,6 +178,9 @@ class RocksDBCollection final : public PhysicalCollection { int lockRead(double timeout = 0.0); int unlockRead(); + // recalculte counts for collection in case of failure + uint64_t recalculateCounts(); + private: /// @brief return engine-specific figures void figuresSpecific( @@ -234,6 +224,19 @@ class RocksDBCollection final : public PhysicalCollection { basics::ReadWriteLock _exclusiveLock; }; + +inline RocksDBCollection* toRocksDBCollection(PhysicalCollection* physical) { + auto rv = static_cast(physical); + TRI_ASSERT(rv != nullptr); + return rv; +} + +inline RocksDBCollection* toRocksDBCollection(LogicalCollection* logical) { + auto phys = logical->getPhysical(); + TRI_ASSERT(phys != nullptr); + return toRocksDBCollection(phys); +} + } #endif diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index 561b166367..d3cec08598 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -106,7 +106,7 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { } // acquire rocksdb collection - auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection); + auto rocksColl = toRocksDBCollection(_collection); while (true) { TRI_ASSERT(limit > 0); diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 0dff8b0e88..e764fc95c6 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -57,6 +57,7 @@ #include "RocksDBEngine/RocksDBV8Functions.h" #include "RocksDBEngine/RocksDBValue.h" #include "RocksDBEngine/RocksDBView.h" +#include "RocksDBEngine/RocksDBInitialSync.h" #include "VocBase/replication-applier.h" #include "VocBase/ticks.h" @@ -655,7 +656,7 @@ arangodb::Result RocksDBEngine::persistCollection( #ifdef ARANGODB_ENABLE_MAINTAINER_MODE if (result.ok()) { RocksDBCollection* rcoll = - RocksDBCollection::toRocksDBCollection(collection->getPhysical()); + toRocksDBCollection(collection->getPhysical()); TRI_ASSERT(rcoll->numberDocuments() == 0); } #endif @@ -704,7 +705,7 @@ arangodb::Result RocksDBEngine::dropCollection( // Cleanup data-mess RocksDBCollection* coll = - RocksDBCollection::toRocksDBCollection(collection->getPhysical()); + toRocksDBCollection(collection->getPhysical()); // Unregister counter _counterManager->removeCounter(coll->objectId()); @@ -1210,4 +1211,14 @@ RocksDBReplicationManager* RocksDBEngine::replicationManager() const { return _replicationManager.get(); } +int RocksDBEngine::handleSyncKeys(arangodb::InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, + std::string const& cid, + std::string const& collectionName, + TRI_voc_tick_t maxTick, + std::string& errorMsg) { + return handleSyncKeysRocksDB(syncer, col, keysId, cid, collectionName, + maxTick, errorMsg); +} } // namespace diff --git a/arangod/RocksDBEngine/RocksDBEngine.h b/arangod/RocksDBEngine/RocksDBEngine.h index 1ff4a3ed7a..065bdd4777 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.h +++ b/arangod/RocksDBEngine/RocksDBEngine.h @@ -115,7 +115,11 @@ class RocksDBEngine final : public StorageEngine { int saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase, arangodb::velocypack::Slice slice, bool doSync) override; - + int handleSyncKeys(arangodb::InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, std::string const& cid, + std::string const& collectionName, TRI_voc_tick_t maxTick, + std::string& errorMsg); // database, collection and index management // ----------------------------------------- diff --git a/arangod/RocksDBEngine/RocksDBInitialSync.h b/arangod/RocksDBEngine/RocksDBInitialSync.h new file mode 100644 index 0000000000..3af502ea36 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBInitialSync.h @@ -0,0 +1,595 @@ +#include "Replication/InitialSyncer.h" + +#include "Basics/StaticStrings.h" +#include "Basics/StringUtils.h" +#include "Indexes/IndexIterator.h" +#include "SimpleHttpClient/SimpleHttpClient.h" +#include "SimpleHttpClient/SimpleHttpResult.h" +#include "Transaction/Helpers.h" +#include "Utils/OperationOptions.h" +#include "VocBase/LogicalCollection.h" +#include "VocBase/ManagedDocumentResult.h" +#include "StorageEngine/PhysicalCollection.h" + +#include +#include +#include +#include + +namespace arangodb { +int syncChunkRocksDB(InitialSyncer& syncer, SingleCollectionTransaction* trx, + std::string const& keysId, uint64_t chunkId, + std::string const& lowString, + std::string const& highString, + std::vector> markers, + std::string& errorMsg) { + std::string const baseUrl = syncer.BaseUrl + "/keys"; + TRI_voc_tick_t const chunkSize = 5000; + std::string const& collectionName = trx->documentCollection()->name(); + PhysicalCollection* physical = trx->documentCollection()->getPhysical(); + OperationOptions options; + options.silent = true; + options.ignoreRevs = true; + options.isRestore = true; + + // no match + // must transfer keys for non-matching range + std::string url = baseUrl + "/" + keysId + "?type=keys&chunk=" + + std::to_string(chunkId) + "&chunkSize=" + + std::to_string(chunkSize) + "&low=" + lowString; + + std::string progress = + "fetching keys chunk '" + std::to_string(chunkId) + "' from " + url; + syncer.setProgress(progress); + + std::unique_ptr response( + syncer._client->retryRequest(rest::RequestType::PUT, url, nullptr, 0)); + + if (response == nullptr || !response->isComplete()) { + errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint + + ": " + syncer._client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint + + ": HTTP " + basics::StringUtils::itoa(response->getHttpReturnCode()) + + ": " + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + auto builder = std::make_shared(); + int res = syncer.parseResponse(builder, response.get()); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint + + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const responseBody = builder->slice(); + if (!responseBody.isArray()) { + errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint + + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + transaction::BuilderLeaser keyBuilder(trx); + /*size_t nextStart = 0; + // delete all keys at start of the range + while (nextStart < markers.size()) { + std::string const& localKey = markers[nextStart].first; + + if ( localKey.compare(lowString) < 0) { + // we have a local key that is not present remotely + keyBuilder.clear(); + keyBuilder.openObject(); + keyBuilder.add(StaticStrings::KeyString, VPackValue(localKey)); + keyBuilder.close(); + + trx.remove(collectionName, keyBuilder.slice(), options); + ++nextStart; + } else { + break; + } + }*/ + + std::vector toFetch; + + size_t const numKeys = static_cast(responseBody.length()); + if (numKeys == 0) { + errorMsg = "got invalid response from master at " + syncer._masterInfo._endpoint + + ": response contains an empty chunk. Collection: " + + collectionName + " Chunk: " + std::to_string(chunkId); + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + TRI_ASSERT(numKeys > 0); + + size_t i = 0; + size_t nextStart = 0; + + for (VPackSlice const& pair : VPackArrayIterator(responseBody)) { + if (!pair.isArray() || pair.length() != 2) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + + ": response key pair is no valid array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + // key + VPackSlice const keySlice = pair.at(0); + if (!keySlice.isString()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": response key is no string"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + // rid + if (markers.empty()) { + // no local markers + toFetch.emplace_back(i); + i++; + continue; + } + + // remove keys not present anymore + while (nextStart < markers.size()) { + std::string const& localKey = markers[nextStart].first; + + int res = keySlice.compareString(localKey); + if (res != 0) { + // we have a local key that is not present remotely + keyBuilder->clear(); + keyBuilder->openObject(); + keyBuilder->add(StaticStrings::KeyString, VPackValue(localKey)); + keyBuilder->close(); + + trx->remove(collectionName, keyBuilder->slice(), options); + ++nextStart; + } else { + // key match + break; + } + } + + // see if key exists + DocumentIdentifierToken token = physical->lookupKey(trx, keySlice); + if (token._data == 0) { + // key not found locally + toFetch.emplace_back(i); + } else if (TRI_RidToString(token._data) != pair.at(1).copyString()) { + // key found, but revision id differs + toFetch.emplace_back(i); + ++nextStart; + } else { + // a match - nothing to do! + ++nextStart; + } + + i++; + } + + // delete all keys at end of the range + while (nextStart < markers.size()) { + std::string const& localKey = markers[nextStart].first; + + TRI_ASSERT(localKey.compare(highString) > 0); + // if (localKey.compare(highString) > 0) { + // we have a local key that is not present remotely + keyBuilder->clear(); + keyBuilder->openObject(); + keyBuilder->add(StaticStrings::KeyString, VPackValue(localKey)); + keyBuilder->close(); + + trx->remove(collectionName, keyBuilder->slice(), options); + //} + ++nextStart; + } + + if (!toFetch.empty()) { + VPackBuilder keysBuilder; + keysBuilder.openArray(); + for (auto& it : toFetch) { + keysBuilder.add(VPackValue(it)); + } + keysBuilder.close(); + + std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" + + std::to_string(chunkId) + "&chunkSize=" + + std::to_string(chunkSize) + "&low=" + lowString ; + + progress = "fetching documents chunk " + std::to_string(chunkId) + + " for collection '" + collectionName + "' from " + url; + syncer.setProgress(progress); + + std::string const keyJsonString(keysBuilder.slice().toJson()); + + std::unique_ptr response( + syncer._client->retryRequest(rest::RequestType::PUT, url, + keyJsonString.c_str(), keyJsonString.size())); + + if (response == nullptr || !response->isComplete()) { + errorMsg = "could not connect to master at " + syncer._masterInfo._endpoint + + ": " + syncer._client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": HTTP " + + basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " + + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + auto builder = std::make_shared(); + int res = syncer.parseResponse(builder, response.get()); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "got invalid response from master at " + + std::string(syncer._masterInfo._endpoint) + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const slice = builder->slice(); + if (!slice.isArray()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + for (auto const& it : VPackArrayIterator(slice)) { + if (!it.isObject()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": document is no object"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const keySlice = it.get(StaticStrings::KeyString); + + if (!keySlice.isString()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": document key is invalid"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const revSlice = it.get(StaticStrings::RevString); + + if (!revSlice.isString()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": document revision is invalid"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + DocumentIdentifierToken token = physical->lookupKey(trx, keySlice); + + if (!token._data) { + // INSERT + OperationResult opRes = trx->insert(collectionName, it, options); + res = opRes.code; + } else { + // UPDATE + OperationResult opRes = trx->update(collectionName, it, options); + res = opRes.code; + } + + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + } + } + return TRI_ERROR_NO_ERROR; +} + +int handleSyncKeysRocksDB(InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, std::string const& cid, + std::string const& collectionName, + TRI_voc_tick_t maxTick, std::string& errorMsg) { + std::string progress = + "collecting local keys for collection '" + collectionName + "'"; + syncer.setProgress(progress); + + if (syncer.checkAborted()) { + return TRI_ERROR_REPLICATION_APPLIER_STOPPED; + } + + syncer.sendExtendBatch(); + syncer.sendExtendBarrier(); + + TRI_voc_tick_t const chunkSize = 5000; + std::string const baseUrl = syncer.BaseUrl + "/keys"; + + std::string url = + baseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize); + progress = "fetching remote keys chunks for collection '" + collectionName + + "' from " + url; + syncer.setProgress(progress); + + std::unique_ptr response( + syncer._client->retryRequest(rest::RequestType::GET, url, nullptr, 0)); + + if (response == nullptr || !response->isComplete()) { + errorMsg = "could not connect to master at " + + syncer._masterInfo._endpoint + ": " + + syncer._client->getErrorMessage(); + + return TRI_ERROR_REPLICATION_NO_RESPONSE; + } + + TRI_ASSERT(response != nullptr); + + if (response->wasHttpError()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": HTTP " + + basics::StringUtils::itoa(response->getHttpReturnCode()) + ": " + + response->getHttpReturnMessage(); + + return TRI_ERROR_REPLICATION_MASTER_ERROR; + } + + auto builder = std::make_shared(); + int res = syncer.parseResponse(builder, response.get()); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "got invalid response from master at " + + std::string(syncer._masterInfo._endpoint) + + ": invalid response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + VPackSlice const chunkSlice = builder->slice(); + + if (!chunkSlice.isArray()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": response is no array"; + + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } + + ManagedDocumentResult mmdr; + OperationOptions options; + options.silent = true; + options.ignoreRevs = true; + options.isRestore = true; + + VPackBuilder keyBuilder; + size_t const numChunks = static_cast(chunkSlice.length()); + + // remove all keys that are below first remote key or beyond last remote key + if (numChunks > 0) { + // first chunk + SingleCollectionTransaction trx( + transaction::StandaloneContext::Create(syncer._vocbase), col->cid(), + AccessMode::Type::WRITE); + + Result res = trx.begin(); + + if (!res.ok()) { + errorMsg = + std::string("unable to start transaction: ") + res.errorMessage(); + res.reset(res.errorNumber(), errorMsg); + return res.errorNumber(); + } + + VPackSlice chunk = chunkSlice.at(0); + + TRI_ASSERT(chunk.isObject()); + auto lowSlice = chunk.get("low"); + TRI_ASSERT(lowSlice.isString()); + + // last high + chunk = chunkSlice.at(numChunks - 1); + TRI_ASSERT(chunk.isObject()); + + auto highSlice = chunk.get("high"); + TRI_ASSERT(highSlice.isString()); + + std::string const lowKey(lowSlice.copyString()); + std::string const highKey(highSlice.copyString()); + + LogicalCollection* coll = trx.documentCollection(); + std::unique_ptr iterator = + coll->getAllIterator(&trx, &mmdr, false); + iterator->next( + [&](DocumentIdentifierToken const& token) { + if (coll->readDocument(&trx, token, mmdr) == false) { + return; + } + VPackSlice doc(mmdr.vpack()); + VPackSlice key = doc.get(StaticStrings::KeyString); + if (key.compareString(lowKey.data(), lowKey.length()) < 0) { + trx.remove(collectionName, key, options); + } else if (key.compareString(highKey.data(), highKey.length()) > 0) { + trx.remove(collectionName, key, options); + } + }, + UINT64_MAX); + + trx.commit(); + } + + { + if (syncer.checkAborted()) { + return TRI_ERROR_REPLICATION_APPLIER_STOPPED; + } + + SingleCollectionTransaction trx( + transaction::StandaloneContext::Create(syncer._vocbase), col->cid(), + AccessMode::Type::WRITE); + + Result res = trx.begin(); + + if (!res.ok()) { + errorMsg = + std::string("unable to start transaction: ") + res.errorMessage(); + res.reset(res.errorNumber(), res.errorMessage()); + return res.errorNumber(); + } + + // We do not take responsibility for the index. + // The LogicalCollection is protected by trx. + // Neither it nor it's indexes can be invalidated + + size_t currentChunkId = 0; + + std::string lowKey; + std::string highKey; + std::string hashString; + uint64_t localHash = 0x012345678; + // chunk keys + revisionId + std::vector> markers; + bool foundLowKey = false; + + auto resetChunk = [&]() -> void { + syncer.sendExtendBatch(); + syncer.sendExtendBarrier(); + + progress = "processing keys chunk " + std::to_string(currentChunkId) + + " for collection '" + collectionName + "'"; + syncer.setProgress(progress); + + // read remote chunk + VPackSlice chunk = chunkSlice.at(currentChunkId); + if (!chunk.isObject()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + ": chunk is no object"; + THROW_ARANGO_EXCEPTION(TRI_ERROR_REPLICATION_INVALID_RESPONSE); + } + + VPackSlice const lowSlice = chunk.get("low"); + VPackSlice const highSlice = chunk.get("high"); + VPackSlice const hashSlice = chunk.get("hash"); + if (!lowSlice.isString() || !highSlice.isString() || + !hashSlice.isString()) { + errorMsg = "got invalid response from master at " + + syncer._masterInfo._endpoint + + ": chunks in response have an invalid format"; + THROW_ARANGO_EXCEPTION(TRI_ERROR_REPLICATION_INVALID_RESPONSE); + } + + // now reset chunk information + markers.clear(); + lowKey = lowSlice.copyString(); + highKey = highSlice.copyString(); + hashString = hashSlice.copyString(); + localHash = 0x012345678; + foundLowKey = false; + }; + // set to first chunk + resetChunk(); + + std::function parseDoc = [&](VPackSlice doc, + VPackSlice key) { + + bool rangeUnequal = false; + bool nextChunk = false; + + int cmp1 = key.compareString(lowKey.data(), lowKey.length()); + int cmp2 = key.compareString(highKey.data(), highKey.length()); + + if (cmp1 < 0) { + // smaller values than lowKey mean they don't exist remotely + trx.remove(collectionName, key, options); + return; + } else if (cmp1 >= 0 && cmp2 <= 0) { + // we only need to hash we are in the range + if (cmp1 == 0) { + foundLowKey = true; + } + + markers.emplace_back(key.copyString(), TRI_ExtractRevisionId(doc)); + // don't bother hashing if we have't found lower key + if (foundLowKey) { + VPackSlice revision = doc.get(StaticStrings::RevString); + localHash ^= key.hashString(); + localHash ^= revision.hash(); + + if (cmp2 == 0) { // found highKey + rangeUnequal = std::to_string(localHash) != hashString; + nextChunk = true; + } + } else if (cmp2 == 0) { // found high key, but not low key + rangeUnequal = true; + nextChunk = true; + } + } else if (cmp2 > 0) { // higher than highKey + // current range was unequal and we did not find the + // high key. Load range and skip to next + rangeUnequal = true; + nextChunk = true; + } + + TRI_ASSERT(!rangeUnequal || nextChunk); // A => B + if (nextChunk) { // we are out of range, see next chunk + if (rangeUnequal && currentChunkId < numChunks) { + int res = syncChunkRocksDB(syncer, &trx, keysId, currentChunkId, lowKey, + highKey, markers, errorMsg); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + } + currentChunkId++; + if (currentChunkId < numChunks) { + resetChunk(); + // key is higher than upper bound, recheck the current document + if (cmp2 > 0) { + parseDoc(doc, key); + } + } + } + }; + + std::unique_ptr iterator = + col->getAllIterator(&trx, &mmdr, false); + iterator->next( + [&](DocumentIdentifierToken const& token) { + if (col->readDocument(&trx, token, mmdr) == false) { + return; + } + VPackSlice doc(mmdr.vpack()); + VPackSlice key = doc.get(StaticStrings::KeyString); + parseDoc(doc, key); + }, + UINT64_MAX); + + // we might have missed chunks, if the keys don't exist at all locally + while (currentChunkId < numChunks) { + int res = syncChunkRocksDB(syncer, &trx, keysId, currentChunkId, lowKey, highKey, + markers, errorMsg); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + currentChunkId++; + if (currentChunkId < numChunks) { + resetChunk(); + } + } + + res = trx.commit(); + if (!res.ok()) { + return res.errorNumber(); + } + } + + return res; +} +} diff --git a/arangod/RocksDBEngine/RocksDBKeyBounds.cpp b/arangod/RocksDBEngine/RocksDBKeyBounds.cpp index 1e00c9cb49..5d66afe570 100644 --- a/arangod/RocksDBEngine/RocksDBKeyBounds.cpp +++ b/arangod/RocksDBEngine/RocksDBKeyBounds.cpp @@ -102,6 +102,23 @@ rocksdb::Slice const RocksDBKeyBounds::end() const { return rocksdb::Slice(_endBuffer); } +uint64_t RocksDBKeyBounds::objectId() const { + RocksDBEntryType type = static_cast(_startBuffer[0]); + switch (type) { + case RocksDBEntryType::Document: + case RocksDBEntryType::PrimaryIndexValue: + case RocksDBEntryType::EdgeIndexValue: + case RocksDBEntryType::IndexValue: + case RocksDBEntryType::UniqueIndexValue: { + TRI_ASSERT(_startBuffer.size() >= (sizeof(char) + sizeof(uint64_t))); + return uint64FromPersistent(_startBuffer.data() + sizeof(char)); + } + + default: + THROW_ARANGO_EXCEPTION(TRI_ERROR_TYPE_ERROR); + } +} + // constructor for an empty bound. do not use for anything but to // default-construct a key bound! RocksDBKeyBounds::RocksDBKeyBounds() diff --git a/arangod/RocksDBEngine/RocksDBKeyBounds.h b/arangod/RocksDBEngine/RocksDBKeyBounds.h index 0802c3cda1..686f68c2ab 100644 --- a/arangod/RocksDBEngine/RocksDBKeyBounds.h +++ b/arangod/RocksDBEngine/RocksDBKeyBounds.h @@ -126,6 +126,14 @@ class RocksDBKeyBounds { /// iterators may check that the current key is less than this value. ////////////////////////////////////////////////////////////////////////////// rocksdb::Slice const end() const; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Returns the object ID for these bounds + /// + /// This method is only valid for certain types of bounds: Documents and + /// Index entries. + ////////////////////////////////////////////////////////////////////////////// + uint64_t objectId() const; private: RocksDBKeyBounds(); diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp index ac16e542fa..3556580392 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp @@ -195,13 +195,15 @@ bool RocksDBAllIndexIterator::nextWithKey(TokenKeyCallback const& cb, return true; } -void RocksDBAllIndexIterator::seek(std::string const& key) { +void RocksDBAllIndexIterator::seek(StringRef const& key) { TRI_ASSERT(_trx->state()->isRunning()); - + // don't want to get the index pointer just for this + uint64_t objectId = _bounds.objectId(); + RocksDBKey val = RocksDBKey::PrimaryIndexValue(objectId, key); if (_reverse) { - _iterator->SeekForPrev(key); + _iterator->SeekForPrev(val.string()); } else { - _iterator->Seek(key); + _iterator->Seek(val.string()); } } diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h index 669074946d..2e1490f274 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h @@ -88,7 +88,7 @@ class RocksDBAllIndexIterator final : public IndexIterator { // engine specific optimizations bool nextWithKey(TokenKeyCallback const& cb, size_t limit); - void seek(std::string const& key); + void seek(StringRef const& key); private: bool outOfRange() const; diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index ce613f9884..4ef27e1152 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -74,7 +74,7 @@ uint64_t RocksDBReplicationContext::count() const { TRI_ASSERT(_trx != nullptr); TRI_ASSERT(_collection != nullptr); RocksDBCollection* rcoll = - RocksDBCollection::toRocksDBCollection(_collection->getPhysical()); + toRocksDBCollection(_collection->getPhysical()); return rcoll->numberDocuments(_trx.get()); } @@ -266,13 +266,11 @@ arangodb::Result RocksDBReplicationContext::dumpKeys( // Position the iterator correctly size_t from = chunk * chunkSize; if (from != _lastIteratorOffset && !lowKey.empty()) { - primary->seek(lowKey); - _hasMore = true; + primary->seek(StringRef(lowKey)); _lastIteratorOffset = from; } else { // no low key supplied, we can not use seek if (from == 0 || !_hasMore || from < _lastIteratorOffset) { _iter->reset(); - _hasMore = true; _lastIteratorOffset = 0; } if (from > _lastIteratorOffset) { @@ -296,13 +294,11 @@ arangodb::Result RocksDBReplicationContext::dumpKeys( b.openArray(); // chunkSize is going to be ignored here - if (_hasMore) { - try { - _hasMore = primary->nextWithKey(cb, chunkSize); - _lastIteratorOffset++; - } catch (std::exception const& ex) { - return Result(TRI_ERROR_INTERNAL); - } + try { + _hasMore = primary->nextWithKey(cb, chunkSize); + _lastIteratorOffset++; + } catch (std::exception const& ex) { + return Result(TRI_ERROR_INTERNAL); } b.close(); @@ -322,13 +318,11 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments( // after calls to dumpKeys moved it forwards size_t from = chunk * chunkSize; if (from != _lastIteratorOffset && !lowKey.empty()) { - primary->seek(lowKey); - _hasMore = true; + primary->seek(StringRef(lowKey)); _lastIteratorOffset = from; } else { // no low key supplied, we can not use seek if (from == 0 || !_hasMore || from < _lastIteratorOffset) { _iter->reset(); - _hasMore = true; _lastIteratorOffset = 0; } if (from > _lastIteratorOffset) { @@ -353,8 +347,8 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments( b.add(current); }; - b.openArray(); bool hasMore = true; + b.openArray(); size_t oldPos = from; for (auto const& it : VPackArrayIterator(ids)) { if (!it.isNumber()) { @@ -362,6 +356,7 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments( } if (!hasMore) { LOG_TOPIC(ERR, Logger::REPLICATION) << "Not enough data"; + b.close(); return Result(TRI_ERROR_FAILED); } @@ -376,6 +371,7 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments( _lastIteratorOffset++; } b.close(); + _hasMore = hasMore; return Result(); } diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp index 881e6671e7..5a8a448198 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp @@ -285,6 +285,8 @@ bool RocksDBReplicationManager::garbageCollect(bool force) { MUTEX_LOCKER(mutexLocker, _lock); + auto oldSize = _contexts.size(); + for (auto it = _contexts.begin(); it != _contexts.end(); /* no hoisting */) { auto context = it->second; @@ -318,7 +320,7 @@ bool RocksDBReplicationManager::garbageCollect(bool force) { // FIXME effectively force should only be called on shutdown // nevertheless this is quite ugly - if (_contexts.size() == 0 && !force) { + if ((oldSize > 0) && (_contexts.size() == 0) && !force) { enableFileDeletions(); } } catch (...) { @@ -341,7 +343,7 @@ void RocksDBReplicationManager::disableFileDeletions() { void RocksDBReplicationManager::enableFileDeletions() { auto rocks = globalRocksDB(); - auto s = rocks->DisableFileDeletions(); + auto s = rocks->EnableFileDeletions(false); TRI_ASSERT(s.ok()); } diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index 3d884c88c7..7f1a3e19bc 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -451,7 +451,9 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase, if (tickStart <= lastTick && lastTick <= tickEnd) { handler->startNewBatch(batch.sequence); s = batch.writeBatchPtr->Iterate(handler.get()); - handler->endBatch(); + if (s.ok()) { + handler->endBatch(); + } } } else { LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan"; diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index 2f2c890078..752e12cf57 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -903,6 +903,11 @@ void RocksDBRestReplicationHandler::handleCommandRestoreCollection() { force = StringUtils::boolean(value3); } + std::string const& value9 = + _request->value("ignoreDistributeShardsLikeErrors", found); + bool ignoreDistributeShardsLikeErrors = + found ? StringUtils::boolean(value9) : false; + uint64_t numberOfShards = 0; std::string const& value4 = _request->value("numberOfShards", found); @@ -921,9 +926,9 @@ void RocksDBRestReplicationHandler::handleCommandRestoreCollection() { int res; if (ServerState::instance()->isCoordinator()) { - res = processRestoreCollectionCoordinator(slice, overwrite, recycleIds, - force, numberOfShards, errorMsg, - replicationFactor); + res = processRestoreCollectionCoordinator( + slice, overwrite, recycleIds, force, numberOfShards, errorMsg, + replicationFactor, ignoreDistributeShardsLikeErrors); } else { res = processRestoreCollection(slice, overwrite, recycleIds, force, errorMsg); @@ -2352,7 +2357,7 @@ int RocksDBRestReplicationHandler::processRestoreCollection( int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator( VPackSlice const& collection, bool dropExisting, bool reuseId, bool force, uint64_t numberOfShards, std::string& errorMsg, - uint64_t replicationFactor) { + uint64_t replicationFactor, bool ignoreDistributeShardsLikeErrors) { if (!collection.isObject()) { errorMsg = "collection declaration is invalid"; @@ -2488,7 +2493,8 @@ int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator( "Cluster") ->createWaitsForSyncReplication(); auto col = ClusterMethods::createCollectionOnCoordinator( - collectionType, _vocbase, merged, true, createWaitsForSyncReplication); + collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors, + createWaitsForSyncReplication); TRI_ASSERT(col != nullptr); } catch (basics::Exception const& e) { // Error, report it. diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.h b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.h index 786d84b438..930089b49c 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.h +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.h @@ -260,7 +260,7 @@ class RocksDBRestReplicationHandler : public RestVocbaseBaseHandler { ////////////////////////////////////////////////////////////////////////////// int processRestoreCollectionCoordinator(VPackSlice const&, bool, bool, bool, - uint64_t, std::string&, uint64_t); + uint64_t, std::string&, uint64_t, bool); ////////////////////////////////////////////////////////////////////////////// /// @brief creates a collection, based on the VelocyPack provided TODO: MOVE diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 807cf61537..8beb274bfd 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -55,7 +55,9 @@ using namespace arangodb; // for the RocksDB engine we do not need any additional data struct RocksDBTransactionData final : public TransactionData {}; -RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function const& rollbackCallback) +RocksDBSavePoint::RocksDBSavePoint( + rocksdb::Transaction* trx, bool handled, + std::function const& rollbackCallback) : _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) { TRI_ASSERT(trx != nullptr); if (!_handled) { @@ -110,8 +112,8 @@ RocksDBTransactionState::~RocksDBTransactionState() { /// @brief start a transaction Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { - LOG_TRX(this, _nestingLevel) - << "beginning " << AccessMode::typeString(_type) << " transaction"; + LOG_TRX(this, _nestingLevel) << "beginning " << AccessMode::typeString(_type) + << " transaction"; Result result = useCollections(_nestingLevel); @@ -157,12 +159,13 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { _rocksTransaction->SetSnapshot(); _rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot(); - if (!isReadOnlyTransaction() && !hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) { + if (!isReadOnlyTransaction() && + !hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) { RocksDBLogValue header = - RocksDBLogValue::BeginTransaction(_vocbase->id(), _id); + RocksDBLogValue::BeginTransaction(_vocbase->id(), _id); _rocksTransaction->PutLogData(header.slice()); } - + } else { TRI_ASSERT(_status == transaction::Status::RUNNING); } @@ -173,8 +176,8 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { /// @brief commit a transaction Result RocksDBTransactionState::commitTransaction( transaction::Methods* activeTrx) { - LOG_TRX(this, _nestingLevel) - << "committing " << AccessMode::typeString(_type) << " transaction"; + LOG_TRX(this, _nestingLevel) << "committing " << AccessMode::typeString(_type) + << " transaction"; TRI_ASSERT(_status == transaction::Status::RUNNING); TRI_IF_FAILURE("TransactionWriteCommitMarker") { @@ -185,13 +188,14 @@ Result RocksDBTransactionState::commitTransaction( if (_nestingLevel == 0) { if (_rocksTransaction != nullptr) { - if (hasOperations()) { - // set wait for sync flag if required + // if (hasOperations()) { + if (_rocksTransaction->GetNumKeys() > 0) { + // set wait for sync flag if required if (waitForSync()) { _rocksWriteOptions.sync = true; _rocksTransaction->SetWriteOptions(_rocksWriteOptions); } - + // TODO wait for response on github issue to see how we can use the // sequence number result = rocksutils::convertStatus(_rocksTransaction->Commit()); @@ -231,10 +235,8 @@ Result RocksDBTransactionState::commitTransaction( } } else { // don't write anything if the transaction is empty - // TODO: calling Rollback() here does not work for some reason but it should. - // must investigate further!! - result = rocksutils::convertStatus(_rocksTransaction->Commit()); - + result = rocksutils::convertStatus(_rocksTransaction->Rollback()); + if (_cacheTx != nullptr) { // note: endTransaction() will delete _cacheTx! CacheManagerFeature::MANAGER->endTransaction(_cacheTx); @@ -256,8 +258,8 @@ Result RocksDBTransactionState::commitTransaction( /// @brief abort and rollback a transaction Result RocksDBTransactionState::abortTransaction( transaction::Methods* activeTrx) { - LOG_TRX(this, _nestingLevel) - << "aborting " << AccessMode::typeString(_type) << " transaction"; + LOG_TRX(this, _nestingLevel) << "aborting " << AccessMode::typeString(_type) + << " transaction"; TRI_ASSERT(_status == transaction::Status::RUNNING); Result result; @@ -265,7 +267,7 @@ Result RocksDBTransactionState::abortTransaction( if (_rocksTransaction != nullptr) { rocksdb::Status status = _rocksTransaction->Rollback(); result = rocksutils::convertStatus(status); - + if (_cacheTx != nullptr) { // note: endTransaction() will delete _cacheTx! CacheManagerFeature::MANAGER->endTransaction(_cacheTx); @@ -290,26 +292,25 @@ Result RocksDBTransactionState::abortTransaction( } void RocksDBTransactionState::prepareOperation( - TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId, - StringRef const& key, TRI_voc_document_operation_e operationType) { - + TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId, StringRef const& key, + TRI_voc_document_operation_e operationType) { TRI_ASSERT(!isReadOnlyTransaction()); - + bool singleOp = hasHint(transaction::Hints::Hint::SINGLE_OPERATION); // single operations should never call this method twice - TRI_ASSERT(!singleOp || _lastUsedCollection == 0); + TRI_ASSERT(!singleOp || _lastUsedCollection == 0); if (collectionId != _lastUsedCollection) { switch (operationType) { case TRI_VOC_DOCUMENT_OPERATION_INSERT: case TRI_VOC_DOCUMENT_OPERATION_UPDATE: case TRI_VOC_DOCUMENT_OPERATION_REPLACE: { if (singleOp) { - RocksDBLogValue logValue = RocksDBLogValue::SinglePut(_vocbase->id(), - collectionId); + RocksDBLogValue logValue = + RocksDBLogValue::SinglePut(_vocbase->id(), collectionId); _rocksTransaction->PutLogData(logValue.slice()); } else { RocksDBLogValue logValue = - RocksDBLogValue::DocumentOpsPrologue(collectionId); + RocksDBLogValue::DocumentOpsPrologue(collectionId); _rocksTransaction->PutLogData(logValue.slice()); } break; @@ -317,13 +318,12 @@ void RocksDBTransactionState::prepareOperation( case TRI_VOC_DOCUMENT_OPERATION_REMOVE: { if (singleOp) { TRI_ASSERT(!key.empty()); - RocksDBLogValue logValue = RocksDBLogValue::SingleRemove(_vocbase->id(), - collectionId, - key); + RocksDBLogValue logValue = + RocksDBLogValue::SingleRemove(_vocbase->id(), collectionId, key); _rocksTransaction->PutLogData(logValue.slice()); } else { RocksDBLogValue logValue = - RocksDBLogValue::DocumentOpsPrologue(collectionId); + RocksDBLogValue::DocumentOpsPrologue(collectionId); _rocksTransaction->PutLogData(logValue.slice()); } } break; @@ -332,11 +332,11 @@ void RocksDBTransactionState::prepareOperation( } _lastUsedCollection = collectionId; } - - // we need to log the remove log entry, if we don't have the single optimization + + // we need to log the remove log entry, if we don't have the single + // optimization if (!singleOp && operationType == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { - RocksDBLogValue logValue = - RocksDBLogValue::DocumentRemove(key); + RocksDBLogValue logValue = RocksDBLogValue::DocumentRemove(key); _rocksTransaction->PutLogData(logValue.slice()); } } diff --git a/arangod/RocksDBEngine/RocksDBV8Functions.cpp b/arangod/RocksDBEngine/RocksDBV8Functions.cpp index 804c498bee..d61ae45d1e 100644 --- a/arangod/RocksDBEngine/RocksDBV8Functions.cpp +++ b/arangod/RocksDBEngine/RocksDBV8Functions.cpp @@ -27,10 +27,13 @@ #include "Cluster/ServerState.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBEngine.h" +#include "RocksDBEngine/RocksDBCollection.h" #include "StorageEngine/EngineSelectorFeature.h" +#include "VocBase/LogicalCollection.h" #include "V8/v8-conv.h" #include "V8/v8-globals.h" #include "V8/v8-utils.h" +#include "V8Server/v8-externals.h" #include @@ -99,6 +102,28 @@ static void JS_PropertiesWal(v8::FunctionCallbackInfo const& args) { TRI_V8_TRY_CATCH_END } +static void JS_RecalculateCounts( + v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + v8::HandleScope scope(isolate); + + arangodb::LogicalCollection* collection = + TRI_UnwrapClass(args.Holder(), + WRP_VOCBASE_COL_TYPE); + + if (collection == nullptr) { + TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection"); + } + + auto physical = toRocksDBCollection(collection); + + v8::Handle result = v8::Integer::New( + isolate, static_cast(physical->recalculateCounts())); + + TRI_V8_RETURN(result); + TRI_V8_TRY_CATCH_END +} + void RocksDBV8Functions::registerResources() { ISOLATE; v8::HandleScope scope(isolate); @@ -120,4 +145,6 @@ void RocksDBV8Functions::registerResources() { JS_PropertiesWal, true); TRI_AddGlobalFunctionVocbase(isolate, TRI_V8_ASCII_STRING("WAL_TRANSACTIONS"), JS_TransactionsWal, true); + TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("recalculate_counts"), + JS_RecalculateCounts); } diff --git a/arangod/StorageEngine/StorageEngine.h b/arangod/StorageEngine/StorageEngine.h index 339bc83c86..0f32935a5b 100644 --- a/arangod/StorageEngine/StorageEngine.h +++ b/arangod/StorageEngine/StorageEngine.h @@ -48,6 +48,7 @@ class PhysicalView; class Result; class TransactionCollection; class TransactionState; +class InitialSyncer; namespace rest { class RestHandlerFactory; @@ -406,6 +407,14 @@ class StorageEngine : public application_features::ApplicationFeature { virtual std::shared_ptr getReplicationApplierConfiguration(TRI_vocbase_t*, int& status) = 0; virtual int removeReplicationApplierConfiguration(TRI_vocbase_t* vocbase) = 0; virtual int saveReplicationApplierConfiguration(TRI_vocbase_t* vocbase, arangodb::velocypack::Slice slice, bool doSync) = 0; + + virtual int handleSyncKeys(arangodb::InitialSyncer& syncer, + arangodb::LogicalCollection* col, + std::string const& keysId, + std::string const& cid, + std::string const& collectionName, + TRI_voc_tick_t maxTick, + std::string& errorMsg) = 0; void getCapabilities(VPackBuilder& builder) const { builder.openObject(); diff --git a/arangod/VocBase/LogicalCollection.cpp b/arangod/VocBase/LogicalCollection.cpp index 0e7aa44520..81cc3779d3 100644 --- a/arangod/VocBase/LogicalCollection.cpp +++ b/arangod/VocBase/LogicalCollection.cpp @@ -747,9 +747,22 @@ void LogicalCollection::toVelocyPackForClusterInventory(VPackBuilder& result, std::unordered_set ignoreKeys{"allowUserKeys", "cid", "count", "objectId", - "statusString", "version"}; + "statusString", "version", + "distributeShardsLike"}; VPackBuilder params = toVelocyPackIgnore(ignoreKeys, false); - result.add(params.slice()); + { VPackObjectBuilder guard(&result); + for (auto const& p : VPackObjectIterator(params.slice())) { + result.add(p.key); + result.add(p.value); + } + if (!_distributeShardsLike.empty()) { + CollectionNameResolver resolver(_vocbase); + result.add("distributeShardsLike", + VPackValue(resolver.getCollectionNameCluster( + static_cast(basics::StringUtils::uint64( + distributeShardsLike()))))); + } + } result.add(VPackValue("indexes")); getIndexesVPack(result, false); diff --git a/lib/Basics/Thread.cpp b/lib/Basics/Thread.cpp index 88ca01b821..e848d0f96d 100644 --- a/lib/Basics/Thread.cpp +++ b/lib/Basics/Thread.cpp @@ -73,7 +73,9 @@ void Thread::startThread(void* arg) { LOCAL_THREAD_NUMBER = NEXT_THREAD_ID.fetch_add(1, std::memory_order_seq_cst); #endif + TRI_ASSERT(arg != nullptr); Thread* ptr = static_cast(arg); + TRI_ASSERT(ptr != nullptr); ptr->_threadNumber = LOCAL_THREAD_NUMBER; @@ -81,6 +83,13 @@ void Thread::startThread(void* arg) { try { ptr->runMe(); + } catch (std::exception const& ex) { + LOG_TOPIC(WARN, Logger::THREADS) << "caught exception in thread '" << ptr->_name + << "': " << ex.what(); + if (pushed) { + WorkMonitor::popThread(ptr); + } + throw; } catch (...) { if (pushed) { WorkMonitor::popThread(ptr); diff --git a/lib/Basics/threads-win32.cpp b/lib/Basics/threads-win32.cpp index 7509d81199..c9db0dbfa8 100644 --- a/lib/Basics/threads-win32.cpp +++ b/lib/Basics/threads-win32.cpp @@ -30,23 +30,32 @@ /// @brief data block for thread starter //////////////////////////////////////////////////////////////////////////////// -typedef struct thread_data_s { - void (*starter)(void*); +struct thread_data_t { + void (*_starter)(void*); void* _data; - char* _name; -} thread_data_t; + std::string _name; + + thread_data_t(void (*starter)(void*), void* data, char const* name) + : _starter(starter), + _data(data), + _name(name) {} +}; //////////////////////////////////////////////////////////////////////////////// /// @brief starter function for thread //////////////////////////////////////////////////////////////////////////////// static DWORD __stdcall ThreadStarter(void* data) { - thread_data_t* d; + TRI_ASSERT(data != nullptr); - d = (thread_data_t*)data; - d->starter(d->_data); + // this will automatically free the thread struct when leaving this function + std::unique_ptr d(static_cast(data)); - TRI_Free(TRI_CORE_MEM_ZONE, d); + try { + d->_starter(d->_data); + } catch (...) { + // we must not throw from here + } return 0; } @@ -63,26 +72,24 @@ void TRI_InitThread(TRI_thread_t* thread) { *thread = 0; } bool TRI_StartThread(TRI_thread_t* thread, TRI_tid_t* threadId, char const* name, void (*starter)(void*), void* data) { - thread_data_t* d = static_cast( - TRI_Allocate(TRI_CORE_MEM_ZONE, sizeof(thread_data_t), false)); + std::unique_ptr d; - if (d == nullptr) { + try { + d.reset(new thread_data_t(starter, data, name)); + } catch (...) { + LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "could not start thread: out of memory"; return false; } - d->starter = starter; - d->_data = data; - d->_name = TRI_DuplicateString(name); - + TRI_ASSERT(d != nullptr); *thread = CreateThread(0, // default security attributes 0, // use default stack size ThreadStarter, // thread function name - d, // argument to thread function + d.release(), // argument to thread function 0, // use default creation flags threadId); // returns the thread identifier if (*thread == 0) { - TRI_Free(TRI_CORE_MEM_ZONE, d); LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "could not start thread: " << strerror(errno) << " "; return false; }