diff --git a/CHANGELOG b/CHANGELOG index 369a703ca0..bde6d4438d 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -113,7 +113,7 @@ devel when unused. Waiting for an unused V8 context will now also abort if no V8 context can be - acquired/created after 120 seconds. + acquired/created after 60 seconds. * improved diagnostic messages written to logfiles by supervisor process diff --git a/Installation/Jenkins/build.sh b/Installation/Jenkins/build.sh index b37f90e7a9..6f517eda04 100755 --- a/Installation/Jenkins/build.sh +++ b/Installation/Jenkins/build.sh @@ -254,6 +254,7 @@ while [ $# -gt 0 ]; do MAKE="cmake --build . --config ${BUILD_CONFIG}" PACKAGE_MAKE="cmake --build . --config ${BUILD_CONFIG} --target" CONFIGURE_OPTIONS="${CONFIGURE_OPTIONS} -DV8_TARGET_ARCHS=Release" + export _IsNativeEnvironment=true ;; --symsrv) diff --git a/arangod/Aql/ShortestPathBlock.h b/arangod/Aql/ShortestPathBlock.h index 4f2fcfe963..0f59d959a2 100644 --- a/arangod/Aql/ShortestPathBlock.h +++ b/arangod/Aql/ShortestPathBlock.h @@ -32,7 +32,6 @@ namespace arangodb { class ManagedDocumentResult; namespace graph { -class ConstantWeightShortestPathFinder; class ShortestPathFinder; class ShortestPathResult; } @@ -49,9 +48,6 @@ class ShortestPathBlock : public ExecutionBlock { friend struct EdgeWeightExpanderLocal; friend struct EdgeWeightExpanderCluster; - // TODO ONLY TEMPORARY - friend class graph::ConstantWeightShortestPathFinder; - public: ShortestPathBlock(ExecutionEngine* engine, ShortestPathNode const* ep); diff --git a/arangod/Cluster/ClusterFeature.cpp b/arangod/Cluster/ClusterFeature.cpp index a8e46814fe..33773b7207 100644 --- a/arangod/Cluster/ClusterFeature.cpp +++ b/arangod/Cluster/ClusterFeature.cpp @@ -133,6 +133,10 @@ void ClusterFeature::collectOptions(std::shared_ptr options) { options->addOption("--cluster.system-replication-factor", "replication factor for system collections", new UInt32Parameter(&_systemReplicationFactor)); + + options->addOption("--cluster.create-waits-for-sync-replication", + "active coordinator will wait for all replicas to create collection", + new BooleanParameter(&_createWaitsForSyncReplication)); } void ClusterFeature::validateOptions(std::shared_ptr options) { diff --git a/arangod/Cluster/ClusterFeature.h b/arangod/Cluster/ClusterFeature.h index fabec8ee02..a27eb0c786 100644 --- a/arangod/Cluster/ClusterFeature.h +++ b/arangod/Cluster/ClusterFeature.h @@ -62,6 +62,7 @@ class ClusterFeature : public application_features::ApplicationFeature { std::string _dbserverConfig; std::string _coordinatorConfig; uint32_t _systemReplicationFactor = 2; + bool _createWaitsForSyncReplication = false; private: void reportRole(ServerState::RoleEnum); @@ -76,6 +77,7 @@ class ClusterFeature : public application_features::ApplicationFeature { }; void setUnregisterOnShutdown(bool); + bool createWaitsForSyncReplication() { return _createWaitsForSyncReplication; }; void stop() override final; diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index f2a12dae23..a170b323e0 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1042,6 +1042,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, std::string const& collectionID, uint64_t numberOfShards, uint64_t replicationFactor, + bool waitForReplication, VPackSlice const& json, std::string& errorMsg, double timeout) { @@ -1100,19 +1101,10 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, [=](VPackSlice const& result) { if (result.isObject() && result.length() == (size_t)numberOfShards) { std::string tmpMsg = ""; - bool tmpHaveError = false; for (auto const& p : VPackObjectIterator(result)) { - if (replicationFactor == 0) { - VPackSlice servers = p.value.get("servers"); - if (!servers.isArray() || servers.length() < dbServers.size()) { - return true; - } - } - if (arangodb::basics::VelocyPackHelper::getBooleanValue( p.value, "error", false)) { - tmpHaveError = true; tmpMsg += " shardID:" + p.key.copyString() + ":"; tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue( p.value, "errorMessage", ""); @@ -1125,13 +1117,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, tmpMsg += ")"; } } + *errMsg = "Error in creation of collection:" + tmpMsg + " " + + __FILE__ + std::to_string(__LINE__); + *dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; + return true; + } + + // wait that all followers have created our new collection + if (waitForReplication) { + uint64_t mutableReplicationFactor = replicationFactor; + if (mutableReplicationFactor == 0) { + mutableReplicationFactor = dbServers.size(); + } + + VPackSlice servers = p.value.get("servers"); + if (!servers.isArray() || servers.length() < mutableReplicationFactor) { + return true; + } } - } - if (tmpHaveError) { - *errMsg = "Error in creation of collection:" + tmpMsg + " " - + __FILE__ + std::to_string(__LINE__); - *dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; - return true; } *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); } diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 0f4fa1634b..c9e8e3f4d1 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -349,6 +349,7 @@ class ClusterInfo { std::string const& collectionID, uint64_t numberOfShards, uint64_t replicationFactor, + bool waitForReplication, arangodb::velocypack::Slice const& json, std::string& errorMsg, double timeout); diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index df21f5e745..61190d6f0d 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -2264,12 +2264,13 @@ std::unique_ptr ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, VPackSlice parameters, - bool ignoreDistributeShardsLikeErrors) { + bool ignoreDistributeShardsLikeErrors, + bool waitForSyncReplication) { auto col = std::make_unique(vocbase, parameters); // Collection is a temporary collection object that undergoes sanity checks etc. // It is not used anywhere and will be cleaned up after this call. // Persist collection will return the real object. - return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors); + return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors, waitForSyncReplication); } #endif @@ -2279,7 +2280,7 @@ ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType, std::unique_ptr ClusterMethods::persistCollectionInAgency( - LogicalCollection* col, bool ignoreDistributeShardsLikeErrors) { + LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication) { std::string distributeShardsLike = col->distributeShardsLike(); std::vector dbServers; std::vector avoid = col->avoidServers(); @@ -2364,7 +2365,7 @@ ClusterMethods::persistCollectionInAgency( std::string errorMsg; int myerrno = ci->createCollectionCoordinator( col->dbName(), col->cid_as_string(), - col->numberOfShards(), col->replicationFactor(), velocy.slice(), errorMsg, 240.0); + col->numberOfShards(), col->replicationFactor(), waitForSyncReplication, velocy.slice(), errorMsg, 240.0); if (myerrno != TRI_ERROR_NO_ERROR) { if (errorMsg.empty()) { diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index 57e8095f2a..e1929bdc4e 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -258,7 +258,8 @@ class ClusterMethods { static std::unique_ptr createCollectionOnCoordinator( TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, arangodb::velocypack::Slice parameters, - bool ignoreDistributeShardsLikeErrors = false); + bool ignoreDistributeShardsLikeErrors, + bool waitForSyncReplication); private: @@ -267,7 +268,7 @@ class ClusterMethods { //////////////////////////////////////////////////////////////////////////////// static std::unique_ptr persistCollectionInAgency( - LogicalCollection* col, bool ignoreDistributeShardsLikeErrors = false); + LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication); }; } // namespace arangodb diff --git a/arangod/MMFiles/MMFilesEngine.cpp b/arangod/MMFiles/MMFilesEngine.cpp index 12c506f3a4..260642b18d 100644 --- a/arangod/MMFiles/MMFilesEngine.cpp +++ b/arangod/MMFiles/MMFilesEngine.cpp @@ -152,7 +152,7 @@ MMFilesEngine::MMFilesEngine(application_features::ApplicationServer* server) MMFilesEngine::~MMFilesEngine() {} // perform a physical deletion of the database -Result MMFilesEngine::dropDatabase(Database* database) { +Result MMFilesEngine::dropDatabase(TRI_vocbase_t* database) { // delete persistent indexes for this database MMFilesPersistentIndexFeature::dropDatabase(database->id()); diff --git a/arangod/MMFiles/MMFilesEngine.h b/arangod/MMFiles/MMFilesEngine.h index 74647c0223..c2f845cf07 100644 --- a/arangod/MMFiles/MMFilesEngine.h +++ b/arangod/MMFiles/MMFilesEngine.h @@ -138,14 +138,14 @@ class MMFilesEngine final : public StorageEngine { void waitForSync(TRI_voc_tick_t tick) override; virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade, int&) override; - Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) override { + TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) override { status = TRI_ERROR_NO_ERROR; return createDatabaseMMFiles(id, args); } int writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) override; void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) override; - Result dropDatabase(Database* database) override; + Result dropDatabase(TRI_vocbase_t* database) override; void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) override; // wal in recovery diff --git a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp index 028d0a275c..b01a938053 100644 --- a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp +++ b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp @@ -29,6 +29,7 @@ #include "Basics/conversions.h" #include "Basics/files.h" #include "Cluster/ClusterComm.h" +#include "Cluster/ClusterFeature.h" #include "Cluster/ClusterMethods.h" #include "Cluster/FollowerInfo.h" #include "GeneralServer/GeneralServer.h" @@ -1680,8 +1681,9 @@ int MMFilesRestReplicationHandler::processRestoreCollectionCoordinator( VPackSlice const merged = mergedBuilder.slice(); try { + bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature("Cluster")->createWaitsForSyncReplication(); auto col = ClusterMethods::createCollectionOnCoordinator( - collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors); + collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors, createWaitsForSyncReplication); TRI_ASSERT(col != nullptr); } catch (basics::Exception const& e) { // Error, report it. diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 9631123b5d..6721c216ac 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -1072,8 +1072,6 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col, sendExtendBatch(); sendExtendBarrier(); - std::vector toFetch; - TRI_voc_tick_t const chunkSize = 5000; std::string const baseUrl = BaseUrl + "/keys"; @@ -1258,7 +1256,7 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col, std::function parseDoc = [&](VPackSlice doc, VPackSlice key) { - bool rangeUneqal = false; + bool rangeUnequal = false; bool nextChunk = false; int cmp1 = key.compareString(lowKey.data(), lowKey.length()); @@ -1274,7 +1272,7 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col, if (cmp1 == 0) { foundLowKey = true; } else if (!foundLowKey && cmp1 > 0) { - rangeUneqal = true; + rangeUnequal = true; nextChunk = true; } @@ -1286,28 +1284,28 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col, markers.emplace_back(key.copyString(), TRI_ExtractRevisionId(doc)); if (cmp2 == 0) { // found highKey - rangeUneqal = std::to_string(localHash) != hashString; + rangeUnequal = std::to_string(localHash) != hashString; nextChunk = true; } } else if (cmp2 == 0) { - rangeUneqal = true; + rangeUnequal = true; nextChunk = true; } } else if (cmp2 > 0) { // higher than highKey // current range was unequal and we did not find the // high key. Load range and skip to next - rangeUneqal = true; + rangeUnequal = true; nextChunk = true; } - if (rangeUneqal) { + if (rangeUnequal) { int res = syncChunkRocksDB(&trx, keysId, currentChunkId, lowKey, highKey, markers, errorMsg); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } } - TRI_ASSERT(!rangeUneqal || (rangeUneqal && nextChunk)); // A => B + TRI_ASSERT(!rangeUnequal || nextChunk); // A => B if (nextChunk && currentChunkId + 1 < numChunks) { currentChunkId++; // we are out of range, see next chunk resetChunk(); diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 7b7f439d12..548c14ec9a 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -230,7 +230,12 @@ void RocksDBEngine::start() { } } -void RocksDBEngine::stop() {} +void RocksDBEngine::stop() { + if (!isEnabled()) { + return; + } + replicationManager()->dropAll(); +} void RocksDBEngine::unprepare() { if (!isEnabled()) { @@ -486,7 +491,7 @@ TRI_vocbase_t* RocksDBEngine::openDatabase( return openExistingDatabase(id, name, true, isUpgrade); } -RocksDBEngine::Database* RocksDBEngine::createDatabase( +TRI_vocbase_t* RocksDBEngine::createDatabase( TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) { status = TRI_ERROR_NO_ERROR; auto vocbase = std::make_unique(TRI_VOCBASE_TYPE_NORMAL, id, @@ -519,10 +524,6 @@ int RocksDBEngine::writeCreateCollectionMarker(TRI_voc_tick_t databaseId, void RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) { - // probably not required - // THROW_ARANGO_NOT_YET_IMPLEMENTED(); - - // status = saveDatabaseParameters(vocbase->id(), vocbase->name(), true); VPackBuilder builder; builder.openObject(); builder.add("id", VPackValue(std::to_string(vocbase->id()))); @@ -533,7 +534,8 @@ void RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase, status = writeCreateDatabaseMarker(vocbase->id(), builder.slice()); } -Result RocksDBEngine::dropDatabase(Database* database) { +Result RocksDBEngine::dropDatabase(TRI_vocbase_t* database) { + replicationManager()->drop(database); return dropDatabase(database->id()); } diff --git a/arangod/RocksDBEngine/RocksDBEngine.h b/arangod/RocksDBEngine/RocksDBEngine.h index 6019a99940..d77ba30130 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.h +++ b/arangod/RocksDBEngine/RocksDBEngine.h @@ -123,14 +123,14 @@ class RocksDBEngine final : public StorageEngine { virtual TRI_vocbase_t* openDatabase( arangodb::velocypack::Slice const& parameters, bool isUpgrade, int&) override; - Database* createDatabase(TRI_voc_tick_t id, + TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) override; int writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) override; void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) override; - Result dropDatabase(Database* database) override; + Result dropDatabase(TRI_vocbase_t* database) override; void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) override; // wal in recovery diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index 784281861e..5b640f9f0f 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -33,6 +33,7 @@ #include "Transaction/Helpers.h" #include "Transaction/StandaloneContext.h" #include "Transaction/UserTransaction.h" +#include "Utils/DatabaseGuard.h" #include "VocBase/replication-common.h" #include "VocBase/ticks.h" @@ -55,6 +56,7 @@ RocksDBReplicationContext::RocksDBReplicationContext() _mdr(), _customTypeHandler(), _vpackOptions(Options::Defaults), + _lastChunkOffset(0), _expires(TRI_microtime() + DefaultTTL), _isDeleted(false), _isUsed(true), @@ -390,10 +392,13 @@ void RocksDBReplicationContext::releaseDumpingResources() { _iter.reset(); } _collection = nullptr; + _guard.reset(); } std::unique_ptr RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) { + _guard.reset(new DatabaseGuard(vocbase)); + double lockTimeout = transaction::Methods::DefaultLockTimeout; std::shared_ptr ctx = transaction::StandaloneContext::Create(vocbase); @@ -401,6 +406,7 @@ RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) { ctx, {}, {}, {}, lockTimeout, false, true)); Result res = trx->begin(); if (!res.ok()) { + _guard.reset(); THROW_ARANGO_EXCEPTION(res); } _customTypeHandler = ctx->orderCustomTypeHandler(); diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index 95135f37f5..e8a521188b 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -36,6 +36,7 @@ #include namespace arangodb { +class DatabaseGuard; class RocksDBReplicationContext { public: @@ -53,6 +54,13 @@ class RocksDBReplicationContext { TRI_voc_tick_t id() const; uint64_t lastTick() const; uint64_t count() const; + + TRI_vocbase_t* vocbase() const { + if (_trx == nullptr) { + return nullptr; + } + return _trx->vocbase(); + } // creates new transaction/snapshot void bind(TRI_vocbase_t*); @@ -113,7 +121,8 @@ class RocksDBReplicationContext { ManagedDocumentResult _mdr; std::shared_ptr _customTypeHandler; arangodb::velocypack::Options _vpackOptions; - uint64_t _lastChunkOffset = 0; + uint64_t _lastChunkOffset; + std::unique_ptr _guard; double _expires; bool _isDeleted; diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp index 49dca5036c..881e6671e7 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp @@ -238,6 +238,40 @@ bool RocksDBReplicationManager::containsUsedContext() { return false; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop contexts by database (at least mark them as deleted) +//////////////////////////////////////////////////////////////////////////////// + +void RocksDBReplicationManager::drop(TRI_vocbase_t* vocbase) { + { + MUTEX_LOCKER(mutexLocker, _lock); + + for (auto& context : _contexts) { + if (context.second->vocbase() == vocbase) { + context.second->deleted(); + } + } + } + + garbageCollect(true); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief drop all contexts (at least mark them as deleted) +//////////////////////////////////////////////////////////////////////////////// + +void RocksDBReplicationManager::dropAll() { + { + MUTEX_LOCKER(mutexLocker, _lock); + + for (auto& context : _contexts) { + context.second->deleted(); + } + } + + garbageCollect(true); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief run a garbage collection on the contexts //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.h b/arangod/RocksDBEngine/RocksDBReplicationManager.h index 3d48e7de97..0019066b01 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.h +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.h @@ -92,6 +92,18 @@ class RocksDBReplicationManager { bool containsUsedContext(); + ////////////////////////////////////////////////////////////////////////////// + /// @brief drop contexts by database (at least mark them as deleted) + ////////////////////////////////////////////////////////////////////////////// + + void drop(TRI_vocbase_t*); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief drop all contexts (at least mark them as deleted) + ////////////////////////////////////////////////////////////////////////////// + + void dropAll(); + ////////////////////////////////////////////////////////////////////////////// /// @brief run a garbage collection on the contexts ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index dba3ede5fd..7cd8f528bd 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -30,6 +30,7 @@ #include "Basics/conversions.h" #include "Basics/files.h" #include "Cluster/ClusterComm.h" +#include "Cluster/ClusterFeature.h" #include "Cluster/ClusterMethods.h" #include "Cluster/FollowerInfo.h" #include "GeneralServer/GeneralServer.h" @@ -1819,8 +1820,9 @@ int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator( VPackSlice const merged = mergedBuilder.slice(); try { + bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature("Cluster")->createWaitsForSyncReplication(); auto col = ClusterMethods::createCollectionOnCoordinator(collectionType, - _vocbase, merged); + _vocbase, merged, true, createWaitsForSyncReplication); TRI_ASSERT(col != nullptr); } catch (basics::Exception const& e) { // Error, report it. diff --git a/arangod/Statistics/ConnectionStatistics.h b/arangod/Statistics/ConnectionStatistics.h index c5128694b6..c218792699 100644 --- a/arangod/Statistics/ConnectionStatistics.h +++ b/arangod/Statistics/ConnectionStatistics.h @@ -65,7 +65,7 @@ class ConnectionStatistics { _error = false; } - static size_t const QUEUE_SIZE = 5000; + static size_t const QUEUE_SIZE = 64 * 1024 - 2; // current (1.62) boost maximum static Mutex _dataLock; diff --git a/arangod/Statistics/RequestStatistics.h b/arangod/Statistics/RequestStatistics.h index 5b4d4f263d..daac2c602e 100644 --- a/arangod/Statistics/RequestStatistics.h +++ b/arangod/Statistics/RequestStatistics.h @@ -152,7 +152,7 @@ class RequestStatistics { void trace_log(); private: - static size_t const QUEUE_SIZE = 1000; + static size_t const QUEUE_SIZE = 64 * 1024 - 2; // current (1.62) boost maximum static arangodb::Mutex _dataLock; diff --git a/arangod/StorageEngine/StorageEngine.h b/arangod/StorageEngine/StorageEngine.h index 495cbcf90e..e35737bff1 100644 --- a/arangod/StorageEngine/StorageEngine.h +++ b/arangod/StorageEngine/StorageEngine.h @@ -146,7 +146,6 @@ class StorageEngine : public application_features::ApplicationFeature { // TODO add pre / post conditions for functions - using Database = TRI_vocbase_t; using CollectionView = LogicalCollection; virtual void waitForSync(TRI_voc_tick_t tick) = 0; @@ -154,10 +153,10 @@ class StorageEngine : public application_features::ApplicationFeature { //// operations on databasea /// @brief opens a database - virtual Database* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade, int& status) = 0; - Database* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade){ + virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade, int& status) = 0; + TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade){ int status; - Database* rv = openDatabase(args, isUpgrade, status); + TRI_vocbase_t* rv = openDatabase(args, isUpgrade, status); TRI_ASSERT(status == TRI_ERROR_NO_ERROR); TRI_ASSERT(rv != nullptr); return rv; @@ -172,16 +171,16 @@ class StorageEngine : public application_features::ApplicationFeature { // the WAL entry for the database creation will be written *after* the call // to "createDatabase" returns // no way to acquire id within this function?! - virtual Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) = 0; - Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args ){ + virtual TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) = 0; + TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args ){ int status; - Database* rv = createDatabase(id, args, status); + TRI_vocbase_t* rv = createDatabase(id, args, status); TRI_ASSERT(status == TRI_ERROR_NO_ERROR); TRI_ASSERT(rv != nullptr); return rv; } - // @brief wirte create marker for database + // @brief write create marker for database virtual int writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) = 0; // asks the storage engine to drop the specified database and persist the @@ -194,14 +193,14 @@ class StorageEngine : public application_features::ApplicationFeature { // // is done under a lock in database feature virtual void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) = 0; - void prepareDropDatabase(Database* db, bool useWriteMarker){ + void prepareDropDatabase(TRI_vocbase_t* db, bool useWriteMarker){ int status = 0; prepareDropDatabase(db, useWriteMarker, status); TRI_ASSERT(status == TRI_ERROR_NO_ERROR); }; // perform a physical deletion of the database - virtual Result dropDatabase(Database*) = 0; + virtual Result dropDatabase(TRI_vocbase_t*) = 0; /// @brief wait until a database directory disappears - not under lock in databaseFreature virtual void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) = 0; diff --git a/arangod/Utils/DatabaseGuard.h b/arangod/Utils/DatabaseGuard.h index 44d51c9a0b..3e585dfa54 100644 --- a/arangod/Utils/DatabaseGuard.h +++ b/arangod/Utils/DatabaseGuard.h @@ -40,7 +40,9 @@ class DatabaseGuard { explicit DatabaseGuard(TRI_vocbase_t* vocbase) : _vocbase(vocbase) { TRI_ASSERT(vocbase != nullptr); - _vocbase->use(); + if (!_vocbase->use()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + } } /// @brief create the guard, using a database id diff --git a/arangod/V8Server/V8DealerFeature.cpp b/arangod/V8Server/V8DealerFeature.cpp index 003f973956..08a3a266fa 100644 --- a/arangod/V8Server/V8DealerFeature.cpp +++ b/arangod/V8Server/V8DealerFeature.cpp @@ -566,7 +566,7 @@ V8Context* V8DealerFeature::enterContext(TRI_vocbase_t* vocbase, TimedAction exitWhenNoContext([](double waitTime) { LOG_TOPIC(WARN, arangodb::Logger::V8) << "giving up waiting for unused V8 context after " << Logger::FIXED(waitTime) << " s"; - }, 120); + }, 60); V8Context* context = nullptr; diff --git a/arangod/V8Server/v8-vocindex.cpp b/arangod/V8Server/v8-vocindex.cpp index 3c15367cb3..08bbc8398b 100644 --- a/arangod/V8Server/v8-vocindex.cpp +++ b/arangod/V8Server/v8-vocindex.cpp @@ -27,6 +27,7 @@ #include "Basics/VelocyPackHelper.h" #include "Basics/conversions.h" #include "Basics/tri-strings.h" +#include "Cluster/ClusterFeature.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterMethods.h" #include "Indexes/Index.h" @@ -669,12 +670,8 @@ static void CreateVocBase(v8::FunctionCallbackInfo const& args, TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - // ........................................................................... - // We require exactly 1 or exactly 2 arguments -- anything else is an error - // ........................................................................... - - if (args.Length() < 1 || args.Length() > 3) { - TRI_V8_THROW_EXCEPTION_USAGE("_create(, , )"); + if (args.Length() < 1 || args.Length() > 4) { + TRI_V8_THROW_EXCEPTION_USAGE("_create(, , , )"); } if (TRI_GetOperationModeServer() == TRI_VOCBASE_MODE_NO_CREATE) { @@ -682,7 +679,7 @@ static void CreateVocBase(v8::FunctionCallbackInfo const& args, } // optional, third parameter can override collection type - if (args.Length() == 3 && args[2]->IsString()) { + if (args.Length() >= 3 && args[2]->IsString()) { std::string typeString = TRI_ObjectToString(args[2]); if (typeString == "edge") { collectionType = TRI_COL_TYPE_EDGE; @@ -691,6 +688,7 @@ static void CreateVocBase(v8::FunctionCallbackInfo const& args, } } + PREVENT_EMBEDDED_TRANSACTION(); // extract the name @@ -725,9 +723,19 @@ static void CreateVocBase(v8::FunctionCallbackInfo const& args, infoSlice = builder.slice(); if (ServerState::instance()->isCoordinator()) { + bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature("Cluster")->createWaitsForSyncReplication(); + + if (args.Length() >= 3 && args[args.Length()-1]->IsObject()) { + v8::Handle obj = args[args.Length()-1]->ToObject(); + auto v8WaitForSyncReplication = obj->Get(TRI_V8_ASCII_STRING("waitForSyncReplication")); + if (!v8WaitForSyncReplication->IsUndefined()) { + createWaitsForSyncReplication = TRI_ObjectToBoolean(v8WaitForSyncReplication); + } + } + std::unique_ptr col = ClusterMethods::createCollectionOnCoordinator(collectionType, vocbase, - infoSlice); + infoSlice, true, createWaitsForSyncReplication); TRI_V8_RETURN(WrapCollection(isolate, col.release())); } diff --git a/js/actions/_api/collection/app.js b/js/actions/_api/collection/app.js index a7126647e1..24e408aa7e 100644 --- a/js/actions/_api/collection/app.js +++ b/js/actions/_api/collection/app.js @@ -205,6 +205,15 @@ function post_api_collection (req, res) { } try { + var options = {}; + if (req.parameters.hasOwnProperty('waitForSyncReplication')) { + var value = req.parameters.waitForSyncReplication.toLowerCase(); + if (value === 'true' || value === 'yes' || value === 'on' || value === 'y' || value === '1') { + options.waitForSyncReplication = true; + } else { + options.waitForSyncReplication = false; + } + } var collection; if (typeof (r.type) === 'string') { if (r.type.toLowerCase() === 'edge' || r.type === '3') { @@ -212,9 +221,9 @@ function post_api_collection (req, res) { } } if (r.type === arangodb.ArangoCollection.TYPE_EDGE) { - collection = arangodb.db._createEdgeCollection(r.name, r.parameters); + collection = arangodb.db._createEdgeCollection(r.name, r.parameters, options); } else { - collection = arangodb.db._createDocumentCollection(r.name, r.parameters); + collection = arangodb.db._createDocumentCollection(r.name, r.parameters, options); } var result = {}; diff --git a/js/client/modules/@arangodb/arango-database.js b/js/client/modules/@arangodb/arango-database.js index 38396e98cf..d8d9e4f77a 100644 --- a/js/client/modules/@arangodb/arango-database.js +++ b/js/client/modules/@arangodb/arango-database.js @@ -339,7 +339,7 @@ ArangoDatabase.prototype._collection = function (id) { // / @brief creates a new collection // ////////////////////////////////////////////////////////////////////////////// -ArangoDatabase.prototype._create = function (name, properties, type) { +ArangoDatabase.prototype._create = function (name, properties, type, options) { var body = { 'name': name, 'type': ArangoCollection.TYPE_DOCUMENT @@ -355,12 +355,23 @@ ArangoDatabase.prototype._create = function (name, properties, type) { } }); } + + let urlAddon = ''; + if (typeof options === "object" && options !== null) { + if (options.hasOwnProperty('waitForSyncReplication')) { + if (options.waitForSyncReplication) { + urlAddon = '?waitForSyncReplication=1'; + } else { + urlAddon = '?waitForSyncReplication=0'; + } + } + } if (type !== undefined) { body.type = type; } - var requestResult = this._connection.POST(this._collectionurl(), + var requestResult = this._connection.POST(this._collectionurl() + urlAddon, JSON.stringify(body)); arangosh.checkRequestResult(requestResult);