From 1002928b5fd59c3eb81fcf68ca400210f1396ff3 Mon Sep 17 00:00:00 2001
From: Jan
Date: Fri, 19 Oct 2018 15:26:23 +0200
Subject: [PATCH] fix https://github.com/arangodb/release-3.4/issues/99 (#6951)

---
 arangod/ClusterEngine/ClusterCollection.cpp    | 19 +++++--
 arangod/ClusterEngine/ClusterEngine.cpp        | 26 +++++-----
 arangod/ClusterEngine/ClusterEngine.h          | 12 ++++-
 arangod/ClusterEngine/ClusterIndex.cpp         | 29 +++++++++--
 arangod/ClusterEngine/Common.h                 |  3 +-
 .../IResearch/IResearchLinkCoordinator.cpp     |  4 +-
 arangod/IResearch/IResearchLinkCoordinator.h   |  5 +-
 arangod/MMFiles/MMFilesEngine.cpp              |  1 +
 .../DatabaseReplicationApplier.cpp             | 10 ++--
 .../Replication/GlobalReplicationApplier.cpp   |  9 ++--
 arangod/Replication/ReplicationApplier.cpp     | 20 ++++++--
 arangod/Replication/TailingSyncer.cpp          |  9 ++++
 arangod/Replication/TailingSyncer.h            |  4 ++
 .../RocksDBEngine/RocksDBIncrementalSync.cpp   | 46 ++++++++++++-----
 .../RocksDBRestReplicationHandler.cpp          | 50 +++++++------------
 arangod/RocksDBEngine/RocksDBWalAccess.cpp     | 24 +++++----
 .../StorageEngine/EngineSelectorFeature.cpp    |  1 -
 arangod/V8Server/v8-collection.cpp             |  1 +
 arangod/VocBase/Methods/Version.cpp            |  5 +-
 tests/IResearch/common.cpp                     |  2 +
 tests/js/common/shell/shell-database.js        |  9 ----
 .../js/server/replication/replication-aql.js  |  9 ++--
 .../replication/replication-fuzz-global.js    |  9 ++--
 .../js/server/replication/replication-fuzz.js |  9 ++--
 .../server/replication/replication-random.js  |  9 ++--
 25 files changed, 203 insertions(+), 122 deletions(-)

diff --git a/arangod/ClusterEngine/ClusterCollection.cpp b/arangod/ClusterEngine/ClusterCollection.cpp
index cc5b190f89..72b2e92118 100644
--- a/arangod/ClusterEngine/ClusterCollection.cpp
+++ b/arangod/ClusterEngine/ClusterCollection.cpp
@@ -95,7 +95,7 @@ ClusterCollection::ClusterCollection(
           TRI_ERROR_BAD_PARAMETER,
           "volatile collections are unsupported in the RocksDB engine");
     }
-  } else {
+  } else if (_engineType != ClusterEngineType::MockEngine) {
     TRI_ASSERT(false);
     THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
   }
@@ -144,7 +144,6 @@ Result ClusterCollection::updateProperties(VPackSlice const& slice,
   // duplicate all the error handling of the storage engines
   if (_engineType == ClusterEngineType::MMFilesEngine) {
     // duplicate the error validation
-    // validation
     uint32_t tmp = Helper::getNumericValue<uint32_t>(
         slice, "indexBuckets",
@@ -195,12 +194,22 @@ Result ClusterCollection::updateProperties(VPackSlice const& slice,
     merge.add("cacheEnabled",
               VPackValue(Helper::readBooleanValue(slice, "cacheEnabled", def)));
-  } else {
+  } else if (_engineType != ClusterEngineType::MockEngine) {
     TRI_ASSERT(false);
     THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
   }
 
   merge.close();
-  _info = VPackCollection::merge(_info.slice(), merge.slice(), true);
+
+  TRI_ASSERT(merge.slice().isObject());
+  TRI_ASSERT(merge.isClosed());
+
+  TRI_ASSERT(_info.slice().isObject());
+  TRI_ASSERT(_info.isClosed());
+
+  VPackBuilder tmp = VPackCollection::merge(_info.slice(), merge.slice(), true);
+  _info = std::move(tmp);
+
+  TRI_ASSERT(_info.slice().isObject());
+  TRI_ASSERT(_info.isClosed());
 
   READ_LOCKER(guard, _indexesLock);
   for (std::shared_ptr<Index>& idx : _indexes) {
@@ -242,7 +251,7 @@ void ClusterCollection::getPropertiesVPack(velocypack::Builder& result) const {
     result.add("cacheEnabled", VPackValue(Helper::readBooleanValue(
                                    _info.slice(), "cacheEnabled", false)));
 
-  } else {
+  } else if (_engineType != ClusterEngineType::MockEngine) {
     TRI_ASSERT(false);
     THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
   }
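Note: the updateProperties() hunk above replaces the direct re-assignment of _info with a merge into a temporary builder followed by a move, bracketed by assertions that both builders hold closed objects. Below is a minimal sketch of that pattern using the standalone VelocyPack library; the keys and values are illustrative only and do not come from the patch.

    #include <velocypack/Builder.h>
    #include <velocypack/Collection.h>
    #include <velocypack/Slice.h>
    #include <cassert>
    #include <iostream>

    using namespace arangodb::velocypack;

    int main() {
      Builder info;   // plays the role of _info
      info.openObject();
      info.add("waitForSync", Value(false));
      info.close();

      Builder merge;  // plays the role of the local "merge" builder
      merge.openObject();
      merge.add("waitForSync", Value(true));
      merge.close();

      // both inputs must be closed objects before merging
      assert(info.slice().isObject() && info.isClosed());
      assert(merge.slice().isObject() && merge.isClosed());

      // merge into a temporary first, then move-assign; this avoids
      // overwriting `info` while its slice is still being read from
      Builder tmp = Collection::merge(info.slice(), merge.slice(), /*mergeValues*/ true);
      info = std::move(tmp);

      assert(info.slice().isObject() && info.isClosed());
      std::cout << info.slice().toJson() << std::endl;  // {"waitForSync":true}
    }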
diff --git a/arangod/ClusterEngine/ClusterEngine.cpp b/arangod/ClusterEngine/ClusterEngine.cpp
index c49806ec25..cf2d1bf328 100644
--- a/arangod/ClusterEngine/ClusterEngine.cpp
+++ b/arangod/ClusterEngine/ClusterEngine.cpp
@@ -64,6 +64,9 @@ using namespace arangodb;
 using namespace arangodb::application_features;
 using namespace arangodb::options;
 
+// fall back to using the mock storage engine
+bool ClusterEngine::Mocking = false;
+
 // create the storage engine
 ClusterEngine::ClusterEngine(application_features::ApplicationServer& server)
     : StorageEngine(
@@ -86,13 +89,19 @@ bool ClusterEngine::isMMFiles() const {
   return _actualEngine && _actualEngine->name() == MMFilesEngine::FeatureName;
 }
 
+bool ClusterEngine::isMock() const {
+  return ClusterEngine::Mocking || (_actualEngine && _actualEngine->name() == "Mock");
+}
+
 ClusterEngineType ClusterEngine::engineType() const {
   TRI_ASSERT(_actualEngine != nullptr);
 
-  if (_actualEngine->name() == MMFilesEngine::FeatureName) {
+  if (isMMFiles()) {
     return ClusterEngineType::MMFilesEngine;
-  } else if (_actualEngine->name() == RocksDBEngine::FeatureName) {
+  } else if (isRocksDB()) {
     return ClusterEngineType::RocksDBEngine;
+  } else if (isMock()) {
+    return ClusterEngineType::MockEngine;
   }
 
   TRI_ASSERT(false);
@@ -105,13 +114,6 @@ ClusterEngineType ClusterEngine::engineType() const {
 // preparation phase for storage engine. can be used for internal setup.
 // the storage engine must not start any threads here or write any files
 void ClusterEngine::prepare() {
-  // get base path from DatabasePathFeature
-  auto databasePathFeature =
-      application_features::ApplicationServer::getFeature<DatabasePathFeature>(
-          "DatabasePath");
-  _basePath = databasePathFeature->directory();
-
-  TRI_ASSERT(!_basePath.empty());
   if (!ServerState::instance()->isCoordinator()) {
     setEnabled(false);
   }
@@ -216,10 +218,6 @@ int ClusterEngine::getViews(
   return TRI_ERROR_NO_ERROR;
 }
 
-std::string ClusterEngine::versionFilename(TRI_voc_tick_t id) const {
-  return _basePath + TRI_DIR_SEPARATOR_CHAR + "VERSION-" + std::to_string(id);
-}
-
 VPackBuilder ClusterEngine::getReplicationApplierConfiguration(
     TRI_vocbase_t& vocbase,
     int& status
@@ -391,7 +389,7 @@ void ClusterEngine::addOptimizerRules() {
     MMFilesOptimizerRules::registerResources();
   } else if (engineType() == ClusterEngineType::RocksDBEngine) {
     RocksDBOptimizerRules::registerResources();
-  } else {
+  } else if (engineType() != ClusterEngineType::MockEngine) {
     TRI_ASSERT(false);
   }
 }
diff --git a/arangod/ClusterEngine/ClusterEngine.h b/arangod/ClusterEngine/ClusterEngine.h
index efa57636a8..8c2e380772 100644
--- a/arangod/ClusterEngine/ClusterEngine.h
+++ b/arangod/ClusterEngine/ClusterEngine.h
@@ -58,6 +58,7 @@ class ClusterEngine final : public StorageEngine {
   StorageEngine* actualEngine() const { return _actualEngine; }
   bool isRocksDB() const;
   bool isMMFiles() const;
+  bool isMock() const;
   ClusterEngineType engineType() const;
 
   // storage engine overrides
@@ -127,9 +128,13 @@ class ClusterEngine final : public StorageEngine {
       arangodb::velocypack::Builder& result
   ) override;
 
-  std::string versionFilename(TRI_voc_tick_t id) const override;
+  std::string versionFilename(TRI_voc_tick_t id) const override {
+    // the cluster engine does not have any versioning information
+    return std::string();
+  }
   std::string databasePath(TRI_vocbase_t const* vocbase) const override {
-    return _basePath;
+    // the cluster engine does not have any database path
+    return std::string();
   }
   std::string collectionPath(
       TRI_vocbase_t const& vocbase,
@@ -342,6 +347,9 @@ class ClusterEngine final : public StorageEngine {
   static std::string const EngineName;
   static std::string const FeatureName;
 
+  // mock mode
+  static bool Mocking;
+
  private:
   /// path to arangodb data dir
   std::string _basePath;
diff --git a/arangod/ClusterEngine/ClusterIndex.cpp b/arangod/ClusterEngine/ClusterIndex.cpp
index 79d7b0b310..3cc7a66b64 100644
--- a/arangod/ClusterEngine/ClusterIndex.cpp
+++ b/arangod/ClusterEngine/ClusterIndex.cpp
@@ -63,6 +63,7 @@ ClusterIndex::ClusterIndex(
       _info(info),
       _clusterSelectivity(/* default */0.1) {
   TRI_ASSERT(_info.slice().isObject());
+  TRI_ASSERT(_info.isClosed());
 }
 
 ClusterIndex::~ClusterIndex() {}
@@ -109,6 +110,8 @@ bool ClusterIndex::hasSelectivityEstimate() const {
            _indexType == Index::TRI_IDX_TYPE_HASH_INDEX ||
            _indexType == Index::TRI_IDX_TYPE_SKIPLIST_INDEX ||
            _indexType == Index::TRI_IDX_TYPE_PERSISTENT_INDEX;
+  } else if (_engineType == ClusterEngineType::MockEngine) {
+    return false;
   }
   TRI_ASSERT(false);
   THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@@ -137,6 +140,8 @@ bool ClusterIndex::isPersistent() const {
     return _indexType == Index::TRI_IDX_TYPE_PERSISTENT_INDEX;
   } else if (_engineType == ClusterEngineType::RocksDBEngine) {
     return true;
+  } else if (_engineType == ClusterEngineType::MockEngine) {
+    return false;
   }
   TRI_ASSERT(false);
   THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@@ -153,6 +158,8 @@ bool ClusterIndex::isSorted() const {
            _indexType == Index::TRI_IDX_TYPE_SKIPLIST_INDEX ||
            _indexType == Index::TRI_IDX_TYPE_PERSISTENT_INDEX ||
            _indexType == Index::TRI_IDX_TYPE_FULLTEXT_INDEX;
+  } else if (_engineType == ClusterEngineType::MockEngine) {
+    return false;
   }
   TRI_ASSERT(false);
   THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@@ -176,7 +183,12 @@ void ClusterIndex::updateProperties(velocypack::Slice const& slice) {
   }
   merge.close();
 
-  _info = VPackCollection::merge(_info.slice(), merge.slice(), true);
+  TRI_ASSERT(merge.slice().isObject());
+  TRI_ASSERT(_info.slice().isObject());
+  VPackBuilder tmp = VPackCollection::merge(_info.slice(), merge.slice(), true);
+  _info = std::move(tmp);
+  TRI_ASSERT(_info.slice().isObject());
+  TRI_ASSERT(_info.isClosed());
 }
 
 bool ClusterIndex::hasCoveringIterator() const {
@@ -190,7 +202,6 @@ bool ClusterIndex::hasCoveringIterator() const {
   return false;
 }
 
-
 bool ClusterIndex::matchesDefinition(VPackSlice const& info) const {
   // TODO implement faster version of this
   return Index::Compare(_info.slice(), info);
@@ -227,7 +238,7 @@ bool ClusterIndex::supportsFilterCondition(
       } else if (_engineType == ClusterEngineType::RocksDBEngine) {
         return PersistentIndexAttributeMatcher::supportsFilterCondition(
            allIndexes, this, node, reference, itemsInIndex, estimatedItems, estimatedCost);
-      } 
+      }
       break;
     }
 
     case TRI_IDX_TYPE_EDGE_INDEX: {
@@ -256,6 +267,10 @@ bool ClusterIndex::supportsFilterCondition(
     case TRI_IDX_TYPE_UNKNOWN:
       break;
   }
+
+  if (_engineType == ClusterEngineType::MockEngine) {
+    return false;
+  }
   TRI_ASSERT(false);
   return false;
 }
@@ -312,6 +327,10 @@ bool ClusterIndex::supportsSortCondition(
     case TRI_IDX_TYPE_UNKNOWN:
       break;
   }
+
+  if (_engineType == ClusterEngineType::MockEngine) {
+    return false;
+  }
   TRI_ASSERT(false);
   return false;
 }
@@ -365,6 +384,10 @@ aql::AstNode* ClusterIndex::specializeCondition(
     case TRI_IDX_TYPE_UNKNOWN:
       break;
   }
+
+  if (_engineType == ClusterEngineType::MockEngine) {
+    return node;
+  }
   TRI_ASSERT(false);
   return node;
 }
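Note: every ClusterIndex method above follows the same shape: dispatch on the engine type, with the new MockEngine branch returning a harmless default instead of falling into TRI_ASSERT(false). A free-standing illustration of the pattern, with simplified, hypothetical types:

    #include <cassert>
    #include <stdexcept>

    enum class ClusterEngineType { MMFilesEngine, RocksDBEngine, MockEngine };

    bool isSorted(ClusterEngineType engineType) {
      if (engineType == ClusterEngineType::MMFilesEngine) {
        return false;  // engine-specific answer
      } else if (engineType == ClusterEngineType::RocksDBEngine) {
        return true;   // engine-specific answer
      } else if (engineType == ClusterEngineType::MockEngine) {
        return false;  // mock engine: any safe default will do
      }
      assert(false);   // an unknown engine type is still a hard error
      throw std::logic_error("unexpected cluster engine type");
    }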
diff --git a/arangod/ClusterEngine/Common.h b/arangod/ClusterEngine/Common.h
index f004803e0e..f01899cfa3 100644
--- a/arangod/ClusterEngine/Common.h
+++ b/arangod/ClusterEngine/Common.h
@@ -27,7 +27,8 @@ namespace arangodb {
 
 enum class ClusterEngineType {
   MMFilesEngine,
-  RocksDBEngine
+  RocksDBEngine,
+  MockEngine
 };
 
 }  // namespace arangodb
diff --git a/arangod/IResearch/IResearchLinkCoordinator.cpp b/arangod/IResearch/IResearchLinkCoordinator.cpp
index f290d5e9ac..d7c323edb1 100644
--- a/arangod/IResearch/IResearchLinkCoordinator.cpp
+++ b/arangod/IResearch/IResearchLinkCoordinator.cpp
@@ -27,11 +27,13 @@
 #include "IResearchFeature.h"
 #include "IResearchViewCoordinator.h"
 #include "VelocyPackHelper.h"
+#include "ClusterEngine/ClusterEngine.h"
 #include "Cluster/ClusterInfo.h"
 #include "Basics/StringUtils.h"
 #include "Logger/Logger.h"
 #include "RocksDBEngine/RocksDBColumnFamily.h"
 #include "RocksDBEngine/RocksDBIndex.h"
+#include "StorageEngine/EngineSelectorFeature.h"
 #include "VocBase/LogicalCollection.h"
 #include "velocypack/Builder.h"
 #include "velocypack/Slice.h"
@@ -63,7 +65,7 @@ namespace iresearch {
 IResearchLinkCoordinator::IResearchLinkCoordinator(
     TRI_idx_iid_t id,
     LogicalCollection& collection
-): arangodb::Index(id, collection, IResearchLinkHelper::emptyIndexSlice()) {
+): arangodb::ClusterIndex(id, collection, static_cast<arangodb::ClusterEngine*>(arangodb::EngineSelectorFeature::ENGINE)->engineType(), arangodb::Index::TRI_IDX_TYPE_IRESEARCH_LINK, IResearchLinkHelper::emptyIndexSlice()) {
   TRI_ASSERT(ServerState::instance()->isCoordinator());
   _unique = false; // cannot be unique since multiple fields are indexed
   _sparse = true;  // always sparse
diff --git a/arangod/IResearch/IResearchLinkCoordinator.h b/arangod/IResearch/IResearchLinkCoordinator.h
index 1b0fde7b82..8a25b3edb5 100644
--- a/arangod/IResearch/IResearchLinkCoordinator.h
+++ b/arangod/IResearch/IResearchLinkCoordinator.h
@@ -25,7 +25,8 @@
 #define ARANGODB_IRESEARCH__IRESEARCH_LINK_COORDINATOR_H 1
 
 #include "Indexes/Index.h"
-#include "IResearchLinkMeta.h"
+#include "ClusterEngine/ClusterIndex.h"
+#include "IResearch/IResearchLinkMeta.h"
 
 namespace arangodb {
 namespace iresearch {
@@ -36,7 +37,7 @@ class IResearchViewCoordinator;
 /// @brief common base class for functionality required to link an ArangoDB
 ///        LogicalCollection with an IResearchView on a coordinator in cluster
 ////////////////////////////////////////////////////////////////////////////////
-class IResearchLinkCoordinator final: public arangodb::Index {
+class IResearchLinkCoordinator final: public arangodb::ClusterIndex {
  public:
   DECLARE_SHARED_PTR(Index);
diff --git a/arangod/MMFiles/MMFilesEngine.cpp b/arangod/MMFiles/MMFilesEngine.cpp
index 554fd28ec6..aaa4c988c9 100644
--- a/arangod/MMFiles/MMFilesEngine.cpp
+++ b/arangod/MMFiles/MMFilesEngine.cpp
@@ -874,6 +874,7 @@ std::string MMFilesEngine::createCollection(
     LogicalCollection const& collection
 ) {
   auto path = databasePath(&vocbase);
+  TRI_ASSERT(!path.empty());
 
   // sanity check
   if (sizeof(MMFilesDatafileHeaderMarker) + sizeof(MMFilesDatafileFooterMarker) >
"REPLICATION-APPLIER-STATE" - ); + + std::string const path = engine->databasePath(&_vocbase); + if (path.empty()) { + return std::string(); + } + return arangodb::basics::FileUtils::buildFilename(path, "REPLICATION-APPLIER-STATE"); } } // arangodb diff --git a/arangod/Replication/GlobalReplicationApplier.cpp b/arangod/Replication/GlobalReplicationApplier.cpp index 381549d039..e00f283237 100644 --- a/arangod/Replication/GlobalReplicationApplier.cpp +++ b/arangod/Replication/GlobalReplicationApplier.cpp @@ -106,8 +106,11 @@ std::string GlobalReplicationApplier::getStateFilename() const { arangodb::SystemDatabaseFeature >(); auto vocbase = sysDbFeature->use(); + + std::string const path = engine->databasePath(vocbase.get()); + if (path.empty()) { + return std::string(); + } - return arangodb::basics::FileUtils::buildFilename( - engine->databasePath(vocbase.get()), "GLOBAL-REPLICATION-APPLIER-STATE" - ); + return arangodb::basics::FileUtils::buildFilename(path, "GLOBAL-REPLICATION-APPLIER-STATE"); } diff --git a/arangod/Replication/ReplicationApplier.cpp b/arangod/Replication/ReplicationApplier.cpp index d691d9507d..1b52bb632a 100644 --- a/arangod/Replication/ReplicationApplier.cpp +++ b/arangod/Replication/ReplicationApplier.cpp @@ -386,12 +386,16 @@ void ReplicationApplier::removeState() { if (!applies()) { return; } + + std::string const filename = getStateFilename(); + if (filename.empty()) { + // will happen during testing and for coordinator engine + return; + } WRITE_LOCKER_EVENTUAL(writeLocker, _statusLock); _state.reset(false); - std::string const filename = getStateFilename(); - if (TRI_ExistsFile(filename.c_str())) { LOG_TOPIC(TRACE, Logger::REPLICATION) << "removing replication state file '" << filename << "' for " << _databaseName; @@ -436,6 +440,11 @@ bool ReplicationApplier::loadState() { } std::string const filename = getStateFilename(); + if (filename.empty()) { + // will happen during testing and for coordinator engine + return false; + } + LOG_TOPIC(TRACE, Logger::REPLICATION) << "looking for replication state file '" << filename << "' for " << _databaseName; @@ -486,11 +495,16 @@ void ReplicationApplier::persistState(bool doSync) { if (!applies()) { return; } + + std::string const filename = getStateFilename(); + if (filename.empty()) { + // will happen during testing and for coordinator engine + return; + } VPackBuilder builder; _state.toVelocyPack(builder, false); - std::string const filename = getStateFilename(); LOG_TOPIC(TRACE, Logger::REPLICATION) << "saving replication applier state to file '" << filename << "' for " << _databaseName; diff --git a/arangod/Replication/TailingSyncer.cpp b/arangod/Replication/TailingSyncer.cpp index b7476c1bfa..9ff6d0d58e 100644 --- a/arangod/Replication/TailingSyncer.cpp +++ b/arangod/Replication/TailingSyncer.cpp @@ -114,6 +114,9 @@ TailingSyncer::TailingSyncer( // FIXME: move this into engine code std::string const& engineName = EngineSelectorFeature::ENGINE->typeName(); _supportsSingleOperations = (engineName == "mmfiles"); + + // Replication for RocksDB expects only one open transaction at a time + _supportsMultipleOpenTransactions = (engineName != "rocksdb"); } TailingSyncer::~TailingSyncer() { abortOngoingTransactions(); } @@ -500,6 +503,8 @@ Result TailingSyncer::startTransaction(VPackSlice const& slice) { LOG_TOPIC(TRACE, Logger::REPLICATION) << "starting replication transaction " << tid; + TRI_ASSERT(_ongoingTransactions.empty() || _supportsMultipleOpenTransactions); + auto trx = std::make_unique(*vocbase); Result res = 
diff --git a/arangod/Replication/TailingSyncer.cpp b/arangod/Replication/TailingSyncer.cpp
index b7476c1bfa..9ff6d0d58e 100644
--- a/arangod/Replication/TailingSyncer.cpp
+++ b/arangod/Replication/TailingSyncer.cpp
@@ -114,6 +114,9 @@ TailingSyncer::TailingSyncer(
   // FIXME: move this into engine code
   std::string const& engineName = EngineSelectorFeature::ENGINE->typeName();
   _supportsSingleOperations = (engineName == "mmfiles");
+
+  // Replication for RocksDB expects only one open transaction at a time
+  _supportsMultipleOpenTransactions = (engineName != "rocksdb");
 }
 
 TailingSyncer::~TailingSyncer() { abortOngoingTransactions(); }
@@ -500,6 +503,8 @@ Result TailingSyncer::startTransaction(VPackSlice const& slice) {
   LOG_TOPIC(TRACE, Logger::REPLICATION)
       << "starting replication transaction " << tid;
 
+  TRI_ASSERT(_ongoingTransactions.empty() || _supportsMultipleOpenTransactions);
+
   auto trx = std::make_unique<ReplicationTransaction>(*vocbase);
   Result res = trx->begin();
@@ -572,6 +577,8 @@ Result TailingSyncer::commitTransaction(VPackSlice const& slice) {
 
   Result res = trx->commit();
   _ongoingTransactions.erase(it);
+
+  TRI_ASSERT(_ongoingTransactions.empty() || _supportsMultipleOpenTransactions);
 
   return res;
 }
@@ -1618,6 +1625,8 @@ Result TailingSyncer::fetchOpenTransactions(TRI_voc_tick_t fromTick,
       _ongoingTransactions.emplace(StringUtils::uint64(it.copyString()), nullptr);
     }
 
+    TRI_ASSERT(_ongoingTransactions.size() <= 1 || _supportsMultipleOpenTransactions);
+
     {
       std::string const progress =
           "fetched initial master state for from tick " +
diff --git a/arangod/Replication/TailingSyncer.h b/arangod/Replication/TailingSyncer.h
index cc2e2b35ea..56c0ffde07 100644
--- a/arangod/Replication/TailingSyncer.h
+++ b/arangod/Replication/TailingSyncer.h
@@ -197,6 +197,10 @@ class TailingSyncer : public Syncer {
   /// @brief whether or not master & slave can work in parallel
   bool _workInParallel;
 
+  /// @brief whether or not multiple transactions may be open in parallel;
+  ///        this will be set to false for RocksDB, and to true for MMFiles
+  bool _supportsMultipleOpenTransactions;
+
   /// @brief which transactions were open and need to be treated specially
   std::unordered_map<TRI_voc_tid_t, std::unique_ptr<ReplicationTransaction>>
       _ongoingTransactions;
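Note: the new _supportsMultipleOpenTransactions flag encodes the invariant that with the RocksDB engine the tailing syncer never tracks more than one open replication transaction; the TRI_ASSERTs above check it whenever a transaction is opened or committed. A standalone illustration with simplified stand-in types:

    #include <cassert>
    #include <cstdint>
    #include <memory>
    #include <unordered_map>

    struct ReplicationTransaction { /* ... */ };
    using TransactionId = uint64_t;

    class TailingSyncer {
     public:
      explicit TailingSyncer(bool isRocksDB)
          : _supportsMultipleOpenTransactions(!isRocksDB) {}

      void startTransaction(TransactionId tid) {
        // with RocksDB there must be no other transaction open at this point
        assert(_ongoingTransactions.empty() || _supportsMultipleOpenTransactions);
        _ongoingTransactions.emplace(tid, std::make_unique<ReplicationTransaction>());
      }

      void commitTransaction(TransactionId tid) {
        _ongoingTransactions.erase(tid);
        assert(_ongoingTransactions.empty() || _supportsMultipleOpenTransactions);
      }

     private:
      bool _supportsMultipleOpenTransactions;
      std::unordered_map<TransactionId, std::unique_ptr<ReplicationTransaction>>
          _ongoingTransactions;
    };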
diff --git a/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp b/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp
index fa1a35284a..3a37532dd5 100644
--- a/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp
+++ b/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp
@@ -106,8 +106,9 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
         builder.clear();
         builder.add(velocypack::ValuePair(docKey.data(), docKey.size(),
                                           velocypack::ValueType::String));
-        trx.remove(col->name(), builder.slice(), options);
-        ++stats.numDocsRemoved;
+        if (trx.remove(col->name(), builder.slice(), options).ok()) {
+          ++stats.numDocsRemoved;
+        }
         // continue iteration
         return true;
       }
@@ -133,8 +134,9 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
         builder.clear();
         builder.add(velocypack::ValuePair(docKey.data(), docKey.size(),
                                           velocypack::ValueType::String));
-        trx.remove(col->name(), builder.slice(), options);
-        ++stats.numDocsRemoved;
+        if (trx.remove(col->name(), builder.slice(), options).ok()) {
+          ++stats.numDocsRemoved;
+        }
       }
 
       // continue iteration until end
@@ -272,8 +274,9 @@ Result syncChunkRocksDB(
       // we have a local key that is not present remotely
       keyBuilder->clear();
       keyBuilder->add(VPackValue(localKey));
-      trx->remove(collectionName, keyBuilder->slice(), options);
-      ++stats.numDocsRemoved;
+      if (trx->remove(collectionName, keyBuilder->slice(), options).ok()) {
+        ++stats.numDocsRemoved;
+      }
 
       ++nextStart;
     } else if (res == 0) {
@@ -316,8 +319,9 @@ Result syncChunkRocksDB(
         // we have a local key that is not present remotely
         keyBuilder->clear();
         keyBuilder->add(VPackValue(localKey));
-        trx->remove(collectionName, keyBuilder->slice(), options);
-        ++stats.numDocsRemoved;
+        if (trx->remove(collectionName, keyBuilder->slice(), options).ok()) {
+          ++stats.numDocsRemoved;
+        }
       }
       ++nextStart;
     }
@@ -439,6 +443,9 @@ Result syncChunkRocksDB(
     if (conflictId.isSet()) {
       physical->readDocumentWithCallback(trx, conflictId, [&](LocalDocumentId const&, VPackSlice doc) {
         res = trx->remove(collectionName, doc, options).result;
+        if (res.ok()) {
+          ++stats.numDocsRemoved;
+        }
       });
     }
     return res;
@@ -467,6 +474,8 @@ Result syncChunkRocksDB(
           return opRes.result;
         }
       }
+
+      ++stats.numDocsInserted;
     } else {
       // REPLACE
       TRI_ASSERT(options.indexOperationMode == Index::OperationMode::internal);
@@ -490,7 +499,6 @@ Result syncChunkRocksDB(
         }
       }
     }
-    ++stats.numDocsInserted;
   }
 
   if (foundLength >= toFetch.size()) {
@@ -586,6 +594,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
   }
 
   size_t const numChunks = static_cast<size_t>(chunkSlice.length());
+  uint64_t const numberDocumentsRemovedBeforeStart = stats.numDocsRemoved;
 
   {
     if (syncer.isAborted()) {
@@ -610,7 +619,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
           res.errorNumber(),
           std::string("unable to start transaction: ") + res.errorMessage());
     }
-    
+
     // We do not take responsibility for the index.
     // The LogicalCollection is protected by trx.
     // Neither it nor its indexes can be invalidated
@@ -683,8 +692,10 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
         // smaller values than lowKey mean they don't exist remotely
         tempBuilder.clear();
        tempBuilder.add(VPackValue(docKey));
-        trx.remove(col->name(), tempBuilder.slice(), options);
-        ++stats.numDocsRemoved;
+
+        if (trx.remove(col->name(), tempBuilder.slice(), options).ok()) {
+          ++stats.numDocsRemoved;
+        }
         return;
       }
 
@@ -749,8 +760,10 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
 
     LogicalCollection* coll = trx.documentCollection();
     auto iterator = createPrimaryIndexIterator(&trx, coll);
+    uint64_t documentsFound = 0;
     iterator.next(
         [&](rocksdb::Slice const& rocksKey, rocksdb::Slice const& rocksValue) {
+          ++documentsFound;
          std::string docKey = RocksDBKey::primaryKey(rocksKey).toString();
           TRI_voc_rid_t docRev;
           if (!RocksDBValue::revisionId(rocksValue, docRev)) {
@@ -779,6 +792,15 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
       }
     }
 
+    {
+      uint64_t numberDocumentsAfterSync = documentsFound + stats.numDocsInserted - (stats.numDocsRemoved - numberDocumentsRemovedBeforeStart);
+      uint64_t numberDocumentsDueToCounter = col->numberDocuments(&trx, transaction::CountType::Normal);
+      syncer.setProgress(std::string("number of remaining documents in collection '") + col->name() + "' " + std::to_string(numberDocumentsAfterSync) + ", number of documents due to collection count: " + std::to_string(numberDocumentsDueToCounter));
+
+      if (numberDocumentsAfterSync != numberDocumentsDueToCounter) {
+        LOG_TOPIC(DEBUG, Logger::REPLICATION) << "number of remaining documents in collection '" + col->name() + "' is " + std::to_string(numberDocumentsAfterSync) + " and differs from number of documents returned by collection count " + std::to_string(numberDocumentsDueToCounter);
+      }
+    }
+
     res = trx.commit();
     if (!res.ok()) {
       return res;
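Note: with the counters above now only incremented for operations that actually succeeded, the final consistency check reduces to simple arithmetic: the expected post-sync size is documentsFound + numDocsInserted - (numDocsRemoved - numberDocumentsRemovedBeforeStart), compared against the collection's own counter. A hypothetical, free-standing version of that calculation with made-up numbers:

    #include <cstdint>
    #include <iostream>

    int main() {
      uint64_t documentsFound = 1000;                   // seen by the primary-index scan
      uint64_t numDocsInserted = 25;                    // successful inserts only
      uint64_t numDocsRemoved = 40;                     // running total across syncs
      uint64_t numberDocumentsRemovedBeforeStart = 10;  // snapshot taken up front

      // removals attributed to *this* run: 40 - 10 = 30
      uint64_t numberDocumentsAfterSync =
          documentsFound + numDocsInserted -
          (numDocsRemoved - numberDocumentsRemovedBeforeStart);  // 1000 + 25 - 30 = 995

      uint64_t numberDocumentsDueToCounter = 995;  // what the collection reports
      if (numberDocumentsAfterSync != numberDocumentsDueToCounter) {
        std::cout << "collection count is off" << std::endl;
      }
    }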
diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp
index d9c623ef26..5b6324ac46 100644
--- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp
+++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp
@@ -219,7 +219,6 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
   }
 
   bool includeSystem = _request->parsedValue("includeSystem", true);
-  // TODO: determine good default value?
   uint64_t chunkSize = _request->parsedValue("chunkSize", 1024 * 1024);
 
   grantTemporaryRights();
@@ -462,19 +461,14 @@ void RocksDBRestReplicationHandler::handleCommandGetKeys() {
   }
 
   static uint64_t const DefaultChunkSize = 5000;
-  uint64_t chunkSize = DefaultChunkSize;
 
   // determine chunk size
-  bool found;
-  std::string const& value = _request->value("chunkSize", found);
+  uint64_t chunkSize = _request->parsedValue("chunkSize", DefaultChunkSize);
 
-  if (found) {
-    chunkSize = StringUtils::uint64(value);
-    if (chunkSize < 100) {
-      chunkSize = DefaultChunkSize;
-    } else if (chunkSize > 20000) {
-      chunkSize = 20000;
-    }
+  if (chunkSize < 100) {
+    chunkSize = DefaultChunkSize;
+  } else if (chunkSize > 20000) {
+    chunkSize = 20000;
   }
 
   //first suffix needs to be the batch id
@@ -515,36 +509,28 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
   }
 
   static uint64_t const DefaultChunkSize = 5000;
-  uint64_t chunkSize = DefaultChunkSize;
 
   // determine chunk size
-  bool found;
-  std::string const& value1 = _request->value("chunkSize", found);
+  uint64_t chunkSize = _request->parsedValue("chunkSize", DefaultChunkSize);
 
-  if (found) {
-    chunkSize = StringUtils::uint64(value1);
-    if (chunkSize < 100) {
-      chunkSize = DefaultChunkSize;
-    } else if (chunkSize > 20000) {
-      chunkSize = 20000;
-    }
+  if (chunkSize < 100) {
+    chunkSize = DefaultChunkSize;
+  } else if (chunkSize > 20000) {
+    chunkSize = 20000;
   }
 
   // chunk is supplied by old clients, low is an optimization
   // for rocksdb, because seeking should be cheaper
-  std::string const& value2 = _request->value("chunk", found);
-  size_t chunk = 0;
-  if (found) {
-    chunk = static_cast<size_t>(StringUtils::uint64(value2));
-  }
-  std::string const& lowKey = _request->value("low", found);
+  size_t chunk = static_cast<size_t>(_request->parsedValue("chunk", uint64_t(0)));
 
-  std::string const& value3 = _request->value("type", found);
+  bool found;
+  std::string const& lowKey = _request->value("low", found);
+  std::string const& value = _request->value("type", found);
 
   bool keys = true;
-  if (value3 == "keys") {
+  if (value == "keys") {
     keys = true;
-  } else if (value3 == "docs") {
+  } else if (value == "docs") {
     keys = false;
   } else {
     generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
@@ -554,9 +540,9 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
 
   size_t offsetInChunk = 0;
   size_t maxChunkSize = SIZE_MAX;
-  std::string const& value4 = _request->value("offset", found);
+  std::string const& value2 = _request->value("offset", found);
 
   if (found) {
-    offsetInChunk = static_cast<size_t>(StringUtils::uint64(value4));
+    offsetInChunk = static_cast<size_t>(StringUtils::uint64(value2));
     // "offset" was introduced with ArangoDB 3.3. if the client sends it,
     // it means we can adapt the result size dynamically and the client
     // may refetch data for the same chunk
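Note: both key handlers above now share one clamping scheme: read chunkSize with a default, then clamp it to [100, 20000], where values below 100 fall back to the default rather than to the minimum. The rule in isolation (the constant and function name mirror the patch; this is a sketch, not the handler itself):

    #include <cstdint>

    static uint64_t const DefaultChunkSize = 5000;

    uint64_t clampChunkSize(uint64_t requested) {
      // `requested` stands in for _request->parsedValue("chunkSize", DefaultChunkSize)
      uint64_t chunkSize = requested;
      if (chunkSize < 100) {
        chunkSize = DefaultChunkSize;  // too small: use the default, not 100
      } else if (chunkSize > 20000) {
        chunkSize = 20000;             // too large: cap at the maximum
      }
      return chunkSize;
    }
    // clampChunkSize(0) == 5000, clampChunkSize(150) == 150, clampChunkSize(50000) == 20000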
diff --git a/arangod/RocksDBEngine/RocksDBWalAccess.cpp b/arangod/RocksDBEngine/RocksDBWalAccess.cpp
index 3a0525e757..613a912b9c 100644
--- a/arangod/RocksDBEngine/RocksDBWalAccess.cpp
+++ b/arangod/RocksDBEngine/RocksDBWalAccess.cpp
@@ -113,12 +113,16 @@ class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessC
         _lastWrittenSequence(0) {}
 
   bool Continue() override {
+    if (_stopOnNext) {
+      return false;
+    }
+
     if (_responseSize > _maxResponseSize) {
       // it should only be possible to be in the middle of a huge batch,
       // if and only if we are in one big transaction. We may not stop
-      // while
       if (_state == TRANSACTION && _removedDocRid == 0) {
-        return false;
+        // this will make us process one more marker still
+        _stopOnNext = true;
       }
     }
     return true;
@@ -128,8 +132,8 @@ class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessC
     // rocksdb does not count LogData towards sequence-number
     RocksDBLogType type = RocksDBLogValue::type(blob);
 
-    //LOG_TOPIC(ERR, Logger::ENGINES) << "[LOG] " << _currentSequence
-    //    << " " << rocksDBLogTypeName(type);
+    // LOG_TOPIC(WARN, Logger::REPLICATION) << "[LOG] " << _currentSequence
+    //    << " " << rocksDBLogTypeName(type);
     switch (type) {
       case RocksDBLogType::DatabaseCreate:
         resetTransientState();  // finish ongoing trx
@@ -402,8 +406,7 @@ class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessC
   rocksdb::Status PutCF(uint32_t column_family_id, rocksdb::Slice const& key,
                         rocksdb::Slice const& value) override {
     incTick();
-
-    //LOG_TOPIC(ERR, Logger::ENGINES) << "[PUT] cf: " << column_family_id
+    // LOG_TOPIC(WARN, Logger::ENGINES) << "[PUT] cf: " << column_family_id
     //    << ", key:" << key.ToString() << " value: " << value.ToString();
 
     if (column_family_id == _definitionsCF) {
@@ -634,6 +637,7 @@ public:
   }
 
   void startNewBatch(rocksdb::SequenceNumber startSequence) {
+    TRI_ASSERT(!_stopOnNext);
     // starting new write batch
     _startSequence = startSequence;
     _currentSequence = startSequence;
@@ -645,7 +649,7 @@ public:
   }
 
   uint64_t endBatch() {
-    TRI_ASSERT(_removedDocRid == 0);
+    TRI_ASSERT(_removedDocRid == 0 || _stopOnNext);
     resetTransientState();
     return _currentSequence;
   }
@@ -720,6 +724,7 @@ public:
   TRI_voc_tick_t _currentTrxId = 0;
   TRI_voc_tick_t _trxDbId = 0;  // remove eventually
   TRI_voc_rid_t _removedDocRid = 0;
+  bool _stopOnNext = false;
 };
 
 // iterates over WAL starting at 'from' and returns up to 'chunkSize' documents
@@ -735,6 +740,7 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
   if (chunkSize < 16384) {  // we need to have some sensible minimum
     chunkSize = 16384;
   }
+
   // pre 3.4 breaking up write batches is not supported
   size_t maxTrxChunkSize = filter.tickLastScanned > 0 ? chunkSize : SIZE_MAX;
@@ -815,7 +821,7 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
   if (!s.ok()) {
     result.Result::reset(convertStatus(s, rocksutils::StatusHint::wal));
   }
-  //LOG_TOPIC(WARN, Logger::ENGINES) << "2. firstTick: " << firstTick << " lastWrittenTick: " << lastWrittenTick
-  //<< " latestTick: " << latestTick;
+  // LOG_TOPIC(WARN, Logger::REPLICATION) << "2. firstTick: " << firstTick << " lastWrittenTick: " << lastWrittenTick
+  //    << " latestTick: " << latestTick;
 
   return result;
 }
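Note: instead of returning false from Continue() in the middle of a big transaction (which could split one write batch across responses), the dumper above now sets _stopOnNext, lets one more marker through, and stops on the following call; startNewBatch() asserts the flag never leaks into the next batch. A free-standing sketch of the soft-stop mechanism with simplified stand-in state (the real code additionally guards on the transaction state, which is omitted here):

    #include <cassert>
    #include <cstddef>

    class Dumper {
     public:
      // called by the WriteBatch iterator before each marker is handled
      bool Continue() {
        if (_stopOnNext) {
          return false;  // the one extra marker has already been processed
        }
        if (_responseSize > _maxResponseSize) {
          // response is full: remember to stop, but let one more marker through
          _stopOnNext = true;
        }
        return true;
      }

      void startNewBatch() {
        assert(!_stopOnNext);  // the flag must be consumed before a new batch
        _responseSize = 0;
        // ... reset remaining per-batch state ...
      }

     private:
      size_t _responseSize = 0;
      size_t _maxResponseSize = 16384;
      bool _stopOnNext = false;
    };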
firstTick: " << firstTick << " lastWrittenTick: " << lastWrittenTick + // << " latestTick: " << latestTick; return result; } diff --git a/arangod/StorageEngine/EngineSelectorFeature.cpp b/arangod/StorageEngine/EngineSelectorFeature.cpp index b76658df3e..e03dfff467 100644 --- a/arangod/StorageEngine/EngineSelectorFeature.cpp +++ b/arangod/StorageEngine/EngineSelectorFeature.cpp @@ -88,7 +88,6 @@ void EngineSelectorFeature::prepare() { TRI_ASSERT(_engine != "auto"); if (ServerState::instance()->isCoordinator()) { - ClusterEngine* ce = application_features::ApplicationServer::getFeature("ClusterEngine"); ENGINE = ce; diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index ae1d3864ed..c76f509d89 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -1321,6 +1321,7 @@ static void JS_PropertiesVocbaseCol( TRI_V8_THROW_EXCEPTION(res); } } + TRI_ASSERT(builder.isClosed()); Result res = methods::Collections::updateProperties(consoleColl, builder.slice()); if (res.fail() && ServerState::instance()->isCoordinator()) { TRI_V8_THROW_EXCEPTION(res); diff --git a/arangod/VocBase/Methods/Version.cpp b/arangod/VocBase/Methods/Version.cpp index 189ea5e3ca..8b364ba500 100644 --- a/arangod/VocBase/Methods/Version.cpp +++ b/arangod/VocBase/Methods/Version.cpp @@ -155,7 +155,10 @@ Result Version::write(TRI_vocbase_t* vocbase, TRI_ASSERT(engine != nullptr); std::string versionFile = engine->versionFilename(vocbase->id()); - TRI_ASSERT(!versionFile.empty()); + if (versionFile.empty()) { + // cluster engine + return Result(); + } VPackOptions opts; opts.buildUnindexedObjects = true; diff --git a/tests/IResearch/common.cpp b/tests/IResearch/common.cpp index cddee4f921..386d9c87b5 100644 --- a/tests/IResearch/common.cpp +++ b/tests/IResearch/common.cpp @@ -31,6 +31,7 @@ #include "Aql/ExecutionPlan.h" #include "Aql/ExpressionContext.h" #include "Aql/Ast.h" +#include "ClusterEngine/ClusterEngine.h" #include "Basics/files.h" #include "RestServer/DatabasePathFeature.h" #include "V8/v8-utils.h" @@ -130,6 +131,7 @@ namespace tests { void init(bool withICU /*= false*/) { arangodb::transaction::Methods::clearDataSourceRegistrationCallbacks(); + ClusterEngine::Mocking = true; } // @Note: once V8 is initialized all 'CATCH' errors will result in SIGILL diff --git a/tests/js/common/shell/shell-database.js b/tests/js/common/shell/shell-database.js index fc4ad8583a..6287b0a22a 100644 --- a/tests/js/common/shell/shell-database.js +++ b/tests/js/common/shell/shell-database.js @@ -69,15 +69,6 @@ function DatabaseSuite () { assertEqual("_system", internal.db._name()); }, -//////////////////////////////////////////////////////////////////////////////// -/// @brief test _path function -//////////////////////////////////////////////////////////////////////////////// - - testPath : function () { - assertTrue(typeof internal.db._path() === "string"); - assertTrue(internal.db._path() !== ""); - }, - //////////////////////////////////////////////////////////////////////////////// /// @brief test _isSystem function //////////////////////////////////////////////////////////////////////////////// diff --git a/tests/js/server/replication/replication-aql.js b/tests/js/server/replication/replication-aql.js index 6f870a2704..6c7a8cc9be 100644 --- a/tests/js/server/replication/replication-aql.js +++ b/tests/js/server/replication/replication-aql.js @@ -112,24 +112,23 @@ function ReplicationSuite() { var slaveState = replication.applier.state(); if 
diff --git a/tests/js/server/replication/replication-aql.js b/tests/js/server/replication/replication-aql.js
index 6f870a2704..6c7a8cc9be 100644
--- a/tests/js/server/replication/replication-aql.js
+++ b/tests/js/server/replication/replication-aql.js
@@ -112,24 +112,23 @@ function ReplicationSuite() {
       var slaveState = replication.applier.state();
 
       if (slaveState.state.lastError.errorNum > 0) {
-        console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
+        console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
         break;
       }
 
       if (!slaveState.state.running) {
-        console.log("slave is not running");
+        console.topic("replication=error", "slave is not running");
         break;
       }
 
      if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
           compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
-        // compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
-        console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
+        console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
         break;
       }
 
       if (!printed) {
-        console.log("waiting for slave to catch up");
+        console.topic("replication=debug", "waiting for slave to catch up");
         printed = true;
       }
       internal.wait(0.5, false);
state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick); break; } if (!printed) { - console.log("waiting for slave to catch up"); + console.topic("replication=debug", "waiting for slave to catch up"); printed = true; } internal.wait(0.5, false); diff --git a/tests/js/server/replication/replication-fuzz.js b/tests/js/server/replication/replication-fuzz.js index a4ecb98938..faa3fe81f9 100644 --- a/tests/js/server/replication/replication-fuzz.js +++ b/tests/js/server/replication/replication-fuzz.js @@ -112,24 +112,23 @@ function ReplicationSuite() { var slaveState = replication.applier.state(); if (slaveState.state.lastError.errorNum > 0) { - console.log("slave has errored:", JSON.stringify(slaveState.state.lastError)); + console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError)); break; } if (!slaveState.state.running) { - console.log("slave is not running"); + console.topic("replication=error", "slave is not running"); break; } if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 || compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // || - // compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) { - console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick); + console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick); break; } if (!printed) { - console.log("waiting for slave to catch up"); + console.topic("replication=debug", "waiting for slave to catch up"); printed = true; } internal.wait(0.5, false); diff --git a/tests/js/server/replication/replication-random.js b/tests/js/server/replication/replication-random.js index 65f3ab93e0..a3a5347122 100644 --- a/tests/js/server/replication/replication-random.js +++ b/tests/js/server/replication/replication-random.js @@ -114,24 +114,23 @@ function ReplicationSuite() { var slaveState = replication.applier.state(); if (slaveState.state.lastError.errorNum > 0) { - console.log("slave has errored:", JSON.stringify(slaveState.state.lastError)); + console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError)); break; } if (!slaveState.state.running) { - console.log("slave is not running"); + console.topic("replication=error", "slave is not running"); break; } if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 || compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // || - // compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) { - console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick); + console.topic("replication=debug", "slave has caught up. 
state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick); break; } if (!printed) { - console.log("waiting for slave to catch up"); + console.topic("replication=debug", "waiting for slave to catch up"); printed = true; } internal.wait(0.5, false);