From dad5a1429edbd1e1e43abecc543a0cfdc3e5d63b Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Wed, 26 Apr 2017 09:53:57 +0200 Subject: [PATCH] Add waitForSyncReplication as a _create() option --- arangod/Cluster/ClusterFeature.cpp | 4 +++ arangod/Cluster/ClusterFeature.h | 2 ++ arangod/Cluster/ClusterInfo.cpp | 31 +++++++++++-------- arangod/Cluster/ClusterInfo.h | 1 + arangod/Cluster/ClusterMethods.cpp | 9 +++--- arangod/Cluster/ClusterMethods.h | 5 +-- .../MMFiles/MMFilesRestReplicationHandler.cpp | 4 ++- .../RocksDBRestReplicationHandler.cpp | 4 ++- arangod/V8Server/v8-vocindex.cpp | 24 +++++++++----- js/actions/_api/collection/app.js | 13 ++++++-- .../modules/@arangodb/arango-database.js | 15 +++++++-- 11 files changed, 79 insertions(+), 33 deletions(-) diff --git a/arangod/Cluster/ClusterFeature.cpp b/arangod/Cluster/ClusterFeature.cpp index a8e46814fe..33773b7207 100644 --- a/arangod/Cluster/ClusterFeature.cpp +++ b/arangod/Cluster/ClusterFeature.cpp @@ -133,6 +133,10 @@ void ClusterFeature::collectOptions(std::shared_ptr options) { options->addOption("--cluster.system-replication-factor", "replication factor for system collections", new UInt32Parameter(&_systemReplicationFactor)); + + options->addOption("--cluster.create-waits-for-sync-replication", + "active coordinator will wait for all replicas to create collection", + new BooleanParameter(&_createWaitsForSyncReplication)); } void ClusterFeature::validateOptions(std::shared_ptr options) { diff --git a/arangod/Cluster/ClusterFeature.h b/arangod/Cluster/ClusterFeature.h index 7a50e0ddb2..3e2de79199 100644 --- a/arangod/Cluster/ClusterFeature.h +++ b/arangod/Cluster/ClusterFeature.h @@ -58,6 +58,7 @@ class ClusterFeature : public application_features::ApplicationFeature { std::string _dbserverConfig; std::string _coordinatorConfig; uint32_t _systemReplicationFactor = 2; + bool _createWaitsForSyncReplication = true; private: void reportRole(ServerState::RoleEnum); @@ -72,6 +73,7 @@ class 
ClusterFeature : public application_features::ApplicationFeature { }; void setUnregisterOnShutdown(bool); + bool createWaitsForSyncReplication() { return _createWaitsForSyncReplication; }; void stop() override final; diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index f2a12dae23..a2c2927346 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1042,6 +1042,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, std::string const& collectionID, uint64_t numberOfShards, uint64_t replicationFactor, + bool waitForReplication, VPackSlice const& json, std::string& errorMsg, double timeout) { @@ -1103,13 +1104,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, bool tmpHaveError = false; for (auto const& p : VPackObjectIterator(result)) { - if (replicationFactor == 0) { - VPackSlice servers = p.value.get("servers"); - if (!servers.isArray() || servers.length() < dbServers.size()) { - return true; - } - } - if (arangodb::basics::VelocyPackHelper::getBooleanValue( p.value, "error", false)) { tmpHaveError = true; @@ -1125,13 +1119,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, tmpMsg += ")"; } } + *errMsg = "Error in creation of collection:" + tmpMsg + " " + + __FILE__ + std::to_string(__LINE__); + *dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; + return true; + } + + // wait until all followers have created our new collection + if (waitForReplication) { + uint64_t mutableReplicationFactor = replicationFactor; + if (mutableReplicationFactor == 0) { + mutableReplicationFactor = dbServers.size(); + } + + VPackSlice servers = p.value.get("servers"); + if (!servers.isArray() || servers.length() < mutableReplicationFactor) { + return true; + } } - } - if (tmpHaveError) { - *errMsg = "Error in creation of collection:" + tmpMsg + " " - + __FILE__ + std::to_string(__LINE__); - *dbServerResult = 
TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; - return true; } *dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg); } diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 0f4fa1634b..c9e8e3f4d1 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -349,6 +349,7 @@ class ClusterInfo { std::string const& collectionID, uint64_t numberOfShards, uint64_t replicationFactor, + bool waitForReplication, arangodb::velocypack::Slice const& json, std::string& errorMsg, double timeout); diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index df21f5e745..61190d6f0d 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -2264,12 +2264,13 @@ std::unique_ptr ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, VPackSlice parameters, - bool ignoreDistributeShardsLikeErrors) { + bool ignoreDistributeShardsLikeErrors, + bool waitForSyncReplication) { auto col = std::make_unique(vocbase, parameters); // Collection is a temporary collection object that undergoes sanity checks etc. // It is not used anywhere and will be cleaned up after this call. // Persist collection will return the real object. 
- return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors); + return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors, waitForSyncReplication); } #endif @@ -2279,7 +2280,7 @@ ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType, std::unique_ptr ClusterMethods::persistCollectionInAgency( - LogicalCollection* col, bool ignoreDistributeShardsLikeErrors) { + LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication) { std::string distributeShardsLike = col->distributeShardsLike(); std::vector dbServers; std::vector avoid = col->avoidServers(); @@ -2364,7 +2365,7 @@ ClusterMethods::persistCollectionInAgency( std::string errorMsg; int myerrno = ci->createCollectionCoordinator( col->dbName(), col->cid_as_string(), - col->numberOfShards(), col->replicationFactor(), velocy.slice(), errorMsg, 240.0); + col->numberOfShards(), col->replicationFactor(), waitForSyncReplication, velocy.slice(), errorMsg, 240.0); if (myerrno != TRI_ERROR_NO_ERROR) { if (errorMsg.empty()) { diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index 57e8095f2a..e1929bdc4e 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -258,7 +258,8 @@ class ClusterMethods { static std::unique_ptr createCollectionOnCoordinator( TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, arangodb::velocypack::Slice parameters, - bool ignoreDistributeShardsLikeErrors = false); + bool ignoreDistributeShardsLikeErrors, + bool waitForSyncReplication); private: @@ -267,7 +268,7 @@ class ClusterMethods { //////////////////////////////////////////////////////////////////////////////// static std::unique_ptr persistCollectionInAgency( - LogicalCollection* col, bool ignoreDistributeShardsLikeErrors = false); + LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication); }; } // namespace arangodb diff --git 
a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp index 028d0a275c..b01a938053 100644 --- a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp +++ b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp @@ -29,6 +29,7 @@ #include "Basics/conversions.h" #include "Basics/files.h" #include "Cluster/ClusterComm.h" +#include "Cluster/ClusterFeature.h" #include "Cluster/ClusterMethods.h" #include "Cluster/FollowerInfo.h" #include "GeneralServer/GeneralServer.h" @@ -1680,8 +1681,9 @@ int MMFilesRestReplicationHandler::processRestoreCollectionCoordinator( VPackSlice const merged = mergedBuilder.slice(); try { + bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature("Cluster")->createWaitsForSyncReplication(); auto col = ClusterMethods::createCollectionOnCoordinator( - collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors); + collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors, createWaitsForSyncReplication); TRI_ASSERT(col != nullptr); } catch (basics::Exception const& e) { // Error, report it. 
diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index dba3ede5fd..7cd8f528bd 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -30,6 +30,7 @@ #include "Basics/conversions.h" #include "Basics/files.h" #include "Cluster/ClusterComm.h" +#include "Cluster/ClusterFeature.h" #include "Cluster/ClusterMethods.h" #include "Cluster/FollowerInfo.h" #include "GeneralServer/GeneralServer.h" @@ -1819,8 +1820,9 @@ int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator( VPackSlice const merged = mergedBuilder.slice(); try { + bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature("Cluster")->createWaitsForSyncReplication(); auto col = ClusterMethods::createCollectionOnCoordinator(collectionType, - _vocbase, merged); + _vocbase, merged, true, createWaitsForSyncReplication); TRI_ASSERT(col != nullptr); } catch (basics::Exception const& e) { // Error, report it. diff --git a/arangod/V8Server/v8-vocindex.cpp b/arangod/V8Server/v8-vocindex.cpp index 3c15367cb3..08bbc8398b 100644 --- a/arangod/V8Server/v8-vocindex.cpp +++ b/arangod/V8Server/v8-vocindex.cpp @@ -27,6 +27,7 @@ #include "Basics/VelocyPackHelper.h" #include "Basics/conversions.h" #include "Basics/tri-strings.h" +#include "Cluster/ClusterFeature.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterMethods.h" #include "Indexes/Index.h" @@ -669,12 +670,8 @@ static void CreateVocBase(v8::FunctionCallbackInfo const& args, TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - // ........................................................................... - // We require exactly 1 or exactly 2 arguments -- anything else is an error - // ........................................................................... 
- - if (args.Length() < 1 || args.Length() > 3) { - TRI_V8_THROW_EXCEPTION_USAGE("_create(, , )"); + if (args.Length() < 1 || args.Length() > 4) { + TRI_V8_THROW_EXCEPTION_USAGE("_create(, , , )"); } if (TRI_GetOperationModeServer() == TRI_VOCBASE_MODE_NO_CREATE) { @@ -682,7 +679,7 @@ static void CreateVocBase(v8::FunctionCallbackInfo const& args, } // optional, third parameter can override collection type - if (args.Length() == 3 && args[2]->IsString()) { + if (args.Length() >= 3 && args[2]->IsString()) { std::string typeString = TRI_ObjectToString(args[2]); if (typeString == "edge") { collectionType = TRI_COL_TYPE_EDGE; @@ -691,6 +688,7 @@ static void CreateVocBase(v8::FunctionCallbackInfo const& args, } } + PREVENT_EMBEDDED_TRANSACTION(); // extract the name @@ -725,9 +723,19 @@ static void CreateVocBase(v8::FunctionCallbackInfo const& args, infoSlice = builder.slice(); if (ServerState::instance()->isCoordinator()) { + bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature("Cluster")->createWaitsForSyncReplication(); + + if (args.Length() >= 3 && args[args.Length()-1]->IsObject()) { + v8::Handle obj = args[args.Length()-1]->ToObject(); + auto v8WaitForSyncReplication = obj->Get(TRI_V8_ASCII_STRING("waitForSyncReplication")); + if (!v8WaitForSyncReplication->IsUndefined()) { + createWaitsForSyncReplication = TRI_ObjectToBoolean(v8WaitForSyncReplication); + } + } + std::unique_ptr col = ClusterMethods::createCollectionOnCoordinator(collectionType, vocbase, - infoSlice); + infoSlice, true, createWaitsForSyncReplication); TRI_V8_RETURN(WrapCollection(isolate, col.release())); } diff --git a/js/actions/_api/collection/app.js b/js/actions/_api/collection/app.js index 78806d7f6a..3cf8f386ba 100644 --- a/js/actions/_api/collection/app.js +++ b/js/actions/_api/collection/app.js @@ -206,6 +206,15 @@ function post_api_collection (req, res) { } try { + var options = {}; + if (req.parameters.hasOwnProperty('waitForSyncReplication')) { + var 
value = req.parameters.waitForSyncReplication.toLowerCase(); + if (value === 'true' || value === 'yes' || value === 'on' || value === 'y' || value === '1') { + options.waitForSyncReplication = true; + } else { + options.waitForSyncReplication = false; + } + } var collection; if (typeof (r.type) === 'string') { if (r.type.toLowerCase() === 'edge' || r.type === '3') { @@ -213,9 +222,9 @@ function post_api_collection (req, res) { } } if (r.type === arangodb.ArangoCollection.TYPE_EDGE) { - collection = arangodb.db._createEdgeCollection(r.name, r.parameters); + collection = arangodb.db._createEdgeCollection(r.name, r.parameters, options); } else { - collection = arangodb.db._createDocumentCollection(r.name, r.parameters); + collection = arangodb.db._createDocumentCollection(r.name, r.parameters, options); } var result = {}; diff --git a/js/client/modules/@arangodb/arango-database.js b/js/client/modules/@arangodb/arango-database.js index 38396e98cf..d8d9e4f77a 100644 --- a/js/client/modules/@arangodb/arango-database.js +++ b/js/client/modules/@arangodb/arango-database.js @@ -339,7 +339,7 @@ ArangoDatabase.prototype._collection = function (id) { // / @brief creates a new collection // ////////////////////////////////////////////////////////////////////////////// -ArangoDatabase.prototype._create = function (name, properties, type) { +ArangoDatabase.prototype._create = function (name, properties, type, options) { var body = { 'name': name, 'type': ArangoCollection.TYPE_DOCUMENT @@ -355,12 +355,23 @@ ArangoDatabase.prototype._create = function (name, properties, type) { } }); } + + let urlAddon = ''; + if (typeof options === "object" && options !== null) { + if (options.hasOwnProperty('waitForSyncReplication')) { + if (options.waitForSyncReplication) { + urlAddon = '?waitForSyncReplication=1'; + } else { + urlAddon = '?waitForSyncReplication=0'; + } + } + } if (type !== undefined) { body.type = type; } - var requestResult = this._connection.POST(this._collectionurl(), + var 
requestResult = this._connection.POST(this._collectionurl() + urlAddon, JSON.stringify(body)); arangosh.checkRequestResult(requestResult);