diff --git a/arangod/Aql/QueryCursor.cpp b/arangod/Aql/QueryCursor.cpp index d5db2b049d..7ae257355e 100644 --- a/arangod/Aql/QueryCursor.cpp +++ b/arangod/Aql/QueryCursor.cpp @@ -45,10 +45,14 @@ using namespace arangodb; using namespace arangodb::aql; -QueryResultCursor::QueryResultCursor(TRI_vocbase_t* vocbase, CursorId id, - aql::QueryResult&& result, - size_t batchSize, double ttl, - bool hasCount) +QueryResultCursor::QueryResultCursor( + TRI_vocbase_t& vocbase, + CursorId id, + aql::QueryResult&& result, + size_t batchSize, + double ttl, + bool hasCount +) : Cursor(id, batchSize, ttl, hasCount), _guard(vocbase), _result(std::move(result)), @@ -140,23 +144,33 @@ Result QueryResultCursor::dump(VPackBuilder& builder) { return TRI_ERROR_NO_ERROR; } -QueryStreamCursor::QueryStreamCursor(TRI_vocbase_t* vocbase, CursorId id, - std::string const& query, - std::shared_ptr bindVars, - std::shared_ptr opts, - size_t batchSize, double ttl) +QueryStreamCursor::QueryStreamCursor( + TRI_vocbase_t& vocbase, + CursorId id, + std::string const& query, + std::shared_ptr bindVars, + std::shared_ptr opts, + size_t batchSize, + double ttl +) : Cursor(id, batchSize, ttl, /*hasCount*/ false), _guard(vocbase), _queryString(query) { TRI_ASSERT(QueryRegistryFeature::QUERY_REGISTRY != nullptr); auto prevLockHeaders = CollectionLockState::_noLockHeaders; TRI_DEFER(CollectionLockState::_noLockHeaders = prevLockHeaders); - - _query = std::make_unique(false, _guard.database(), - aql::QueryString(_queryString.c_str(), _queryString.length()), - std::move(bindVars), std::move(opts), arangodb::aql::PART_MAIN); + + _query = std::make_unique( + false, + &(_guard.database()), + aql::QueryString(_queryString.c_str(), _queryString.length()), + std::move(bindVars), + std::move(opts), + arangodb::aql::PART_MAIN + ); _query->prepare(QueryRegistryFeature::QUERY_REGISTRY, aql::Query::DontCache); TRI_ASSERT(_query->state() == aql::QueryExecutionState::ValueType::EXECUTION); + // If we have set _noLockHeaders, we need to unset it: if (CollectionLockState::_noLockHeaders != nullptr && CollectionLockState::_noLockHeaders == _query->engine()->lockedShards()) { @@ -184,8 +198,9 @@ Result QueryStreamCursor::dump(VPackBuilder& builder) { TRI_DEFER(CollectionLockState::_noLockHeaders = prevLockHeaders); // we do have a query string... pass query to WorkMonitor - AqlWorkStack work(_guard.database(), _query->id(), _queryString.data(), - _queryString.size()); + AqlWorkStack work( + &(_guard.database()), _query->id(), _queryString.data(), _queryString.size() + ); LOG_TOPIC(TRACE, Logger::QUERIES) << "executing query " << _id << ": '" << _queryString.substr(1024) << "'"; diff --git a/arangod/Aql/QueryCursor.h b/arangod/Aql/QueryCursor.h index 1e29f922e7..23b23c36b7 100644 --- a/arangod/Aql/QueryCursor.h +++ b/arangod/Aql/QueryCursor.h @@ -39,12 +39,17 @@ class Query; /// Should be used in conjunction with the RestCursorHandler class QueryResultCursor final : public arangodb::Cursor { public: - QueryResultCursor(TRI_vocbase_t*, CursorId, aql::QueryResult&&, size_t, - double ttl, bool hasCount); + QueryResultCursor( + TRI_vocbase_t& vocbase, + CursorId id, + aql::QueryResult&& result, + size_t batchSize, + double ttl, + bool hasCount + ); ~QueryResultCursor() = default; - public: aql::QueryResult const* result() const { return &_result; } CursorType type() const override final { return CURSOR_VPACK; } @@ -79,14 +84,18 @@ class QueryResultCursor final : public arangodb::Cursor { /// cursor is deleted (or query exhausted) class QueryStreamCursor final : public arangodb::Cursor { public: - QueryStreamCursor(TRI_vocbase_t*, CursorId, std::string const& query, - std::shared_ptr bindVars, - std::shared_ptr opts, size_t, - double ttl); + QueryStreamCursor( + TRI_vocbase_t& vocbase, + CursorId id, + std::string const& query, + std::shared_ptr bindVars, + std::shared_ptr opts, + size_t batchSize, + double ttl + ); ~QueryStreamCursor(); - public: CursorType type() const override final { return CURSOR_VPACK; } size_t count() const override final { return 0; } @@ -104,4 +113,4 @@ class QueryStreamCursor final : public arangodb::Cursor { } // aql } // arangodb -#endif +#endif \ No newline at end of file diff --git a/arangod/Cluster/DBServerAgencySync.cpp b/arangod/Cluster/DBServerAgencySync.cpp index 54d3fcb359..cc84fb96cf 100644 --- a/arangod/Cluster/DBServerAgencySync.cpp +++ b/arangod/Cluster/DBServerAgencySync.cpp @@ -58,10 +58,9 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "DBServerAgencySync::execute starting"; DatabaseFeature* database = ApplicationServer::getFeature("Database"); - TRI_vocbase_t* const vocbase = database->systemDatabase(); - DBServerAgencySyncResult result; + if (vocbase == nullptr) { LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "DBServerAgencySync::execute no vocbase"; @@ -71,9 +70,7 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { auto clusterInfo = ClusterInfo::instance(); auto plan = clusterInfo->getPlan(); auto current = clusterInfo->getCurrent(); - - DatabaseGuard guard(vocbase); - + DatabaseGuard guard(*vocbase); double startTime = TRI_microtime(); V8Context* context = V8DealerFeature::DEALER->enterContext(vocbase, true, V8DealerFeature::ANY_CONTEXT_OR_PRIORITY); @@ -81,16 +78,17 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { LOG_TOPIC(WARN, arangodb::Logger::HEARTBEAT) << "DBServerAgencySync::execute: no V8 context"; return result; } - + TRI_DEFER(V8DealerFeature::DEALER->exitContext(context)); double now = TRI_microtime(); + if (now - startTime > 5.0) { LOG_TOPIC(WARN, arangodb::Logger::HEARTBEAT) << "DBServerAgencySync::execute took " << Logger::FIXED(now - startTime) << " to get free V8 context, starting handlePlanChange now"; } auto isolate = context->_isolate; - + try { v8::HandleScope scope(isolate); diff --git a/arangod/MMFiles/MMFilesCollectorThread.cpp b/arangod/MMFiles/MMFilesCollectorThread.cpp index 38872d3682..eaee9b9a86 100644 --- a/arangod/MMFiles/MMFilesCollectorThread.cpp +++ b/arangod/MMFiles/MMFilesCollectorThread.cpp @@ -597,11 +597,9 @@ void MMFilesCollectorThread::clearQueuedOperations() { for (auto const& cache : operations) { { arangodb::DatabaseGuard dbGuard(cache->databaseId); - TRI_vocbase_t* vocbase = dbGuard.database(); - TRI_ASSERT(vocbase != nullptr); - - arangodb::CollectionGuard collectionGuard(vocbase, cache->collectionId, - true); + arangodb::CollectionGuard collectionGuard( + &(dbGuard.database()), cache->collectionId, true + ); arangodb::LogicalCollection* collection = collectionGuard.collection(); TRI_ASSERT(collection != nullptr); @@ -732,10 +730,10 @@ void MMFilesCollectorThread::processCollectionMarker( /// @brief process all operations for a single collection int MMFilesCollectorThread::processCollectionOperations(MMFilesCollectorCache* cache) { arangodb::DatabaseGuard dbGuard(cache->databaseId); - TRI_vocbase_t* vocbase = dbGuard.database(); - TRI_ASSERT(vocbase != nullptr); - - arangodb::CollectionGuard collectionGuard(vocbase, cache->collectionId, true); + auto& vocbase = dbGuard.database(); + arangodb::CollectionGuard collectionGuard( + &vocbase, cache->collectionId, true + ); arangodb::LogicalCollection* collection = collectionGuard.collection(); TRI_ASSERT(collection != nullptr); @@ -983,10 +981,9 @@ int MMFilesCollectorThread::transferMarkers(MMFilesWalLogfile* logfile, // prepare database and collection arangodb::DatabaseGuard dbGuard(databaseId); - TRI_vocbase_t* vocbase = dbGuard.database(); - TRI_ASSERT(vocbase != nullptr); - - arangodb::CollectionGuard collectionGuard(vocbase, collectionId, true); + arangodb::CollectionGuard collectionGuard( + &(dbGuard.database()), collectionId, true + ); arangodb::LogicalCollection* collection = collectionGuard.collection(); TRI_ASSERT(collection != nullptr); @@ -1006,13 +1003,14 @@ int MMFilesCollectorThread::transferMarkers(MMFilesWalLogfile* logfile, int res = TRI_ERROR_INTERNAL; uint64_t numBytesTransferred = 0; + try { auto en = static_cast(engine); res = en->transferMarkers(collection, cache.get(), operations, numBytesTransferred); - + LOG_TOPIC(TRACE, Logger::COLLECTOR) << "wal collector transferred markers for '" << collection->name() << ", number of bytes transferred: " << numBytesTransferred; - + if (res == TRI_ERROR_NO_ERROR && !cache->operations->empty()) { queueOperations(logfile, cache); } diff --git a/arangod/MMFiles/MMFilesExportCursor.cpp b/arangod/MMFiles/MMFilesExportCursor.cpp index 769e8dde99..43a30c6107 100644 --- a/arangod/MMFiles/MMFilesExportCursor.cpp +++ b/arangod/MMFiles/MMFilesExportCursor.cpp @@ -32,7 +32,7 @@ #include #include -using namespace arangodb; +namespace arangodb { MMFilesExportCursor::MMFilesExportCursor(TRI_vocbase_t* vocbase, CursorId id, arangodb::MMFilesCollectionExport* ex, size_t batchSize, @@ -73,7 +73,7 @@ VPackSlice MMFilesExportCursor::next() { size_t MMFilesExportCursor::count() const { return _size; } Result MMFilesExportCursor::dump(VPackBuilder& builder) { - auto ctx = transaction::StandaloneContext::Create(_guard.database()); + auto ctx = transaction::StandaloneContext::Create(&(_guard.database())); VPackOptions const* oldOptions = builder.options; builder.options = ctx->getVPackOptions(); @@ -144,5 +144,7 @@ Result MMFilesExportCursor::dump(VPackBuilder& builder) { } std::shared_ptr MMFilesExportCursor::context() const { - return transaction::StandaloneContext::Create(_guard.database()); // likely not used + return transaction::StandaloneContext::Create(&(_guard.database())); // likely not used } + +} // arangodb diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index b836fcd51b..e7871d6181 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -51,10 +51,14 @@ using namespace arangodb::basics; const char* arangodb::pregel::ExecutionStateNames[6] = { "none", "running", "done", "canceled", "in error", "recovering"}; -Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t* vocbase, - std::vector const& vertexCollections, - std::vector const& edgeCollections, - std::string const& algoName, VPackSlice const& config) +Conductor::Conductor( + uint64_t executionNumber, + TRI_vocbase_t& vocbase, + std::vector const& vertexCollections, + std::vector const& edgeCollections, + std::string const& algoName, + VPackSlice const& config +) : _vocbaseGuard(vocbase), _executionNumber(executionNumber), _algorithm(AlgoRegistry::createAlgorithm(algoName, config)), @@ -243,12 +247,15 @@ void Conductor::finishedWorkerStartup(VPackSlice const& data) { } _computationStartTimeSecs = TRI_microtime(); + if (_startGlobalStep()) { // listens for changing primary DBServers on each collection shard RecoveryManager* mngr = PregelFeature::instance()->recoveryManager(); + if (mngr) { - mngr->monitorCollections(_vocbaseGuard.database()->name(), - _vertexCollections, this); + mngr->monitorCollections( + _vocbaseGuard.database().name(), _vertexCollections, this + ); } } } @@ -523,7 +530,7 @@ int Conductor::_initializeWorkers(std::string const& suffix, _callbackMutex.assertLockedByCurrentThread(); std::string const path = - Utils::baseUrl(_vocbaseGuard.database()->name(), Utils::workerPrefix) + + Utils::baseUrl(_vocbaseGuard.database().name(), Utils::workerPrefix) + suffix; // int64_t vertexCount = 0, edgeCount = 0; @@ -534,12 +541,22 @@ int Conductor::_initializeWorkers(std::string const& suffix, // resolve plan id's and shards on the servers for (CollectionID const& collectionID : _vertexCollections) { - resolveInfo(_vocbaseGuard.database(), collectionID, collectionPlanIdMap, - vertexMap, shardList); // store or + resolveInfo( + &(_vocbaseGuard.database()), + collectionID, + collectionPlanIdMap, + vertexMap, + shardList + ); // store or } for (CollectionID const& collectionID : _edgeCollections) { - resolveInfo(_vocbaseGuard.database(), collectionID, collectionPlanIdMap, - edgeMap, shardList); // store or + resolveInfo( + &(_vocbaseGuard.database()), + collectionID, + collectionPlanIdMap, + edgeMap, + shardList + ); // store or } _dbServers.clear(); @@ -610,22 +627,23 @@ int Conductor::_initializeWorkers(std::string const& suffix, if (ServerState::instance()->getRole() == ServerState::ROLE_SINGLE) { TRI_ASSERT(vertexMap.size() == 1); PregelFeature* feature = PregelFeature::instance(); - std::shared_ptr worker = feature->worker(_executionNumber); + if (worker) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "a worker with this execution number already exists."); } - - TRI_vocbase_t* vocbase = _vocbaseGuard.database(); - auto created = AlgoRegistry::createWorker(vocbase, b.slice()); + + auto created = + AlgoRegistry::createWorker(&(_vocbaseGuard.database()), b.slice()); + TRI_ASSERT(created.get() != nullptr); PregelFeature::instance()->addWorker(std::move(created), _executionNumber); worker = PregelFeature::instance()->worker(_executionNumber); TRI_ASSERT (worker); worker->setupWorker(); + return TRI_ERROR_NO_ERROR; - } else { auto body = std::make_shared(b.toJson()); requests.emplace_back("server:" + server, rest::RequestType::POST, path, @@ -744,16 +762,20 @@ int Conductor::_sendToAllDBServers(std::string const& path, if (ServerState::instance()->isRunningInCluster() == false) { if (handle) { VPackBuilder response; - PregelFeature::handleWorkerRequest(_vocbaseGuard.database(), path, - message.slice(), response); + + PregelFeature::handleWorkerRequest( + &(_vocbaseGuard.database()), path, message.slice(), response + ); handle(response.slice()); } else { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); rest::Scheduler* scheduler = SchedulerFeature::SCHEDULER; scheduler->post([this, path, message] { VPackBuilder response; - PregelFeature::handleWorkerRequest(_vocbaseGuard.database(), path, - message.slice(), response); + + PregelFeature::handleWorkerRequest( + &(_vocbaseGuard.database()), path, message.slice(), response + ); }); } return TRI_ERROR_NO_ERROR; @@ -761,14 +783,17 @@ int Conductor::_sendToAllDBServers(std::string const& path, // cluster case std::shared_ptr cc = ClusterComm::instance(); + if (_dbServers.size() == 0) { LOG_TOPIC(WARN, Logger::PREGEL) << "No servers registered"; return TRI_ERROR_FAILED; } + std::string base = - Utils::baseUrl(_vocbaseGuard.database()->name(), Utils::workerPrefix); + Utils::baseUrl(_vocbaseGuard.database().name(), Utils::workerPrefix); auto body = std::make_shared(message.toJson()); std::vector requests; + for (auto const& server : _dbServers) { requests.emplace_back("server:" + server, rest::RequestType::POST, base + path, body); diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index 75ec7d5e2b..4e7ffc0bb1 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -106,10 +106,14 @@ class Conductor { void finishedRecoveryStep(VPackSlice const& data); public: - Conductor(uint64_t executionNumber, TRI_vocbase_t* vocbase, - std::vector const& vertexCollections, - std::vector const& edgeCollections, - std::string const& algoName, VPackSlice const& userConfig); + Conductor( + uint64_t executionNumber, + TRI_vocbase_t& vocbase, + std::vector const& vertexCollections, + std::vector const& edgeCollections, + std::string const& algoName, + VPackSlice const& userConfig + ); ~Conductor(); diff --git a/arangod/Pregel/GraphStore.cpp b/arangod/Pregel/GraphStore.cpp index 5f32d77f06..857c48c2ef 100644 --- a/arangod/Pregel/GraphStore.cpp +++ b/arangod/Pregel/GraphStore.cpp @@ -336,13 +336,16 @@ std::unique_ptr GraphStore::_createTransaction() { transaction::Options transactionOptions; transactionOptions.waitForSync = false; transactionOptions.allowImplicitCollections = true; - auto ctx = transaction::StandaloneContext::Create(_vocbaseGuard.database()); + auto ctx = + transaction::StandaloneContext::Create(&(_vocbaseGuard.database())); std::unique_ptr trx( new transaction::UserTransaction(ctx, {}, {}, {}, transactionOptions)); Result res = trx->begin(); + if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } + return trx; } @@ -504,27 +507,39 @@ void GraphStore::_storeVertices(std::vector const& globalShards, if (it->shard() != currentShard) { if (trx) { res = trx->finish(res); + if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } } + currentShard = it->shard(); + ShardID const& shard = globalShards[currentShard]; transaction::Options transactionOptions; + transactionOptions.waitForSync = false; transactionOptions.allowImplicitCollections = false; trx.reset(new transaction::UserTransaction( - transaction::StandaloneContext::Create(_vocbaseGuard.database()), {}, - {shard}, {}, transactionOptions)); + transaction::StandaloneContext::Create(&(_vocbaseGuard.database())), + {}, + {shard}, + {}, + transactionOptions + )); res = trx->begin(); + if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } } transaction::BuilderLeaser b(trx.get()); + b->openArray(); + size_t buffer = 0; + while (it != it.end() && it->shard() == currentShard && buffer < 1000) { // This loop will fill a buffer of vertices until we run into a new // collection diff --git a/arangod/Replication/DatabaseInitialSyncer.cpp b/arangod/Replication/DatabaseInitialSyncer.cpp index 3db3eaced8..31b45b217b 100644 --- a/arangod/Replication/DatabaseInitialSyncer.cpp +++ b/arangod/Replication/DatabaseInitialSyncer.cpp @@ -62,14 +62,20 @@ using namespace arangodb::rest; size_t const DatabaseInitialSyncer::MaxChunkSize = 10 * 1024 * 1024; -DatabaseInitialSyncer::DatabaseInitialSyncer(TRI_vocbase_t* vocbase, - ReplicationApplierConfiguration const& configuration) - : InitialSyncer(configuration), - _vocbase(vocbase), - _hasFlushed(false) { - _vocbases.emplace(vocbase->name(), DatabaseGuard(vocbase)); +DatabaseInitialSyncer::DatabaseInitialSyncer( + TRI_vocbase_t& vocbase, + ReplicationApplierConfiguration const& configuration +): InitialSyncer(configuration), + _vocbase(&vocbase), + _hasFlushed(false) { + _vocbases.emplace( + std::piecewise_construct, + std::forward_as_tuple(vocbase.name()), + std::forward_as_tuple(vocbase) + ); + if (configuration._database.empty()) { - _databaseName = vocbase->name(); + _databaseName = vocbase.name(); } } diff --git a/arangod/Replication/DatabaseInitialSyncer.h b/arangod/Replication/DatabaseInitialSyncer.h index 74b821019b..9149d96bd6 100644 --- a/arangod/Replication/DatabaseInitialSyncer.h +++ b/arangod/Replication/DatabaseInitialSyncer.h @@ -76,23 +76,23 @@ class DatabaseInitialSyncer : public InitialSyncer { PHASE_DROP_CREATE, PHASE_DUMP } sync_phase_e; - - public: - DatabaseInitialSyncer(TRI_vocbase_t*, - ReplicationApplierConfiguration const&); public: - + DatabaseInitialSyncer( + TRI_vocbase_t& vocbase, + ReplicationApplierConfiguration const& configuration + ); + /// @brief run method, performs a full synchronization Result run(bool incremental) override { return runWithInventory(incremental, velocypack::Slice::noneSlice()); } - + /// @brief run method, performs a full synchronization with the /// given list of collections. Result runWithInventory(bool incremental, velocypack::Slice collections); - + TRI_vocbase_t* resolveVocbase(velocypack::Slice const&) override { return _vocbase; } /// @brief translate a phase to a phase name @@ -114,12 +114,12 @@ class DatabaseInitialSyncer : public InitialSyncer { TRI_vocbase_t* vocbase() const { TRI_ASSERT(vocbases().size() == 1); - return vocbases().begin()->second.database(); + return &(vocbases().begin()->second.database()); } - + /// @brief check whether the initial synchronization should be aborted bool isAborted() const override; - + /// @brief insert the batch id and barrier ID. /// For use in globalinitalsyncer void useAsChildSyncer(Syncer::MasterInfo const& info, @@ -132,7 +132,7 @@ class DatabaseInitialSyncer : public InitialSyncer { _batchId = batchId; _batchUpdateTime = batchUpdateTime; } - + /// @brief last time the barrier was extended or created /// The barrier prevents the deletion of WAL files for mmfiles double barrierUpdateTime() const { return _barrierUpdateTime; } diff --git a/arangod/Replication/DatabaseReplicationApplier.cpp b/arangod/Replication/DatabaseReplicationApplier.cpp index aba48bb53d..867d022e82 100644 --- a/arangod/Replication/DatabaseReplicationApplier.cpp +++ b/arangod/Replication/DatabaseReplicationApplier.cpp @@ -42,17 +42,19 @@ #include #include -using namespace arangodb; +namespace arangodb { /// @brief replication applier for a single database, without configuration -DatabaseReplicationApplier::DatabaseReplicationApplier(TRI_vocbase_t* vocbase) +DatabaseReplicationApplier::DatabaseReplicationApplier(TRI_vocbase_t& vocbase) : DatabaseReplicationApplier(ReplicationApplierConfiguration(), vocbase) {} /// @brief replication applier for a single database, with configuration -DatabaseReplicationApplier::DatabaseReplicationApplier(ReplicationApplierConfiguration const& configuration, - TRI_vocbase_t* vocbase) - : ReplicationApplier(configuration, std::string("database '") + vocbase->name() + "'"), - _vocbase(vocbase) {} +DatabaseReplicationApplier::DatabaseReplicationApplier( + ReplicationApplierConfiguration const& configuration, + TRI_vocbase_t& vocbase +): ReplicationApplier(configuration, std::string("database '") + vocbase.name() + "'"), + _vocbase(vocbase) { +} DatabaseReplicationApplier::~DatabaseReplicationApplier() { try { @@ -62,7 +64,7 @@ DatabaseReplicationApplier::~DatabaseReplicationApplier() { /// @brief execute the check condition bool DatabaseReplicationApplier::applies() const { - return (_vocbase->type() == TRI_VOCBASE_TYPE_NORMAL); + return (_vocbase.type() == TRI_VOCBASE_TYPE_NORMAL); } /// @brief configure the replication applier @@ -71,7 +73,7 @@ void DatabaseReplicationApplier::reconfigure(ReplicationApplierConfiguration con // no database THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no database configured"); } - + ReplicationApplier::reconfigure(configuration); } @@ -88,16 +90,21 @@ void DatabaseReplicationApplier::forget() { removeState(); StorageEngine* engine = EngineSelectorFeature::ENGINE; - engine->removeReplicationApplierConfiguration(_vocbase); + + engine->removeReplicationApplierConfiguration(&_vocbase); _configuration.reset(); } /// @brief factory function for creating a database-specific replication applier -DatabaseReplicationApplier* DatabaseReplicationApplier::create(TRI_vocbase_t* vocbase) { +/*static*/ DatabaseReplicationApplier* DatabaseReplicationApplier::create( + TRI_vocbase_t& vocbase +) { std::unique_ptr applier; - if (vocbase->type() == TRI_VOCBASE_TYPE_NORMAL) { - applier = std::make_unique(DatabaseReplicationApplier::loadConfiguration(vocbase), vocbase); + if (vocbase.type() == TRI_VOCBASE_TYPE_NORMAL) { + applier = std::make_unique( + DatabaseReplicationApplier::loadConfiguration(&vocbase), vocbase + ); applier->loadState(); } else { applier = std::make_unique(vocbase); @@ -129,8 +136,9 @@ void DatabaseReplicationApplier::storeConfiguration(bool doSync) { if (!applies()) { return; } - + VPackBuilder builder; + builder.openObject(); _configuration.toVelocyPack(builder, true, true); builder.close(); @@ -138,8 +146,10 @@ void DatabaseReplicationApplier::storeConfiguration(bool doSync) { LOG_TOPIC(DEBUG, Logger::REPLICATION) << "storing applier configuration " << builder.slice().toJson() << " for " << _databaseName; StorageEngine* engine = EngineSelectorFeature::ENGINE; - int res = engine->saveReplicationApplierConfiguration(_vocbase, builder.slice(), doSync); - + int res = engine->saveReplicationApplierConfiguration( + &_vocbase, builder.slice(), doSync + ); + if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } @@ -154,8 +164,13 @@ std::unique_ptr DatabaseReplicationApplier::buildTailingSyncer(TR return std::make_unique(_vocbase, _configuration, initialTick, useTick, barrierId); } - + std::string DatabaseReplicationApplier::getStateFilename() const { StorageEngine* engine = EngineSelectorFeature::ENGINE; - return arangodb::basics::FileUtils::buildFilename(engine->databasePath(_vocbase), "REPLICATION-APPLIER-STATE"); + + return arangodb::basics::FileUtils::buildFilename( + engine->databasePath(&_vocbase), "REPLICATION-APPLIER-STATE" + ); } + +} // arangodb diff --git a/arangod/Replication/DatabaseReplicationApplier.h b/arangod/Replication/DatabaseReplicationApplier.h index dab8ad49b8..cf36e6ddb6 100644 --- a/arangod/Replication/DatabaseReplicationApplier.h +++ b/arangod/Replication/DatabaseReplicationApplier.h @@ -31,16 +31,19 @@ struct TRI_vocbase_t; namespace arangodb { + /// @brief replication applier for a single database class DatabaseReplicationApplier final : public ReplicationApplier { friend class DatabaseTailingSyncer; friend class RestReplicationHandler; public: - explicit DatabaseReplicationApplier(TRI_vocbase_t* vocbase); + explicit DatabaseReplicationApplier(TRI_vocbase_t& vocbase); - DatabaseReplicationApplier(ReplicationApplierConfiguration const& configuration, - TRI_vocbase_t* vocbase); + DatabaseReplicationApplier( + ReplicationApplierConfiguration const& configuration, + TRI_vocbase_t& vocbase + ); ~DatabaseReplicationApplier(); @@ -55,26 +58,26 @@ class DatabaseReplicationApplier final : public ReplicationApplier { /// @brief configure the replication applier void reconfigure(ReplicationApplierConfiguration const& configuration) override; - + /// @brief store the configuration for the applier void storeConfiguration(bool doSync) override; /// @brief factory function for creating a database-specific replication applier - static DatabaseReplicationApplier* create(TRI_vocbase_t* vocbase); + static DatabaseReplicationApplier* create(TRI_vocbase_t& vocbase); /// @brief load a persisted configuration for the applier static ReplicationApplierConfiguration loadConfiguration(TRI_vocbase_t* vocbase); - + std::unique_ptr buildInitialSyncer() const override; std::unique_ptr buildTailingSyncer(TRI_voc_tick_t initialTick, bool useTick, TRI_voc_tick_t barrierId) const override; - + protected: std::string getStateFilename() const override; - + private: - TRI_vocbase_t* _vocbase; + TRI_vocbase_t& _vocbase; }; } -#endif +#endif \ No newline at end of file diff --git a/arangod/Replication/DatabaseTailingSyncer.cpp b/arangod/Replication/DatabaseTailingSyncer.cpp index ed7b1b8e3e..2b4e6c0383 100644 --- a/arangod/Replication/DatabaseTailingSyncer.cpp +++ b/arangod/Replication/DatabaseTailingSyncer.cpp @@ -56,15 +56,27 @@ using namespace arangodb::httpclient; using namespace arangodb::rest; DatabaseTailingSyncer::DatabaseTailingSyncer( - TRI_vocbase_t* vocbase, + TRI_vocbase_t& vocbase, ReplicationApplierConfiguration const& configuration, - TRI_voc_tick_t initialTick, bool useTick, TRI_voc_tick_t barrierId) - : TailingSyncer(vocbase->replicationApplier(), - configuration, initialTick, useTick, barrierId), - _vocbase(vocbase) { - _vocbases.emplace(vocbase->name(), DatabaseGuard(vocbase)); + TRI_voc_tick_t initialTick, + bool useTick, + TRI_voc_tick_t barrierId +): TailingSyncer( + vocbase.replicationApplier(), + configuration, + initialTick, + useTick, + barrierId + ), + _vocbase(&vocbase) { + _vocbases.emplace( + std::piecewise_construct, + std::forward_as_tuple(vocbase.name()), + std::forward_as_tuple(vocbase) + ); + if (configuration._database.empty()) { - _databaseName = vocbase->name(); + _databaseName = vocbase.name(); } } diff --git a/arangod/Replication/DatabaseTailingSyncer.h b/arangod/Replication/DatabaseTailingSyncer.h index 05a19adb5c..c4cf0f4505 100644 --- a/arangod/Replication/DatabaseTailingSyncer.h +++ b/arangod/Replication/DatabaseTailingSyncer.h @@ -33,40 +33,41 @@ class DatabaseReplicationApplier; class DatabaseTailingSyncer : public TailingSyncer { public: - DatabaseTailingSyncer(TRI_vocbase_t*, - ReplicationApplierConfiguration const&, - TRI_voc_tick_t initialTick, bool useTick, - TRI_voc_tick_t barrierId); + DatabaseTailingSyncer( + TRI_vocbase_t& vocbase, + ReplicationApplierConfiguration const& configuration, + TRI_voc_tick_t initialTick, + bool useTick, + TRI_voc_tick_t barrierId + ); - public: - TRI_vocbase_t* resolveVocbase(velocypack::Slice const&) override { return _vocbase; } /// @brief return the syncer's replication applier DatabaseReplicationApplier* applier() const { return static_cast(_applier); } - + /// @brief finalize the synchronization of a collection by tailing the WAL /// and filtering on the collection name until no more data is available Result syncCollectionFinalize(std::string const& collectionName); - + protected: - + /// @brief save the current applier state Result saveApplierState() override; - + TRI_vocbase_t* vocbase() const { TRI_ASSERT(vocbases().size() == 1); - return vocbases().begin()->second.database(); + return &(vocbases().begin()->second.database()); } private: - + /// @brief vocbase to use for this run TRI_vocbase_t* _vocbase; }; } -#endif +#endif \ No newline at end of file diff --git a/arangod/Replication/GlobalInitialSyncer.cpp b/arangod/Replication/GlobalInitialSyncer.cpp index a4cf1e0a43..7872a9184a 100644 --- a/arangod/Replication/GlobalInitialSyncer.cpp +++ b/arangod/Replication/GlobalInitialSyncer.cpp @@ -158,46 +158,49 @@ Result GlobalInitialSyncer::runInternal(bool incremental) { VPackSlice const nameSlice = it.get("name"); VPackSlice const idSlice = it.get("id"); VPackSlice const collections = it.get("collections"); + if (!nameSlice.isString() || !idSlice.isString() || !collections.isArray()) { return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE, "database declaration is invalid in response"); } - + TRI_vocbase_t* vocbase = resolveVocbase(nameSlice); + if (vocbase == nullptr) { return Result(TRI_ERROR_INTERNAL, "vocbase not found"); } - + DatabaseGuard guard(nameSlice.copyString()); - + // change database name in place auto configurationCopy = _configuration; + configurationCopy._database = nameSlice.copyString(); - - DatabaseInitialSyncer syncer(vocbase, configurationCopy); - + + DatabaseInitialSyncer syncer(*vocbase, configurationCopy); + syncer.useAsChildSyncer(_masterInfo, _barrierId, _barrierUpdateTime, _batchId, _batchUpdateTime); + // run the syncer with the supplied inventory collections Result r = syncer.runWithInventory(false, collections); if (r.fail()) { return r; } - + // we need to pass on the update times to the next syncer _barrierUpdateTime = syncer.barrierUpdateTime(); _batchUpdateTime = syncer.batchUpdateTime(); - + sendExtendBatch(); sendExtendBarrier(); } - } catch (...) { return Result(TRI_ERROR_INTERNAL, "caught an unexpected exception"); } - + return TRI_ERROR_NO_ERROR; } diff --git a/arangod/Replication/ReplicationTransaction.h b/arangod/Replication/ReplicationTransaction.h index 19934956e5..fcc2b2b5a6 100644 --- a/arangod/Replication/ReplicationTransaction.h +++ b/arangod/Replication/ReplicationTransaction.h @@ -37,16 +37,13 @@ namespace arangodb { class ReplicationTransaction : public transaction::Methods { public: /// @brief create the transaction - explicit ReplicationTransaction(TRI_vocbase_t* vocbase) - : transaction::Methods(transaction::StandaloneContext::Create(vocbase)), - _guard(vocbase) { - + explicit ReplicationTransaction(TRI_vocbase_t& vocbase) + : transaction::Methods(transaction::StandaloneContext::Create(&vocbase)), + _guard(vocbase) { TRI_ASSERT(_state != nullptr); _state->setType(AccessMode::Type::EXCLUSIVE); } - public: - /// @brief get a collection by id /// this will automatically add the collection to the transaction /*inline TransactionCollection* trxCollection(TRI_voc_cid_t cid, AccessMode::Type) const override { @@ -79,4 +76,4 @@ class ReplicationTransaction : public transaction::Methods { } -#endif +#endif \ No newline at end of file diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index 4d2df46a66..566839f840 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -356,24 +356,32 @@ TRI_vocbase_t* Syncer::resolveVocbase(VPackSlice const& slice) { } else if (slice.isString()) { name = slice.copyString(); } + if (name.empty()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_REPLICATION_INVALID_RESPONSE, "could not resolve vocbase id / name"); } - + // will work with either names or id's auto const& it = _vocbases.find(name); + if (it == _vocbases.end()) { // automatically checks for id in string TRI_vocbase_t* vocbase = DatabaseFeature::DATABASE->lookupDatabase(name); + if (vocbase != nullptr) { - _vocbases.emplace(name, DatabaseGuard(vocbase)); + _vocbases.emplace( + std::piecewise_construct, + std::forward_as_tuple(name), + std::forward_as_tuple(*vocbase) + ); } else { LOG_TOPIC(DEBUG, Logger::REPLICATION) << "could not find database '" << name << "'"; } + return vocbase; } else { - return it->second.database(); + return &(it->second.database()); } } diff --git a/arangod/Replication/TailingSyncer.cpp b/arangod/Replication/TailingSyncer.cpp index 142be82369..24d4439a70 100644 --- a/arangod/Replication/TailingSyncer.cpp +++ b/arangod/Replication/TailingSyncer.cpp @@ -444,7 +444,7 @@ Result TailingSyncer::startTransaction(VPackSlice const& slice) { LOG_TOPIC(TRACE, Logger::REPLICATION) << "starting replication transaction " << tid; - auto trx = std::make_unique(vocbase); + auto trx = std::make_unique(*vocbase); Result res = trx->begin(); if (res.ok()) { diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 1bf70cf015..24a975f2a2 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -1755,13 +1755,15 @@ void RestReplicationHandler::handleCommandSync() { TRI_ASSERT(!config._skipCreateDrop); std::unique_ptr syncer; + if (isGlobal) { syncer.reset(new GlobalInitialSyncer(config)); } else { - syncer.reset(new DatabaseInitialSyncer(&_vocbase, config)); + syncer.reset(new DatabaseInitialSyncer(_vocbase, config)); } Result r = syncer->run(config._incremental); + if (r.fail()) { LOG_TOPIC(ERR, Logger::REPLICATION) << "failed to sync: " << r.errorMessage(); diff --git a/arangod/RocksDBEngine/RocksDBExportCursor.cpp b/arangod/RocksDBEngine/RocksDBExportCursor.cpp index b5dc1d6dc2..465a872e32 100644 --- a/arangod/RocksDBEngine/RocksDBExportCursor.cpp +++ b/arangod/RocksDBEngine/RocksDBExportCursor.cpp @@ -107,8 +107,9 @@ VPackSlice RocksDBExportCursor::next() { size_t RocksDBExportCursor::count() const { return _size; } Result RocksDBExportCursor::dump(VPackBuilder& builder) { - auto ctx = transaction::StandaloneContext::Create(_guard.database()); + auto ctx = transaction::StandaloneContext::Create(&(_guard.database())); VPackOptions const* oldOptions = builder.options; + builder.options = ctx->getVPackOptions(); TRI_ASSERT(_iter.get() != nullptr); diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index 4cbe789e09..4f0931425c 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -102,10 +102,12 @@ uint64_t RocksDBReplicationContext::count() const { TRI_vocbase_t* RocksDBReplicationContext::vocbase() const { MUTEX_LOCKER(locker, _contextLock); + if (!_guard) { return nullptr; } - return _guard->database(); + + return &(_guard->database()); } // creates new transaction/snapshot @@ -116,7 +118,7 @@ void RocksDBReplicationContext::bind(TRI_vocbase_t* vocbase) { void RocksDBReplicationContext::internalBind(TRI_vocbase_t* vocbase, bool allowChange) { - if (!_trx || !_guard || (_guard->database() != vocbase)) { + if (!_trx || !_guard || (&(_guard->database()) != vocbase)) { TRI_ASSERT(allowChange); rocksdb::Snapshot const* snap = nullptr; if (_trx) { @@ -177,7 +179,8 @@ int RocksDBReplicationContext::bindCollection( int RocksDBReplicationContext::chooseDatabase(TRI_vocbase_t* vocbase) { TRI_ASSERT(_users > 0); MUTEX_LOCKER(locker, _contextLock); - if (_guard->database() == vocbase) { + + if (&(_guard->database()) == vocbase) { return TRI_ERROR_NO_ERROR; // nothing to do here } @@ -242,17 +245,23 @@ RocksDBReplicationResult RocksDBReplicationContext::dump( } }; TRI_DEFER(release()); + { MUTEX_LOCKER(writeLocker, _contextLock); TRI_ASSERT(vocbase != nullptr); - if (!_trx || !_guard || (_guard->database() != vocbase)) { + + if (!_trx || !_guard || (&(_guard->database()) != vocbase)) { return RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick); } + TRI_voc_cid_t const id{::normalizeIdentifier(*_trx, collectionName)}; + if (0 == id) { return RocksDBReplicationResult{TRI_ERROR_BAD_PARAMETER, _lastTick}; } + collection = getCollectionIterator(id); + if (!collection) { return RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick); } diff --git a/arangod/StorageEngine/WalAccess.cpp b/arangod/StorageEngine/WalAccess.cpp index e89832715a..ec986432c6 100644 --- a/arangod/StorageEngine/WalAccess.cpp +++ b/arangod/StorageEngine/WalAccess.cpp @@ -55,15 +55,22 @@ bool WalAccessContext::shouldHandleCollection(TRI_voc_tick_t dbid, TRI_vocbase_t* WalAccessContext::loadVocbase(TRI_voc_tick_t dbid) { TRI_ASSERT(dbid != 0); auto const& it = _vocbases.find(dbid); + if (it == _vocbases.end()) { TRI_vocbase_t* vocbase = DatabaseFeature::DATABASE->useDatabase(dbid); + if (vocbase != nullptr) { TRI_DEFER(vocbase->release()); - _vocbases.emplace(dbid, DatabaseGuard(vocbase)); + _vocbases.emplace( + std::piecewise_construct, + std::forward_as_tuple(dbid), + std::forward_as_tuple(*vocbase) + ); } + return vocbase; } else { - return it->second.database(); + return &(it->second.database()); } } diff --git a/arangod/Utils/CursorRepository.cpp b/arangod/Utils/CursorRepository.cpp index 2b3897787f..2efc3760a7 100644 --- a/arangod/Utils/CursorRepository.cpp +++ b/arangod/Utils/CursorRepository.cpp @@ -40,7 +40,7 @@ size_t const CursorRepository::MaxCollectCount = 32; /// @brief create a cursor repository //////////////////////////////////////////////////////////////////////////////// -CursorRepository::CursorRepository(TRI_vocbase_t* vocbase) +CursorRepository::CursorRepository(TRI_vocbase_t& vocbase) : _vocbase(vocbase), _lock(), _cursors() { _cursors.reserve(64); } diff --git a/arangod/Utils/CursorRepository.h b/arangod/Utils/CursorRepository.h index ff758b2e4c..b0221eeba5 100644 --- a/arangod/Utils/CursorRepository.h +++ b/arangod/Utils/CursorRepository.h @@ -32,6 +32,7 @@ struct TRI_vocbase_t; namespace arangodb { + namespace velocypack { class Builder; } @@ -46,7 +47,7 @@ class CursorRepository { /// @brief create a cursors repository ////////////////////////////////////////////////////////////////////////////// - explicit CursorRepository(TRI_vocbase_t*); + explicit CursorRepository(TRI_vocbase_t& vocbase); ////////////////////////////////////////////////////////////////////////////// /// @brief destroy a cursors repository @@ -54,15 +55,13 @@ class CursorRepository { ~CursorRepository(); - public: - //////////////////////////////////////////////////////////////////////////////// /// @brief stores a cursor in the registry /// the repository will take ownership of the cursor //////////////////////////////////////////////////////////////////////////////// Cursor* addCursor(std::unique_ptr cursor); - + ////////////////////////////////////////////////////////////////////////////// /// @brief creates a cursor and stores it in the registry /// the cursor will be returned with the usage flag set to true. it must be @@ -71,19 +70,19 @@ class CursorRepository { ////////////////////////////////////////////////////////////////////////////// Cursor* createFromQueryResult(aql::QueryResult&&, size_t, double, bool); - + ////////////////////////////////////////////////////////////////////////////// /// @brief creates a cursor and stores it in the registry /// the cursor will be returned with the usage flag set to true. it must be /// returned later using release() /// the cursor will create a query internally and retain it until deleted ////////////////////////////////////////////////////////////////////////////// - + Cursor* createQueryStream(std::string const& query, std::shared_ptr const& binds, std::shared_ptr const& opts, size_t batchSize, double ttl); - + ////////////////////////////////////////////////////////////////////////////// /// @brief remove a cursor by id ////////////////////////////////////////////////////////////////////////////// @@ -121,7 +120,7 @@ class CursorRepository { /// @brief vocbase ////////////////////////////////////////////////////////////////////////////// - TRI_vocbase_t* _vocbase; + TRI_vocbase_t& _vocbase; ////////////////////////////////////////////////////////////////////////////// /// @brief mutex for the cursors repository @@ -141,6 +140,7 @@ class CursorRepository { static size_t const MaxCollectCount; }; + } -#endif +#endif \ No newline at end of file diff --git a/arangod/Utils/DatabaseGuard.cpp b/arangod/Utils/DatabaseGuard.cpp index 54fb30f910..a4f072f9d6 100644 --- a/arangod/Utils/DatabaseGuard.cpp +++ b/arangod/Utils/DatabaseGuard.cpp @@ -26,32 +26,48 @@ #include "Basics/Exceptions.h" #include "RestServer/DatabaseFeature.h" -using namespace arangodb; +namespace { -/// @brief create the guard, using a database id -DatabaseGuard::DatabaseGuard(TRI_voc_tick_t id) : _vocbase(nullptr) { - DatabaseFeature* databaseFeature = DatabaseFeature::DATABASE; - _vocbase = databaseFeature->useDatabase(id); +template +TRI_vocbase_t& vocbase(T& id) { + auto* databaseFeature = arangodb::DatabaseFeature::DATABASE; - if (_vocbase == nullptr) { + if (!databaseFeature) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - TRI_ASSERT(!_vocbase->isDangling()); + auto* vocbase = databaseFeature->useDatabase(id); + + if (!vocbase) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + } + + return *vocbase; +} + +template<> +TRI_vocbase_t& vocbase(TRI_vocbase_t*& vocbase) { + // check from the original constructor of DatabaseGuard + TRI_ASSERT(vocbase != nullptr); + + return *vocbase; +} + +} // namespace + +namespace arangodb { + +DatabaseGuard::DatabaseGuard(TRI_vocbase_t* ptr): DatabaseGuard(vocbase(ptr)) { +} + +/// @brief create the guard, using a database id +DatabaseGuard::DatabaseGuard(TRI_voc_tick_t id): _vocbase(vocbase(id)) { + TRI_ASSERT(!_vocbase.isDangling()); } /// @brief create the guard, using a database name -DatabaseGuard::DatabaseGuard(std::string const& name) : _vocbase(nullptr) { - DatabaseFeature* databaseFeature = DatabaseFeature::DATABASE; - _vocbase = databaseFeature->useDatabase(name); - - if (_vocbase == nullptr) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); - } - - TRI_ASSERT(!_vocbase->isDangling()); +DatabaseGuard::DatabaseGuard(std::string const& name): _vocbase(vocbase(name)) { + TRI_ASSERT(!_vocbase.isDangling()); } -DatabaseGuard::DatabaseGuard(DatabaseGuard&& other) : _vocbase(other._vocbase) { - other._vocbase = nullptr; -} +} // arangodb diff --git a/arangod/Utils/DatabaseGuard.h b/arangod/Utils/DatabaseGuard.h index 50d5a16906..14eccf7577 100644 --- a/arangod/Utils/DatabaseGuard.h +++ b/arangod/Utils/DatabaseGuard.h @@ -32,17 +32,22 @@ namespace arangodb { /// dropped while still using it. class DatabaseGuard { public: + DatabaseGuard(DatabaseGuard&&) = delete; DatabaseGuard(DatabaseGuard const&) = delete; DatabaseGuard& operator=(DatabaseGuard const&) = delete; /// @brief create guard on existing db - explicit DatabaseGuard(TRI_vocbase_t* vocbase) : _vocbase(vocbase) { - TRI_ASSERT(vocbase != nullptr); - if (!_vocbase->use()) { + explicit DatabaseGuard(TRI_vocbase_t& vocbase): _vocbase(vocbase) { + if (!_vocbase.use()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } } + /// @brief create guard on existing db pointer (not nullptr) + /// @deprecated DO NOT USE for new code + /// FIXME TODO remove once V8Task and arangodb::pregel::GraphStore are fixed + explicit DatabaseGuard(TRI_vocbase_t* vocbase); + /// @brief create the guard, using a database id explicit DatabaseGuard(TRI_voc_tick_t id); @@ -51,22 +56,18 @@ class DatabaseGuard { /// @brief destroy the guard ~DatabaseGuard() { - if (_vocbase != nullptr) { - TRI_ASSERT(!_vocbase->isDangling()); - _vocbase->release(); - } + TRI_ASSERT(!_vocbase.isDangling()); + _vocbase.release(); } - DatabaseGuard(DatabaseGuard&&); - - public: /// @brief return the database pointer - inline TRI_vocbase_t* database() const { return _vocbase; } + inline TRI_vocbase_t& database() const { return _vocbase; } private: /// @brief pointer to database - TRI_vocbase_t* _vocbase; + TRI_vocbase_t& _vocbase; }; + } -#endif +#endif \ No newline at end of file diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index 4e02ee87fa..386e2db14e 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -1913,6 +1913,8 @@ static void JS_PregelStart(v8::FunctionCallbackInfo const& args) { } TRI_vocbase_t* vocbase = GetContextVocBase(isolate); + TRI_ASSERT(vocbase != nullptr); // for clarity assert duplicated from v8-util.cpp GetContextVocBase(...) + for (std::string const& name : paramVertices) { if (ss->isCoordinator()) { try { @@ -1982,8 +1984,9 @@ static void JS_PregelStart(v8::FunctionCallbackInfo const& args) { } uint64_t en = pregel::PregelFeature::instance()->createExecutionNumber(); - auto c = std::make_unique(en, vocbase, paramVertices, edgeColls, - algorithm, paramBuilder.slice()); + auto c = std::make_unique( + en, *vocbase, paramVertices, edgeColls, algorithm, paramBuilder.slice() + ); pregel::PregelFeature::instance()->addConductor(std::move(c), en); TRI_ASSERT(pregel::PregelFeature::instance()->conductor(en)); pregel::PregelFeature::instance()->conductor(en)->start(); diff --git a/arangod/V8Server/v8-dispatcher.cpp b/arangod/V8Server/v8-dispatcher.cpp index dc577b5e4b..1d27839f85 100644 --- a/arangod/V8Server/v8-dispatcher.cpp +++ b/arangod/V8Server/v8-dispatcher.cpp @@ -232,7 +232,7 @@ void V8Task::removeTasksForDatabase(std::string const& name) { } bool V8Task::databaseMatches(std::string const& name) const { - return (_dbGuard->database()->name() == name); + return (_dbGuard->database().name() == name); } V8Task::V8Task(std::string const& id, std::string const& name, @@ -299,14 +299,16 @@ V8Task::callbackFunction() { // get the permissions to be used by this task bool allowContinue = true; - std::unique_ptr execContext; + if (!_user.empty()) { // not superuser - std::string const& dbname = _dbGuard->database()->name(); + auto& dbname = _dbGuard->database().name(); + execContext.reset(ExecContext::create(_user, dbname)); allowContinue = execContext->canUseDatabase(dbname, auth::Level::RW); allowContinue = allowContinue && ServerState::writeOpsEnabled(); } + ExecContextScope scope(_user.empty() ? ExecContext::superuser() : execContext.get()); @@ -412,12 +414,13 @@ void V8Task::toVelocyPack(VPackBuilder& builder) const { builder.add("offset", VPackValue(_offset.count() / 1000000.0)); builder.add("command", VPackValue(_command)); - builder.add("database", VPackValue(_dbGuard->database()->name())); + builder.add("database", VPackValue(_dbGuard->database().name())); } void V8Task::work(ExecContext const* exec) { - auto context = V8DealerFeature::DEALER->enterContext(_dbGuard->database(), - _allowUseDatabase); + auto context = V8DealerFeature::DEALER->enterContext( + &(_dbGuard->database()), _allowUseDatabase + ); // note: the context might be 0 in case of shut-down if (context == nullptr) { diff --git a/arangod/V8Server/v8-replication.cpp b/arangod/V8Server/v8-replication.cpp index c24f62099b..3351734dad 100644 --- a/arangod/V8Server/v8-replication.cpp +++ b/arangod/V8Server/v8-replication.cpp @@ -191,25 +191,24 @@ static void SynchronizeReplication( // treat the argument as an object from now on v8::Handle object = v8::Handle::Cast(args[0]); - VPackBuilder builder; int res = TRI_V8ToVPack(isolate, builder, args[0], false); if (res != TRI_ERROR_NO_ERROR) { TRI_V8_THROW_EXCEPTION(res); } - + TRI_vocbase_t* vocbase = GetContextVocBase(isolate); if (vocbase == nullptr) { TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - + std::string databaseName; if (applierType == APPLIER_DATABASE) { databaseName = vocbase->name(); } - + bool keepBarrier = false; if (object->Has(TRI_V8_ASCII_STRING(isolate, "keepBarrier"))) { keepBarrier = @@ -221,12 +220,11 @@ static void SynchronizeReplication( v8::Handle result = v8::Object::New(isolate); std::unique_ptr syncer; - if (applierType == APPLIER_DATABASE) { // database-specific synchronization - syncer.reset(new DatabaseInitialSyncer(vocbase, configuration)); - + syncer.reset(new DatabaseInitialSyncer(*vocbase, configuration)); + if (object->Has(TRI_V8_ASCII_STRING(isolate, "leaderId"))) { syncer->setLeaderId(TRI_ObjectToString(object->Get(TRI_V8_ASCII_STRING(isolate, "leaderId")))); } diff --git a/arangod/VocBase/vocbase.cpp b/arangod/VocBase/vocbase.cpp index e89f7cf3af..34ba164b25 100644 --- a/arangod/VocBase/vocbase.cpp +++ b/arangod/VocBase/vocbase.cpp @@ -1773,7 +1773,7 @@ TRI_vocbase_t::TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id, _deadlockDetector(false), _userStructures(nullptr) { _queries.reset(new arangodb::aql::QueryList(this)); - _cursorRepository.reset(new arangodb::CursorRepository(this)); + _cursorRepository.reset(new arangodb::CursorRepository(*this)); _collectionKeys.reset(new arangodb::CollectionKeysRepository()); // init collections @@ -1856,7 +1856,8 @@ bool TRI_vocbase_t::IsAllowedName( } void TRI_vocbase_t::addReplicationApplier() { - DatabaseReplicationApplier* applier = DatabaseReplicationApplier::create(this); + auto* applier = DatabaseReplicationApplier::create(*this); + _replicationApplier.reset(applier); } @@ -1866,6 +1867,7 @@ void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, double ttl if (ttl <= 0.0) { ttl = InitialSyncer::defaultBatchTimeout; } + double const expires = TRI_microtime() + ttl; WRITE_LOCKER(writeLocker, _replicationClientsLock);