1
0
Fork 0

Add waitForSyncReplication as a _create() option

This commit is contained in:
Andreas Streichardt 2017-04-26 09:53:57 +02:00
parent 1ac30babf5
commit dad5a1429e
11 changed files with 79 additions and 33 deletions

View File

@ -133,6 +133,10 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addOption("--cluster.system-replication-factor",
"replication factor for system collections",
new UInt32Parameter(&_systemReplicationFactor));
options->addOption("--cluster.create-waits-for-sync-replication",
"active coordinator will wait for all replicas to create collection",
new BooleanParameter(&_createWaitsForSyncReplication));
}
void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {

View File

@ -58,6 +58,7 @@ class ClusterFeature : public application_features::ApplicationFeature {
std::string _dbserverConfig;
std::string _coordinatorConfig;
uint32_t _systemReplicationFactor = 2;
bool _createWaitsForSyncReplication = true;
private:
void reportRole(ServerState::RoleEnum);
@ -72,6 +73,7 @@ class ClusterFeature : public application_features::ApplicationFeature {
};
void setUnregisterOnShutdown(bool);
bool createWaitsForSyncReplication() { return _createWaitsForSyncReplication; };
void stop() override final;

View File

@ -1042,6 +1042,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
std::string const& collectionID,
uint64_t numberOfShards,
uint64_t replicationFactor,
bool waitForReplication,
VPackSlice const& json,
std::string& errorMsg,
double timeout) {
@ -1103,13 +1104,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
bool tmpHaveError = false;
for (auto const& p : VPackObjectIterator(result)) {
if (replicationFactor == 0) {
VPackSlice servers = p.value.get("servers");
if (!servers.isArray() || servers.length() < dbServers.size()) {
return true;
}
}
if (arangodb::basics::VelocyPackHelper::getBooleanValue(
p.value, "error", false)) {
tmpHaveError = true;
@ -1125,13 +1119,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
tmpMsg += ")";
}
}
*errMsg = "Error in creation of collection:" + tmpMsg + " "
+ __FILE__ + std::to_string(__LINE__);
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
return true;
}
// wait that all followers have created our new collection
if (waitForReplication) {
uint64_t mutableReplicationFactor = replicationFactor;
if (mutableReplicationFactor == 0) {
mutableReplicationFactor = dbServers.size();
}
VPackSlice servers = p.value.get("servers");
if (!servers.isArray() || servers.length() < mutableReplicationFactor) {
return true;
}
}
}
if (tmpHaveError) {
*errMsg = "Error in creation of collection:" + tmpMsg + " "
+ __FILE__ + std::to_string(__LINE__);
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
return true;
}
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
}

View File

@ -349,6 +349,7 @@ class ClusterInfo {
std::string const& collectionID,
uint64_t numberOfShards,
uint64_t replicationFactor,
bool waitForReplication,
arangodb::velocypack::Slice const& json,
std::string& errorMsg, double timeout);

View File

@ -2264,12 +2264,13 @@ std::unique_ptr<LogicalCollection>
ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType,
TRI_vocbase_t* vocbase,
VPackSlice parameters,
bool ignoreDistributeShardsLikeErrors) {
bool ignoreDistributeShardsLikeErrors,
bool waitForSyncReplication) {
auto col = std::make_unique<LogicalCollection>(vocbase, parameters);
// Collection is a temporary collection object that undergoes sanity checks etc.
// It is not used anywhere and will be cleaned up after this call.
// Persist collection will return the real object.
return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors);
return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors, waitForSyncReplication);
}
#endif
@ -2279,7 +2280,7 @@ ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType,
std::unique_ptr<LogicalCollection>
ClusterMethods::persistCollectionInAgency(
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors) {
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication) {
std::string distributeShardsLike = col->distributeShardsLike();
std::vector<std::string> dbServers;
std::vector<std::string> avoid = col->avoidServers();
@ -2364,7 +2365,7 @@ ClusterMethods::persistCollectionInAgency(
std::string errorMsg;
int myerrno = ci->createCollectionCoordinator(
col->dbName(), col->cid_as_string(),
col->numberOfShards(), col->replicationFactor(), velocy.slice(), errorMsg, 240.0);
col->numberOfShards(), col->replicationFactor(), waitForSyncReplication, velocy.slice(), errorMsg, 240.0);
if (myerrno != TRI_ERROR_NO_ERROR) {
if (errorMsg.empty()) {

View File

@ -258,7 +258,8 @@ class ClusterMethods {
static std::unique_ptr<LogicalCollection> createCollectionOnCoordinator(
TRI_col_type_e collectionType, TRI_vocbase_t* vocbase,
arangodb::velocypack::Slice parameters,
bool ignoreDistributeShardsLikeErrors = false);
bool ignoreDistributeShardsLikeErrors,
bool waitForSyncReplication);
private:
@ -267,7 +268,7 @@ class ClusterMethods {
////////////////////////////////////////////////////////////////////////////////
static std::unique_ptr<LogicalCollection> persistCollectionInAgency(
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors = false);
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication);
};
} // namespace arangodb

View File

@ -29,6 +29,7 @@
#include "Basics/conversions.h"
#include "Basics/files.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/FollowerInfo.h"
#include "GeneralServer/GeneralServer.h"
@ -1680,8 +1681,9 @@ int MMFilesRestReplicationHandler::processRestoreCollectionCoordinator(
VPackSlice const merged = mergedBuilder.slice();
try {
bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster")->createWaitsForSyncReplication();
auto col = ClusterMethods::createCollectionOnCoordinator(
collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors);
collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors, createWaitsForSyncReplication);
TRI_ASSERT(col != nullptr);
} catch (basics::Exception const& e) {
// Error, report it.

View File

@ -30,6 +30,7 @@
#include "Basics/conversions.h"
#include "Basics/files.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/FollowerInfo.h"
#include "GeneralServer/GeneralServer.h"
@ -1819,8 +1820,9 @@ int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator(
VPackSlice const merged = mergedBuilder.slice();
try {
bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster")->createWaitsForSyncReplication();
auto col = ClusterMethods::createCollectionOnCoordinator(collectionType,
_vocbase, merged);
_vocbase, merged, true, createWaitsForSyncReplication);
TRI_ASSERT(col != nullptr);
} catch (basics::Exception const& e) {
// Error, report it.

View File

@ -27,6 +27,7 @@
#include "Basics/VelocyPackHelper.h"
#include "Basics/conversions.h"
#include "Basics/tri-strings.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterMethods.h"
#include "Indexes/Index.h"
@ -669,12 +670,8 @@ static void CreateVocBase(v8::FunctionCallbackInfo<v8::Value> const& args,
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
// ...........................................................................
// We require exactly 1 or exactly 2 arguments -- anything else is an error
// ...........................................................................
if (args.Length() < 1 || args.Length() > 3) {
TRI_V8_THROW_EXCEPTION_USAGE("_create(<name>, <properties>, <type>)");
if (args.Length() < 1 || args.Length() > 4) {
TRI_V8_THROW_EXCEPTION_USAGE("_create(<name>, <properties>, <type>, <options>)");
}
if (TRI_GetOperationModeServer() == TRI_VOCBASE_MODE_NO_CREATE) {
@ -682,7 +679,7 @@ static void CreateVocBase(v8::FunctionCallbackInfo<v8::Value> const& args,
}
// optional, third parameter can override collection type
if (args.Length() == 3 && args[2]->IsString()) {
if (args.Length() >= 3 && args[2]->IsString()) {
std::string typeString = TRI_ObjectToString(args[2]);
if (typeString == "edge") {
collectionType = TRI_COL_TYPE_EDGE;
@ -691,6 +688,7 @@ static void CreateVocBase(v8::FunctionCallbackInfo<v8::Value> const& args,
}
}
PREVENT_EMBEDDED_TRANSACTION();
// extract the name
@ -725,9 +723,19 @@ static void CreateVocBase(v8::FunctionCallbackInfo<v8::Value> const& args,
infoSlice = builder.slice();
if (ServerState::instance()->isCoordinator()) {
bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster")->createWaitsForSyncReplication();
if (args.Length() >= 3 && args[args.Length()-1]->IsObject()) {
v8::Handle<v8::Object> obj = args[args.Length()-1]->ToObject();
auto v8WaitForSyncReplication = obj->Get(TRI_V8_ASCII_STRING("waitForSyncReplication"));
if (!v8WaitForSyncReplication->IsUndefined()) {
createWaitsForSyncReplication = TRI_ObjectToBoolean(v8WaitForSyncReplication);
}
}
std::unique_ptr<LogicalCollection> col =
ClusterMethods::createCollectionOnCoordinator(collectionType, vocbase,
infoSlice);
infoSlice, true, createWaitsForSyncReplication);
TRI_V8_RETURN(WrapCollection(isolate, col.release()));
}

View File

@ -206,6 +206,15 @@ function post_api_collection (req, res) {
}
try {
var options = {};
if (req.parameters.hasOwnProperty('waitForSyncReplication')) {
var value = req.parameters.waitForSyncReplication.toLowerCase();
if (value === 'true' || value === 'yes' || value === 'on' || value === 'y' || value === '1') {
options.waitForSyncReplication = true;
} else {
options.waitForSyncReplication = false;
}
}
var collection;
if (typeof (r.type) === 'string') {
if (r.type.toLowerCase() === 'edge' || r.type === '3') {
@ -213,9 +222,9 @@ function post_api_collection (req, res) {
}
}
if (r.type === arangodb.ArangoCollection.TYPE_EDGE) {
collection = arangodb.db._createEdgeCollection(r.name, r.parameters);
collection = arangodb.db._createEdgeCollection(r.name, r.parameters, options);
} else {
collection = arangodb.db._createDocumentCollection(r.name, r.parameters);
collection = arangodb.db._createDocumentCollection(r.name, r.parameters, options);
}
var result = {};

View File

@ -339,7 +339,7 @@ ArangoDatabase.prototype._collection = function (id) {
// / @brief creates a new collection
// //////////////////////////////////////////////////////////////////////////////
ArangoDatabase.prototype._create = function (name, properties, type) {
ArangoDatabase.prototype._create = function (name, properties, type, options) {
var body = {
'name': name,
'type': ArangoCollection.TYPE_DOCUMENT
@ -355,12 +355,23 @@ ArangoDatabase.prototype._create = function (name, properties, type) {
}
});
}
let urlAddon = '';
if (typeof options === "object" && options !== null) {
if (options.hasOwnProperty('waitForSyncReplication')) {
if (options.waitForSyncReplication) {
urlAddon = '?waitForSyncReplication=1';
} else {
urlAddon = '?waitForSyncReplication=0';
}
}
}
if (type !== undefined) {
body.type = type;
}
var requestResult = this._connection.POST(this._collectionurl(),
var requestResult = this._connection.POST(this._collectionurl() + urlAddon,
JSON.stringify(body));
arangosh.checkRequestResult(requestResult);