diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index 5bdd3b6dcd..28b3c936f5 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -258,8 +258,8 @@ class ClusterMethods { static std::unique_ptr createCollectionOnCoordinator( TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, arangodb::velocypack::Slice parameters, - bool ignoreDistributeShardsLikeErrors = true, - bool waitForSyncReplication = true); + bool ignoreDistributeShardsLikeErrors, + bool waitForSyncReplication); private: @@ -268,8 +268,8 @@ class ClusterMethods { //////////////////////////////////////////////////////////////////////////////// static std::unique_ptr persistCollectionInAgency( - LogicalCollection* col, bool ignoreDistributeShardsLikeErrors = true, - bool waitForSyncReplication = true); + LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, + bool waitForSyncReplication); }; } // namespace arangodb diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp index 881e6671e7..5a8a448198 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp @@ -285,6 +285,8 @@ bool RocksDBReplicationManager::garbageCollect(bool force) { MUTEX_LOCKER(mutexLocker, _lock); + auto oldSize = _contexts.size(); + for (auto it = _contexts.begin(); it != _contexts.end(); /* no hoisting */) { auto context = it->second; @@ -318,7 +320,7 @@ bool RocksDBReplicationManager::garbageCollect(bool force) { // FIXME effectively force should only be called on shutdown // nevertheless this is quite ugly - if (_contexts.size() == 0 && !force) { + if ((oldSize > 0) && (_contexts.size() == 0) && !force) { enableFileDeletions(); } } catch (...) { @@ -341,7 +343,7 @@ void RocksDBReplicationManager::disableFileDeletions() { void RocksDBReplicationManager::enableFileDeletions() { auto rocks = globalRocksDB(); - auto s = rocks->DisableFileDeletions(); + auto s = rocks->EnableFileDeletions(false); TRI_ASSERT(s.ok()); } diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index 2f2c890078..752e12cf57 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -903,6 +903,11 @@ void RocksDBRestReplicationHandler::handleCommandRestoreCollection() { force = StringUtils::boolean(value3); } + std::string const& value9 = + _request->value("ignoreDistributeShardsLikeErrors", found); + bool ignoreDistributeShardsLikeErrors = + found ? StringUtils::boolean(value9) : false; + uint64_t numberOfShards = 0; std::string const& value4 = _request->value("numberOfShards", found); @@ -921,9 +926,9 @@ void RocksDBRestReplicationHandler::handleCommandRestoreCollection() { int res; if (ServerState::instance()->isCoordinator()) { - res = processRestoreCollectionCoordinator(slice, overwrite, recycleIds, - force, numberOfShards, errorMsg, - replicationFactor); + res = processRestoreCollectionCoordinator( + slice, overwrite, recycleIds, force, numberOfShards, errorMsg, + replicationFactor, ignoreDistributeShardsLikeErrors); } else { res = processRestoreCollection(slice, overwrite, recycleIds, force, errorMsg); @@ -2352,7 +2357,7 @@ int RocksDBRestReplicationHandler::processRestoreCollection( int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator( VPackSlice const& collection, bool dropExisting, bool reuseId, bool force, uint64_t numberOfShards, std::string& errorMsg, - uint64_t replicationFactor) { + uint64_t replicationFactor, bool ignoreDistributeShardsLikeErrors) { if (!collection.isObject()) { errorMsg = "collection declaration is invalid"; @@ -2488,7 +2493,8 @@ int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator( "Cluster") ->createWaitsForSyncReplication(); auto col = ClusterMethods::createCollectionOnCoordinator( - collectionType, _vocbase, merged, true, createWaitsForSyncReplication); + collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors, + createWaitsForSyncReplication); TRI_ASSERT(col != nullptr); } catch (basics::Exception const& e) { // Error, report it. diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.h b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.h index 786d84b438..930089b49c 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.h +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.h @@ -260,7 +260,7 @@ class RocksDBRestReplicationHandler : public RestVocbaseBaseHandler { ////////////////////////////////////////////////////////////////////////////// int processRestoreCollectionCoordinator(VPackSlice const&, bool, bool, bool, - uint64_t, std::string&, uint64_t); + uint64_t, std::string&, uint64_t, bool); ////////////////////////////////////////////////////////////////////////////// /// @brief creates a collection, based on the VelocyPack provided TODO: MOVE diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 807cf61537..8beb274bfd 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -55,7 +55,9 @@ using namespace arangodb; // for the RocksDB engine we do not need any additional data struct RocksDBTransactionData final : public TransactionData {}; -RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function const& rollbackCallback) +RocksDBSavePoint::RocksDBSavePoint( + rocksdb::Transaction* trx, bool handled, + std::function const& rollbackCallback) : _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) { TRI_ASSERT(trx != nullptr); if (!_handled) { @@ -110,8 +112,8 @@ RocksDBTransactionState::~RocksDBTransactionState() { /// @brief start a transaction Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { - LOG_TRX(this, _nestingLevel) - << "beginning " << AccessMode::typeString(_type) << " transaction"; + LOG_TRX(this, _nestingLevel) << "beginning " << AccessMode::typeString(_type) + << " transaction"; Result result = useCollections(_nestingLevel); @@ -157,12 +159,13 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { _rocksTransaction->SetSnapshot(); _rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot(); - if (!isReadOnlyTransaction() && !hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) { + if (!isReadOnlyTransaction() && + !hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) { RocksDBLogValue header = - RocksDBLogValue::BeginTransaction(_vocbase->id(), _id); + RocksDBLogValue::BeginTransaction(_vocbase->id(), _id); _rocksTransaction->PutLogData(header.slice()); } - + } else { TRI_ASSERT(_status == transaction::Status::RUNNING); } @@ -173,8 +176,8 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { /// @brief commit a transaction Result RocksDBTransactionState::commitTransaction( transaction::Methods* activeTrx) { - LOG_TRX(this, _nestingLevel) - << "committing " << AccessMode::typeString(_type) << " transaction"; + LOG_TRX(this, _nestingLevel) << "committing " << AccessMode::typeString(_type) + << " transaction"; TRI_ASSERT(_status == transaction::Status::RUNNING); TRI_IF_FAILURE("TransactionWriteCommitMarker") { @@ -185,13 +188,14 @@ Result RocksDBTransactionState::commitTransaction( if (_nestingLevel == 0) { if (_rocksTransaction != nullptr) { - if (hasOperations()) { - // set wait for sync flag if required + // if (hasOperations()) { + if (_rocksTransaction->GetNumKeys() > 0) { + // set wait for sync flag if required if (waitForSync()) { _rocksWriteOptions.sync = true; _rocksTransaction->SetWriteOptions(_rocksWriteOptions); } - + // TODO wait for response on github issue to see how we can use the // sequence number result = rocksutils::convertStatus(_rocksTransaction->Commit()); @@ -231,10 +235,8 @@ Result RocksDBTransactionState::commitTransaction( } } else { // don't write anything if the transaction is empty - // TODO: calling Rollback() here does not work for some reason but it should. - // must investigate further!! - result = rocksutils::convertStatus(_rocksTransaction->Commit()); - + result = rocksutils::convertStatus(_rocksTransaction->Rollback()); + if (_cacheTx != nullptr) { // note: endTransaction() will delete _cacheTx! CacheManagerFeature::MANAGER->endTransaction(_cacheTx); @@ -256,8 +258,8 @@ Result RocksDBTransactionState::commitTransaction( /// @brief abort and rollback a transaction Result RocksDBTransactionState::abortTransaction( transaction::Methods* activeTrx) { - LOG_TRX(this, _nestingLevel) - << "aborting " << AccessMode::typeString(_type) << " transaction"; + LOG_TRX(this, _nestingLevel) << "aborting " << AccessMode::typeString(_type) + << " transaction"; TRI_ASSERT(_status == transaction::Status::RUNNING); Result result; @@ -265,7 +267,7 @@ Result RocksDBTransactionState::abortTransaction( if (_rocksTransaction != nullptr) { rocksdb::Status status = _rocksTransaction->Rollback(); result = rocksutils::convertStatus(status); - + if (_cacheTx != nullptr) { // note: endTransaction() will delete _cacheTx! CacheManagerFeature::MANAGER->endTransaction(_cacheTx); @@ -290,26 +292,25 @@ Result RocksDBTransactionState::abortTransaction( } void RocksDBTransactionState::prepareOperation( - TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId, - StringRef const& key, TRI_voc_document_operation_e operationType) { - + TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId, StringRef const& key, + TRI_voc_document_operation_e operationType) { TRI_ASSERT(!isReadOnlyTransaction()); - + bool singleOp = hasHint(transaction::Hints::Hint::SINGLE_OPERATION); // single operations should never call this method twice - TRI_ASSERT(!singleOp || _lastUsedCollection == 0); + TRI_ASSERT(!singleOp || _lastUsedCollection == 0); if (collectionId != _lastUsedCollection) { switch (operationType) { case TRI_VOC_DOCUMENT_OPERATION_INSERT: case TRI_VOC_DOCUMENT_OPERATION_UPDATE: case TRI_VOC_DOCUMENT_OPERATION_REPLACE: { if (singleOp) { - RocksDBLogValue logValue = RocksDBLogValue::SinglePut(_vocbase->id(), - collectionId); + RocksDBLogValue logValue = + RocksDBLogValue::SinglePut(_vocbase->id(), collectionId); _rocksTransaction->PutLogData(logValue.slice()); } else { RocksDBLogValue logValue = - RocksDBLogValue::DocumentOpsPrologue(collectionId); + RocksDBLogValue::DocumentOpsPrologue(collectionId); _rocksTransaction->PutLogData(logValue.slice()); } break; @@ -317,13 +318,12 @@ void RocksDBTransactionState::prepareOperation( case TRI_VOC_DOCUMENT_OPERATION_REMOVE: { if (singleOp) { TRI_ASSERT(!key.empty()); - RocksDBLogValue logValue = RocksDBLogValue::SingleRemove(_vocbase->id(), - collectionId, - key); + RocksDBLogValue logValue = + RocksDBLogValue::SingleRemove(_vocbase->id(), collectionId, key); _rocksTransaction->PutLogData(logValue.slice()); } else { RocksDBLogValue logValue = - RocksDBLogValue::DocumentOpsPrologue(collectionId); + RocksDBLogValue::DocumentOpsPrologue(collectionId); _rocksTransaction->PutLogData(logValue.slice()); } } break; @@ -332,11 +332,11 @@ void RocksDBTransactionState::prepareOperation( } _lastUsedCollection = collectionId; } - - // we need to log the remove log entry, if we don't have the single optimization + + // we need to log the remove log entry, if we don't have the single + // optimization if (!singleOp && operationType == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { - RocksDBLogValue logValue = - RocksDBLogValue::DocumentRemove(key); + RocksDBLogValue logValue = RocksDBLogValue::DocumentRemove(key); _rocksTransaction->PutLogData(logValue.slice()); } } diff --git a/arangod/VocBase/LogicalCollection.cpp b/arangod/VocBase/LogicalCollection.cpp index 0e7aa44520..81cc3779d3 100644 --- a/arangod/VocBase/LogicalCollection.cpp +++ b/arangod/VocBase/LogicalCollection.cpp @@ -747,9 +747,22 @@ void LogicalCollection::toVelocyPackForClusterInventory(VPackBuilder& result, std::unordered_set ignoreKeys{"allowUserKeys", "cid", "count", "objectId", - "statusString", "version"}; + "statusString", "version", + "distributeShardsLike"}; VPackBuilder params = toVelocyPackIgnore(ignoreKeys, false); - result.add(params.slice()); + { VPackObjectBuilder guard(&result); + for (auto const& p : VPackObjectIterator(params.slice())) { + result.add(p.key); + result.add(p.value); + } + if (!_distributeShardsLike.empty()) { + CollectionNameResolver resolver(_vocbase); + result.add("distributeShardsLike", + VPackValue(resolver.getCollectionNameCluster( + static_cast(basics::StringUtils::uint64( + distributeShardsLike()))))); + } + } result.add(VPackValue("indexes")); getIndexesVPack(result, false);