mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of ssh://github.com/ArangoDB/ArangoDB into devel
This commit is contained in:
commit
ed13ee7973
|
@ -113,7 +113,7 @@ devel
|
|||
when unused.
|
||||
|
||||
Waiting for an unused V8 context will now also abort if no V8 context can be
|
||||
acquired/created after 120 seconds.
|
||||
acquired/created after 60 seconds.
|
||||
|
||||
* improved diagnostic messages written to logfiles by supervisor process
|
||||
|
||||
|
|
|
@ -254,6 +254,7 @@ while [ $# -gt 0 ]; do
|
|||
MAKE="cmake --build . --config ${BUILD_CONFIG}"
|
||||
PACKAGE_MAKE="cmake --build . --config ${BUILD_CONFIG} --target"
|
||||
CONFIGURE_OPTIONS="${CONFIGURE_OPTIONS} -DV8_TARGET_ARCHS=Release"
|
||||
export _IsNativeEnvironment=true
|
||||
;;
|
||||
|
||||
--symsrv)
|
||||
|
|
|
@ -32,7 +32,6 @@ namespace arangodb {
|
|||
class ManagedDocumentResult;
|
||||
|
||||
namespace graph {
|
||||
class ConstantWeightShortestPathFinder;
|
||||
class ShortestPathFinder;
|
||||
class ShortestPathResult;
|
||||
}
|
||||
|
@ -49,9 +48,6 @@ class ShortestPathBlock : public ExecutionBlock {
|
|||
friend struct EdgeWeightExpanderLocal;
|
||||
friend struct EdgeWeightExpanderCluster;
|
||||
|
||||
// TODO ONLY TEMPORARY
|
||||
friend class graph::ConstantWeightShortestPathFinder;
|
||||
|
||||
public:
|
||||
ShortestPathBlock(ExecutionEngine* engine, ShortestPathNode const* ep);
|
||||
|
||||
|
|
|
@ -133,6 +133,10 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
options->addOption("--cluster.system-replication-factor",
|
||||
"replication factor for system collections",
|
||||
new UInt32Parameter(&_systemReplicationFactor));
|
||||
|
||||
options->addOption("--cluster.create-waits-for-sync-replication",
|
||||
"active coordinator will wait for all replicas to create collection",
|
||||
new BooleanParameter(&_createWaitsForSyncReplication));
|
||||
}
|
||||
|
||||
void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
|
||||
|
|
|
@ -62,6 +62,7 @@ class ClusterFeature : public application_features::ApplicationFeature {
|
|||
std::string _dbserverConfig;
|
||||
std::string _coordinatorConfig;
|
||||
uint32_t _systemReplicationFactor = 2;
|
||||
bool _createWaitsForSyncReplication = false;
|
||||
|
||||
private:
|
||||
void reportRole(ServerState::RoleEnum);
|
||||
|
@ -76,6 +77,7 @@ class ClusterFeature : public application_features::ApplicationFeature {
|
|||
};
|
||||
|
||||
void setUnregisterOnShutdown(bool);
|
||||
bool createWaitsForSyncReplication() { return _createWaitsForSyncReplication; };
|
||||
|
||||
void stop() override final;
|
||||
|
||||
|
|
|
@ -1042,6 +1042,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
std::string const& collectionID,
|
||||
uint64_t numberOfShards,
|
||||
uint64_t replicationFactor,
|
||||
bool waitForReplication,
|
||||
VPackSlice const& json,
|
||||
std::string& errorMsg,
|
||||
double timeout) {
|
||||
|
@ -1100,19 +1101,10 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
[=](VPackSlice const& result) {
|
||||
if (result.isObject() && result.length() == (size_t)numberOfShards) {
|
||||
std::string tmpMsg = "";
|
||||
bool tmpHaveError = false;
|
||||
|
||||
for (auto const& p : VPackObjectIterator(result)) {
|
||||
if (replicationFactor == 0) {
|
||||
VPackSlice servers = p.value.get("servers");
|
||||
if (!servers.isArray() || servers.length() < dbServers.size()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (arangodb::basics::VelocyPackHelper::getBooleanValue(
|
||||
p.value, "error", false)) {
|
||||
tmpHaveError = true;
|
||||
tmpMsg += " shardID:" + p.key.copyString() + ":";
|
||||
tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue(
|
||||
p.value, "errorMessage", "");
|
||||
|
@ -1125,13 +1117,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
tmpMsg += ")";
|
||||
}
|
||||
}
|
||||
*errMsg = "Error in creation of collection:" + tmpMsg + " "
|
||||
+ __FILE__ + std::to_string(__LINE__);
|
||||
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
||||
return true;
|
||||
}
|
||||
|
||||
// wait that all followers have created our new collection
|
||||
if (waitForReplication) {
|
||||
uint64_t mutableReplicationFactor = replicationFactor;
|
||||
if (mutableReplicationFactor == 0) {
|
||||
mutableReplicationFactor = dbServers.size();
|
||||
}
|
||||
|
||||
VPackSlice servers = p.value.get("servers");
|
||||
if (!servers.isArray() || servers.length() < mutableReplicationFactor) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (tmpHaveError) {
|
||||
*errMsg = "Error in creation of collection:" + tmpMsg + " "
|
||||
+ __FILE__ + std::to_string(__LINE__);
|
||||
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
||||
return true;
|
||||
}
|
||||
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
|
||||
}
|
||||
|
|
|
@ -349,6 +349,7 @@ class ClusterInfo {
|
|||
std::string const& collectionID,
|
||||
uint64_t numberOfShards,
|
||||
uint64_t replicationFactor,
|
||||
bool waitForReplication,
|
||||
arangodb::velocypack::Slice const& json,
|
||||
std::string& errorMsg, double timeout);
|
||||
|
||||
|
|
|
@ -2264,12 +2264,13 @@ std::unique_ptr<LogicalCollection>
|
|||
ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType,
|
||||
TRI_vocbase_t* vocbase,
|
||||
VPackSlice parameters,
|
||||
bool ignoreDistributeShardsLikeErrors) {
|
||||
bool ignoreDistributeShardsLikeErrors,
|
||||
bool waitForSyncReplication) {
|
||||
auto col = std::make_unique<LogicalCollection>(vocbase, parameters);
|
||||
// Collection is a temporary collection object that undergoes sanity checks etc.
|
||||
// It is not used anywhere and will be cleaned up after this call.
|
||||
// Persist collection will return the real object.
|
||||
return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors);
|
||||
return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors, waitForSyncReplication);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -2279,7 +2280,7 @@ ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType,
|
|||
|
||||
std::unique_ptr<LogicalCollection>
|
||||
ClusterMethods::persistCollectionInAgency(
|
||||
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors) {
|
||||
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication) {
|
||||
std::string distributeShardsLike = col->distributeShardsLike();
|
||||
std::vector<std::string> dbServers;
|
||||
std::vector<std::string> avoid = col->avoidServers();
|
||||
|
@ -2364,7 +2365,7 @@ ClusterMethods::persistCollectionInAgency(
|
|||
std::string errorMsg;
|
||||
int myerrno = ci->createCollectionCoordinator(
|
||||
col->dbName(), col->cid_as_string(),
|
||||
col->numberOfShards(), col->replicationFactor(), velocy.slice(), errorMsg, 240.0);
|
||||
col->numberOfShards(), col->replicationFactor(), waitForSyncReplication, velocy.slice(), errorMsg, 240.0);
|
||||
|
||||
if (myerrno != TRI_ERROR_NO_ERROR) {
|
||||
if (errorMsg.empty()) {
|
||||
|
|
|
@ -258,7 +258,8 @@ class ClusterMethods {
|
|||
static std::unique_ptr<LogicalCollection> createCollectionOnCoordinator(
|
||||
TRI_col_type_e collectionType, TRI_vocbase_t* vocbase,
|
||||
arangodb::velocypack::Slice parameters,
|
||||
bool ignoreDistributeShardsLikeErrors = false);
|
||||
bool ignoreDistributeShardsLikeErrors,
|
||||
bool waitForSyncReplication);
|
||||
|
||||
private:
|
||||
|
||||
|
@ -267,7 +268,7 @@ class ClusterMethods {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static std::unique_ptr<LogicalCollection> persistCollectionInAgency(
|
||||
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors = false);
|
||||
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication);
|
||||
};
|
||||
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -152,7 +152,7 @@ MMFilesEngine::MMFilesEngine(application_features::ApplicationServer* server)
|
|||
MMFilesEngine::~MMFilesEngine() {}
|
||||
|
||||
// perform a physical deletion of the database
|
||||
Result MMFilesEngine::dropDatabase(Database* database) {
|
||||
Result MMFilesEngine::dropDatabase(TRI_vocbase_t* database) {
|
||||
// delete persistent indexes for this database
|
||||
MMFilesPersistentIndexFeature::dropDatabase(database->id());
|
||||
|
||||
|
|
|
@ -138,14 +138,14 @@ class MMFilesEngine final : public StorageEngine {
|
|||
void waitForSync(TRI_voc_tick_t tick) override;
|
||||
|
||||
virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade, int&) override;
|
||||
Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) override {
|
||||
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) override {
|
||||
status = TRI_ERROR_NO_ERROR;
|
||||
return createDatabaseMMFiles(id, args);
|
||||
}
|
||||
int writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) override;
|
||||
|
||||
void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) override;
|
||||
Result dropDatabase(Database* database) override;
|
||||
Result dropDatabase(TRI_vocbase_t* database) override;
|
||||
void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) override;
|
||||
|
||||
// wal in recovery
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "Basics/conversions.h"
|
||||
#include "Basics/files.h"
|
||||
#include "Cluster/ClusterComm.h"
|
||||
#include "Cluster/ClusterFeature.h"
|
||||
#include "Cluster/ClusterMethods.h"
|
||||
#include "Cluster/FollowerInfo.h"
|
||||
#include "GeneralServer/GeneralServer.h"
|
||||
|
@ -1680,8 +1681,9 @@ int MMFilesRestReplicationHandler::processRestoreCollectionCoordinator(
|
|||
VPackSlice const merged = mergedBuilder.slice();
|
||||
|
||||
try {
|
||||
bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster")->createWaitsForSyncReplication();
|
||||
auto col = ClusterMethods::createCollectionOnCoordinator(
|
||||
collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors);
|
||||
collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors, createWaitsForSyncReplication);
|
||||
TRI_ASSERT(col != nullptr);
|
||||
} catch (basics::Exception const& e) {
|
||||
// Error, report it.
|
||||
|
|
|
@ -1072,8 +1072,6 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col,
|
|||
sendExtendBatch();
|
||||
sendExtendBarrier();
|
||||
|
||||
std::vector<size_t> toFetch;
|
||||
|
||||
TRI_voc_tick_t const chunkSize = 5000;
|
||||
std::string const baseUrl = BaseUrl + "/keys";
|
||||
|
||||
|
@ -1258,7 +1256,7 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col,
|
|||
std::function<void(VPackSlice, VPackSlice)> parseDoc = [&](VPackSlice doc,
|
||||
VPackSlice key) {
|
||||
|
||||
bool rangeUneqal = false;
|
||||
bool rangeUnequal = false;
|
||||
bool nextChunk = false;
|
||||
|
||||
int cmp1 = key.compareString(lowKey.data(), lowKey.length());
|
||||
|
@ -1274,7 +1272,7 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col,
|
|||
if (cmp1 == 0) {
|
||||
foundLowKey = true;
|
||||
} else if (!foundLowKey && cmp1 > 0) {
|
||||
rangeUneqal = true;
|
||||
rangeUnequal = true;
|
||||
nextChunk = true;
|
||||
}
|
||||
|
||||
|
@ -1286,28 +1284,28 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col,
|
|||
markers.emplace_back(key.copyString(), TRI_ExtractRevisionId(doc));
|
||||
|
||||
if (cmp2 == 0) { // found highKey
|
||||
rangeUneqal = std::to_string(localHash) != hashString;
|
||||
rangeUnequal = std::to_string(localHash) != hashString;
|
||||
nextChunk = true;
|
||||
}
|
||||
} else if (cmp2 == 0) {
|
||||
rangeUneqal = true;
|
||||
rangeUnequal = true;
|
||||
nextChunk = true;
|
||||
}
|
||||
} else if (cmp2 > 0) { // higher than highKey
|
||||
// current range was unequal and we did not find the
|
||||
// high key. Load range and skip to next
|
||||
rangeUneqal = true;
|
||||
rangeUnequal = true;
|
||||
nextChunk = true;
|
||||
}
|
||||
|
||||
if (rangeUneqal) {
|
||||
if (rangeUnequal) {
|
||||
int res = syncChunkRocksDB(&trx, keysId, currentChunkId, lowKey,
|
||||
highKey, markers, errorMsg);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
}
|
||||
TRI_ASSERT(!rangeUneqal || (rangeUneqal && nextChunk)); // A => B
|
||||
TRI_ASSERT(!rangeUnequal || nextChunk); // A => B
|
||||
if (nextChunk && currentChunkId + 1 < numChunks) {
|
||||
currentChunkId++; // we are out of range, see next chunk
|
||||
resetChunk();
|
||||
|
|
|
@ -230,7 +230,12 @@ void RocksDBEngine::start() {
|
|||
}
|
||||
}
|
||||
|
||||
void RocksDBEngine::stop() {}
|
||||
void RocksDBEngine::stop() {
|
||||
if (!isEnabled()) {
|
||||
return;
|
||||
}
|
||||
replicationManager()->dropAll();
|
||||
}
|
||||
|
||||
void RocksDBEngine::unprepare() {
|
||||
if (!isEnabled()) {
|
||||
|
@ -486,7 +491,7 @@ TRI_vocbase_t* RocksDBEngine::openDatabase(
|
|||
return openExistingDatabase(id, name, true, isUpgrade);
|
||||
}
|
||||
|
||||
RocksDBEngine::Database* RocksDBEngine::createDatabase(
|
||||
TRI_vocbase_t* RocksDBEngine::createDatabase(
|
||||
TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) {
|
||||
status = TRI_ERROR_NO_ERROR;
|
||||
auto vocbase = std::make_unique<TRI_vocbase_t>(TRI_VOCBASE_TYPE_NORMAL, id,
|
||||
|
@ -519,10 +524,6 @@ int RocksDBEngine::writeCreateCollectionMarker(TRI_voc_tick_t databaseId,
|
|||
|
||||
void RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase,
|
||||
bool useWriteMarker, int& status) {
|
||||
// probably not required
|
||||
// THROW_ARANGO_NOT_YET_IMPLEMENTED();
|
||||
|
||||
// status = saveDatabaseParameters(vocbase->id(), vocbase->name(), true);
|
||||
VPackBuilder builder;
|
||||
builder.openObject();
|
||||
builder.add("id", VPackValue(std::to_string(vocbase->id())));
|
||||
|
@ -533,7 +534,8 @@ void RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase,
|
|||
status = writeCreateDatabaseMarker(vocbase->id(), builder.slice());
|
||||
}
|
||||
|
||||
Result RocksDBEngine::dropDatabase(Database* database) {
|
||||
Result RocksDBEngine::dropDatabase(TRI_vocbase_t* database) {
|
||||
replicationManager()->drop(database);
|
||||
return dropDatabase(database->id());
|
||||
}
|
||||
|
||||
|
|
|
@ -123,14 +123,14 @@ class RocksDBEngine final : public StorageEngine {
|
|||
virtual TRI_vocbase_t* openDatabase(
|
||||
arangodb::velocypack::Slice const& parameters, bool isUpgrade,
|
||||
int&) override;
|
||||
Database* createDatabase(TRI_voc_tick_t id,
|
||||
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id,
|
||||
arangodb::velocypack::Slice const& args,
|
||||
int& status) override;
|
||||
int writeCreateDatabaseMarker(TRI_voc_tick_t id,
|
||||
VPackSlice const& slice) override;
|
||||
void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker,
|
||||
int& status) override;
|
||||
Result dropDatabase(Database* database) override;
|
||||
Result dropDatabase(TRI_vocbase_t* database) override;
|
||||
void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) override;
|
||||
|
||||
// wal in recovery
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
#include "Transaction/Helpers.h"
|
||||
#include "Transaction/StandaloneContext.h"
|
||||
#include "Transaction/UserTransaction.h"
|
||||
#include "Utils/DatabaseGuard.h"
|
||||
#include "VocBase/replication-common.h"
|
||||
#include "VocBase/ticks.h"
|
||||
|
||||
|
@ -55,6 +56,7 @@ RocksDBReplicationContext::RocksDBReplicationContext()
|
|||
_mdr(),
|
||||
_customTypeHandler(),
|
||||
_vpackOptions(Options::Defaults),
|
||||
_lastChunkOffset(0),
|
||||
_expires(TRI_microtime() + DefaultTTL),
|
||||
_isDeleted(false),
|
||||
_isUsed(true),
|
||||
|
@ -390,10 +392,13 @@ void RocksDBReplicationContext::releaseDumpingResources() {
|
|||
_iter.reset();
|
||||
}
|
||||
_collection = nullptr;
|
||||
_guard.reset();
|
||||
}
|
||||
|
||||
std::unique_ptr<transaction::Methods>
|
||||
RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
|
||||
_guard.reset(new DatabaseGuard(vocbase));
|
||||
|
||||
double lockTimeout = transaction::Methods::DefaultLockTimeout;
|
||||
std::shared_ptr<transaction::StandaloneContext> ctx =
|
||||
transaction::StandaloneContext::Create(vocbase);
|
||||
|
@ -401,6 +406,7 @@ RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
|
|||
ctx, {}, {}, {}, lockTimeout, false, true));
|
||||
Result res = trx->begin();
|
||||
if (!res.ok()) {
|
||||
_guard.reset();
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
_customTypeHandler = ctx->orderCustomTypeHandler();
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
#include <velocypack/Slice.h>
|
||||
|
||||
namespace arangodb {
|
||||
class DatabaseGuard;
|
||||
|
||||
class RocksDBReplicationContext {
|
||||
public:
|
||||
|
@ -53,6 +54,13 @@ class RocksDBReplicationContext {
|
|||
TRI_voc_tick_t id() const;
|
||||
uint64_t lastTick() const;
|
||||
uint64_t count() const;
|
||||
|
||||
TRI_vocbase_t* vocbase() const {
|
||||
if (_trx == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
return _trx->vocbase();
|
||||
}
|
||||
|
||||
// creates new transaction/snapshot
|
||||
void bind(TRI_vocbase_t*);
|
||||
|
@ -113,7 +121,8 @@ class RocksDBReplicationContext {
|
|||
ManagedDocumentResult _mdr;
|
||||
std::shared_ptr<arangodb::velocypack::CustomTypeHandler> _customTypeHandler;
|
||||
arangodb::velocypack::Options _vpackOptions;
|
||||
uint64_t _lastChunkOffset = 0;
|
||||
uint64_t _lastChunkOffset;
|
||||
std::unique_ptr<DatabaseGuard> _guard;
|
||||
|
||||
double _expires;
|
||||
bool _isDeleted;
|
||||
|
|
|
@ -238,6 +238,40 @@ bool RocksDBReplicationManager::containsUsedContext() {
|
|||
return false;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief drop contexts by database (at least mark them as deleted)
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RocksDBReplicationManager::drop(TRI_vocbase_t* vocbase) {
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _lock);
|
||||
|
||||
for (auto& context : _contexts) {
|
||||
if (context.second->vocbase() == vocbase) {
|
||||
context.second->deleted();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
garbageCollect(true);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief drop all contexts (at least mark them as deleted)
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RocksDBReplicationManager::dropAll() {
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _lock);
|
||||
|
||||
for (auto& context : _contexts) {
|
||||
context.second->deleted();
|
||||
}
|
||||
}
|
||||
|
||||
garbageCollect(true);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief run a garbage collection on the contexts
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -92,6 +92,18 @@ class RocksDBReplicationManager {
|
|||
|
||||
bool containsUsedContext();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief drop contexts by database (at least mark them as deleted)
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void drop(TRI_vocbase_t*);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief drop all contexts (at least mark them as deleted)
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void dropAll();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief run a garbage collection on the contexts
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "Basics/conversions.h"
|
||||
#include "Basics/files.h"
|
||||
#include "Cluster/ClusterComm.h"
|
||||
#include "Cluster/ClusterFeature.h"
|
||||
#include "Cluster/ClusterMethods.h"
|
||||
#include "Cluster/FollowerInfo.h"
|
||||
#include "GeneralServer/GeneralServer.h"
|
||||
|
@ -1819,8 +1820,9 @@ int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator(
|
|||
VPackSlice const merged = mergedBuilder.slice();
|
||||
|
||||
try {
|
||||
bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster")->createWaitsForSyncReplication();
|
||||
auto col = ClusterMethods::createCollectionOnCoordinator(collectionType,
|
||||
_vocbase, merged);
|
||||
_vocbase, merged, true, createWaitsForSyncReplication);
|
||||
TRI_ASSERT(col != nullptr);
|
||||
} catch (basics::Exception const& e) {
|
||||
// Error, report it.
|
||||
|
|
|
@ -65,7 +65,7 @@ class ConnectionStatistics {
|
|||
_error = false;
|
||||
}
|
||||
|
||||
static size_t const QUEUE_SIZE = 5000;
|
||||
static size_t const QUEUE_SIZE = 64 * 1024 - 2; // current (1.62) boost maximum
|
||||
|
||||
static Mutex _dataLock;
|
||||
|
||||
|
|
|
@ -152,7 +152,7 @@ class RequestStatistics {
|
|||
void trace_log();
|
||||
|
||||
private:
|
||||
static size_t const QUEUE_SIZE = 1000;
|
||||
static size_t const QUEUE_SIZE = 64 * 1024 - 2; // current (1.62) boost maximum
|
||||
|
||||
static arangodb::Mutex _dataLock;
|
||||
|
||||
|
|
|
@ -146,7 +146,6 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
|
||||
// TODO add pre / post conditions for functions
|
||||
|
||||
using Database = TRI_vocbase_t;
|
||||
using CollectionView = LogicalCollection;
|
||||
|
||||
virtual void waitForSync(TRI_voc_tick_t tick) = 0;
|
||||
|
@ -154,10 +153,10 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
//// operations on databasea
|
||||
|
||||
/// @brief opens a database
|
||||
virtual Database* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade, int& status) = 0;
|
||||
Database* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade){
|
||||
virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade, int& status) = 0;
|
||||
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade){
|
||||
int status;
|
||||
Database* rv = openDatabase(args, isUpgrade, status);
|
||||
TRI_vocbase_t* rv = openDatabase(args, isUpgrade, status);
|
||||
TRI_ASSERT(status == TRI_ERROR_NO_ERROR);
|
||||
TRI_ASSERT(rv != nullptr);
|
||||
return rv;
|
||||
|
@ -172,16 +171,16 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
// the WAL entry for the database creation will be written *after* the call
|
||||
// to "createDatabase" returns
|
||||
// no way to acquire id within this function?!
|
||||
virtual Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) = 0;
|
||||
Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args ){
|
||||
virtual TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) = 0;
|
||||
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args ){
|
||||
int status;
|
||||
Database* rv = createDatabase(id, args, status);
|
||||
TRI_vocbase_t* rv = createDatabase(id, args, status);
|
||||
TRI_ASSERT(status == TRI_ERROR_NO_ERROR);
|
||||
TRI_ASSERT(rv != nullptr);
|
||||
return rv;
|
||||
}
|
||||
|
||||
// @brief wirte create marker for database
|
||||
// @brief write create marker for database
|
||||
virtual int writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) = 0;
|
||||
|
||||
// asks the storage engine to drop the specified database and persist the
|
||||
|
@ -194,14 +193,14 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
//
|
||||
// is done under a lock in database feature
|
||||
virtual void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) = 0;
|
||||
void prepareDropDatabase(Database* db, bool useWriteMarker){
|
||||
void prepareDropDatabase(TRI_vocbase_t* db, bool useWriteMarker){
|
||||
int status = 0;
|
||||
prepareDropDatabase(db, useWriteMarker, status);
|
||||
TRI_ASSERT(status == TRI_ERROR_NO_ERROR);
|
||||
};
|
||||
|
||||
// perform a physical deletion of the database
|
||||
virtual Result dropDatabase(Database*) = 0;
|
||||
virtual Result dropDatabase(TRI_vocbase_t*) = 0;
|
||||
|
||||
/// @brief wait until a database directory disappears - not under lock in databaseFreature
|
||||
virtual void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) = 0;
|
||||
|
|
|
@ -40,7 +40,9 @@ class DatabaseGuard {
|
|||
explicit DatabaseGuard(TRI_vocbase_t* vocbase)
|
||||
: _vocbase(vocbase) {
|
||||
TRI_ASSERT(vocbase != nullptr);
|
||||
_vocbase->use();
|
||||
if (!_vocbase->use()) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief create the guard, using a database id
|
||||
|
|
|
@ -566,7 +566,7 @@ V8Context* V8DealerFeature::enterContext(TRI_vocbase_t* vocbase,
|
|||
|
||||
TimedAction exitWhenNoContext([](double waitTime) {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::V8) << "giving up waiting for unused V8 context after " << Logger::FIXED(waitTime) << " s";
|
||||
}, 120);
|
||||
}, 60);
|
||||
|
||||
|
||||
V8Context* context = nullptr;
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Basics/conversions.h"
|
||||
#include "Basics/tri-strings.h"
|
||||
#include "Cluster/ClusterFeature.h"
|
||||
#include "Cluster/ClusterInfo.h"
|
||||
#include "Cluster/ClusterMethods.h"
|
||||
#include "Indexes/Index.h"
|
||||
|
@ -669,12 +670,8 @@ static void CreateVocBase(v8::FunctionCallbackInfo<v8::Value> const& args,
|
|||
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||
}
|
||||
|
||||
// ...........................................................................
|
||||
// We require exactly 1 or exactly 2 arguments -- anything else is an error
|
||||
// ...........................................................................
|
||||
|
||||
if (args.Length() < 1 || args.Length() > 3) {
|
||||
TRI_V8_THROW_EXCEPTION_USAGE("_create(<name>, <properties>, <type>)");
|
||||
if (args.Length() < 1 || args.Length() > 4) {
|
||||
TRI_V8_THROW_EXCEPTION_USAGE("_create(<name>, <properties>, <type>, <options>)");
|
||||
}
|
||||
|
||||
if (TRI_GetOperationModeServer() == TRI_VOCBASE_MODE_NO_CREATE) {
|
||||
|
@ -682,7 +679,7 @@ static void CreateVocBase(v8::FunctionCallbackInfo<v8::Value> const& args,
|
|||
}
|
||||
|
||||
// optional, third parameter can override collection type
|
||||
if (args.Length() == 3 && args[2]->IsString()) {
|
||||
if (args.Length() >= 3 && args[2]->IsString()) {
|
||||
std::string typeString = TRI_ObjectToString(args[2]);
|
||||
if (typeString == "edge") {
|
||||
collectionType = TRI_COL_TYPE_EDGE;
|
||||
|
@ -691,6 +688,7 @@ static void CreateVocBase(v8::FunctionCallbackInfo<v8::Value> const& args,
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
PREVENT_EMBEDDED_TRANSACTION();
|
||||
|
||||
// extract the name
|
||||
|
@ -725,9 +723,19 @@ static void CreateVocBase(v8::FunctionCallbackInfo<v8::Value> const& args,
|
|||
infoSlice = builder.slice();
|
||||
|
||||
if (ServerState::instance()->isCoordinator()) {
|
||||
bool createWaitsForSyncReplication = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster")->createWaitsForSyncReplication();
|
||||
|
||||
if (args.Length() >= 3 && args[args.Length()-1]->IsObject()) {
|
||||
v8::Handle<v8::Object> obj = args[args.Length()-1]->ToObject();
|
||||
auto v8WaitForSyncReplication = obj->Get(TRI_V8_ASCII_STRING("waitForSyncReplication"));
|
||||
if (!v8WaitForSyncReplication->IsUndefined()) {
|
||||
createWaitsForSyncReplication = TRI_ObjectToBoolean(v8WaitForSyncReplication);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<LogicalCollection> col =
|
||||
ClusterMethods::createCollectionOnCoordinator(collectionType, vocbase,
|
||||
infoSlice);
|
||||
infoSlice, true, createWaitsForSyncReplication);
|
||||
TRI_V8_RETURN(WrapCollection(isolate, col.release()));
|
||||
}
|
||||
|
||||
|
|
|
@ -205,6 +205,15 @@ function post_api_collection (req, res) {
|
|||
}
|
||||
|
||||
try {
|
||||
var options = {};
|
||||
if (req.parameters.hasOwnProperty('waitForSyncReplication')) {
|
||||
var value = req.parameters.waitForSyncReplication.toLowerCase();
|
||||
if (value === 'true' || value === 'yes' || value === 'on' || value === 'y' || value === '1') {
|
||||
options.waitForSyncReplication = true;
|
||||
} else {
|
||||
options.waitForSyncReplication = false;
|
||||
}
|
||||
}
|
||||
var collection;
|
||||
if (typeof (r.type) === 'string') {
|
||||
if (r.type.toLowerCase() === 'edge' || r.type === '3') {
|
||||
|
@ -212,9 +221,9 @@ function post_api_collection (req, res) {
|
|||
}
|
||||
}
|
||||
if (r.type === arangodb.ArangoCollection.TYPE_EDGE) {
|
||||
collection = arangodb.db._createEdgeCollection(r.name, r.parameters);
|
||||
collection = arangodb.db._createEdgeCollection(r.name, r.parameters, options);
|
||||
} else {
|
||||
collection = arangodb.db._createDocumentCollection(r.name, r.parameters);
|
||||
collection = arangodb.db._createDocumentCollection(r.name, r.parameters, options);
|
||||
}
|
||||
|
||||
var result = {};
|
||||
|
|
|
@ -339,7 +339,7 @@ ArangoDatabase.prototype._collection = function (id) {
|
|||
// / @brief creates a new collection
|
||||
// //////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ArangoDatabase.prototype._create = function (name, properties, type) {
|
||||
ArangoDatabase.prototype._create = function (name, properties, type, options) {
|
||||
var body = {
|
||||
'name': name,
|
||||
'type': ArangoCollection.TYPE_DOCUMENT
|
||||
|
@ -355,12 +355,23 @@ ArangoDatabase.prototype._create = function (name, properties, type) {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
let urlAddon = '';
|
||||
if (typeof options === "object" && options !== null) {
|
||||
if (options.hasOwnProperty('waitForSyncReplication')) {
|
||||
if (options.waitForSyncReplication) {
|
||||
urlAddon = '?waitForSyncReplication=1';
|
||||
} else {
|
||||
urlAddon = '?waitForSyncReplication=0';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (type !== undefined) {
|
||||
body.type = type;
|
||||
}
|
||||
|
||||
var requestResult = this._connection.POST(this._collectionurl(),
|
||||
var requestResult = this._connection.POST(this._collectionurl() + urlAddon,
|
||||
JSON.stringify(body));
|
||||
|
||||
arangosh.checkRequestResult(requestResult);
|
||||
|
|
Loading…
Reference in New Issue