//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Michael Hackstein /// @author Daniel H. Larkin //////////////////////////////////////////////////////////////////////////////// #include "LogicalCollection.h" #include "Aql/QueryCache.h" #include "Basics/Mutex.h" #include "Basics/ReadLocker.h" #include "Basics/VelocyPackHelper.h" #include "Basics/WriteLocker.h" #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterMethods.h" #include "Cluster/FollowerInfo.h" #include "Cluster/ServerState.h" #include "RestServer/DatabaseFeature.h" #include "Sharding/ShardingInfo.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/PhysicalCollection.h" #include "StorageEngine/StorageEngine.h" #include "Transaction/Helpers.h" #include "Transaction/StandaloneContext.h" #include "Utils/CollectionNameResolver.h" #include "Utils/SingleCollectionTransaction.h" #include "VocBase/KeyGenerator.h" #include "VocBase/ManagedDocumentResult.h" #include #include #include using namespace arangodb; using Helper = arangodb::basics::VelocyPackHelper; namespace { static std::string translateStatus(TRI_vocbase_col_status_e status) { switch (status) { case TRI_VOC_COL_STATUS_UNLOADED: return "unloaded"; case TRI_VOC_COL_STATUS_LOADED: return "loaded"; case TRI_VOC_COL_STATUS_UNLOADING: return "unloading"; case TRI_VOC_COL_STATUS_DELETED: return "deleted"; case TRI_VOC_COL_STATUS_LOADING: return "loading"; case TRI_VOC_COL_STATUS_CORRUPTED: case TRI_VOC_COL_STATUS_NEW_BORN: default: return "unknown"; } } std::string readGloballyUniqueId(arangodb::velocypack::Slice info) { static const std::string empty; auto guid = arangodb::basics::VelocyPackHelper::getStringValue(info, arangodb::StaticStrings::DataSourceGuid, empty); if (!guid.empty()) { return guid; } auto version = arangodb::basics::VelocyPackHelper::getNumericValue( info, "version", static_cast(LogicalCollection::currentVersion())); // predictable UUID for legacy collections if (static_cast(version) < LogicalCollection::Version::v33 && info.isObject()) { return arangodb::basics::VelocyPackHelper::getStringValue(info, arangodb::StaticStrings::DataSourceName, empty); } return empty; } std::string readStringValue(arangodb::velocypack::Slice info, std::string const& name, std::string const& def) { return info.isObject() ? Helper::getStringValue(info, name, def) : def; } arangodb::LogicalDataSource::Type const& readType(arangodb::velocypack::Slice info, std::string const& key, TRI_col_type_e def) { static const auto& document = arangodb::LogicalDataSource::Type::emplace( arangodb::velocypack::StringRef("document")); static const auto& edge = arangodb::LogicalDataSource::Type::emplace(arangodb::velocypack::StringRef("edge")); // arbitrary system-global value for unknown static const auto& unknown = arangodb::LogicalDataSource::Type::emplace(arangodb::velocypack::StringRef()); switch (Helper::getNumericValue(info, key, def)) { case TRI_col_type_e::TRI_COL_TYPE_DOCUMENT: return document; case TRI_col_type_e::TRI_COL_TYPE_EDGE: return edge; default: return unknown; } } } // namespace // The Slice contains the part of the plan that // is relevant for this collection. LogicalCollection::LogicalCollection(TRI_vocbase_t& vocbase, VPackSlice const& info, bool isAStub, uint64_t planVersion /*= 0*/ ) : LogicalDataSource( LogicalCollection::category(), ::readType(info, StaticStrings::DataSourceType, TRI_COL_TYPE_UNKNOWN), vocbase, Helper::extractIdValue(info), ::readGloballyUniqueId(info), Helper::stringUInt64(info.get(StaticStrings::DataSourcePlanId)), ::readStringValue(info, StaticStrings::DataSourceName, ""), planVersion, TRI_vocbase_t::IsSystemName( ::readStringValue(info, StaticStrings::DataSourceName, "")) && Helper::getBooleanValue(info, StaticStrings::DataSourceSystem, false), Helper::getBooleanValue(info, StaticStrings::DataSourceDeleted, false)), _version(static_cast(Helper::getNumericValue(info, "version", static_cast(currentVersion())))), _v8CacheVersion(0), _type(Helper::getNumericValue(info, StaticStrings::DataSourceType, TRI_COL_TYPE_UNKNOWN)), _status(Helper::getNumericValue( info, "status", TRI_VOC_COL_STATUS_CORRUPTED)), _isAStub(isAStub), #ifdef USE_ENTERPRISE _isSmart(Helper::getBooleanValue(info, StaticStrings::IsSmart, false)), #endif _waitForSync(Helper::getBooleanValue(info, StaticStrings::WaitForSyncString, false)), _allowUserKeys(Helper::getBooleanValue(info, "allowUserKeys", true)), #ifdef USE_ENTERPRISE _smartJoinAttribute( ::readStringValue(info, StaticStrings::SmartJoinAttribute, "")), #endif _physical(EngineSelectorFeature::ENGINE->createPhysicalCollection(*this, info)) { TRI_ASSERT(info.isObject()); if (!TRI_vocbase_t::IsAllowedName(info)) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_NAME); } if (_version < minimumVersion()) { // collection is too "old" std::string errorMsg(std::string("collection '") + name() + "' has a too old version. Please start the server " "with the --database.auto-upgrade option."); THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_FAILED, errorMsg); } TRI_ASSERT(!guid().empty()); // update server's tick value TRI_UpdateTickServer(static_cast(id())); // add keyOptions from slice VPackSlice keyOpts = info.get("keyOptions"); _keyGenerator.reset(KeyGenerator::factory(keyOpts)); if (!keyOpts.isNone()) { _keyOptions = VPackBuilder::clone(keyOpts).steal(); } _sharding = std::make_unique(info, this); #ifdef USE_ENTERPRISE if (ServerState::instance()->isCoordinator() || ServerState::instance()->isDBServer()) { if (!info.get(StaticStrings::SmartJoinAttribute).isNone() && !hasSmartJoinAttribute()) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INVALID_SMART_JOIN_ATTRIBUTE, "smartJoinAttribute must contain a string attribute name"); } if (hasSmartJoinAttribute()) { auto const& sk = _sharding->shardKeys(); TRI_ASSERT(!sk.empty()); if (sk.size() != 1) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INVALID_SMART_JOIN_ATTRIBUTE, "smartJoinAttribute can only be used for collections with a single " "shardKey value"); } TRI_ASSERT(!sk.front().empty()); if (sk.front().back() != ':') { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INVALID_SMART_JOIN_ATTRIBUTE, std::string("smartJoinAttribute can only be used for shardKeys ending on ':', got '") + sk.front() + "'"); } if (isSmart()) { if (_type == TRI_COL_TYPE_EDGE) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INVALID_SMART_JOIN_ATTRIBUTE, "cannot use smartJoinAttribute on a smart edge collection"); } else if (_type == TRI_COL_TYPE_DOCUMENT) { VPackSlice sga = info.get(StaticStrings::GraphSmartGraphAttribute); if (sga.isString() && sga.copyString() != info.get(StaticStrings::SmartJoinAttribute).copyString()) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INVALID_SMART_JOIN_ATTRIBUTE, "smartJoinAttribute must be equal to smartGraphAttribute"); } } } } } #else // whatever we got passed in, in a non-enterprise build, we just ignore // any specification for the smartJoinAttribute _smartJoinAttribute.clear(); #endif if (ServerState::instance()->isDBServer() || !ServerState::instance()->isRunningInCluster()) { _followers.reset(new FollowerInfo(this)); } TRI_ASSERT(_physical != nullptr); // This has to be called AFTER _phyiscal and _logical are properly linked // together. prepareIndexes(info.get("indexes")); } /*static*/ LogicalDataSource::Category const& LogicalCollection::category() noexcept { static const Category category; return category; } LogicalCollection::~LogicalCollection() = default; // SECTION: sharding ShardingInfo* LogicalCollection::shardingInfo() const { TRI_ASSERT(_sharding != nullptr); return _sharding.get(); } size_t LogicalCollection::numberOfShards() const { TRI_ASSERT(_sharding != nullptr); return _sharding->numberOfShards(); } size_t LogicalCollection::replicationFactor() const { TRI_ASSERT(_sharding != nullptr); return _sharding->replicationFactor(); } size_t LogicalCollection::writeConcern() const { TRI_ASSERT(_sharding != nullptr); return _sharding->writeConcern(); } std::string LogicalCollection::distributeShardsLike() const { TRI_ASSERT(_sharding != nullptr); return _sharding->distributeShardsLike(); } void LogicalCollection::distributeShardsLike(std::string const& cid, ShardingInfo const* other) { TRI_ASSERT(_sharding != nullptr); _sharding->distributeShardsLike(cid, other); } std::vector const& LogicalCollection::avoidServers() const { TRI_ASSERT(_sharding != nullptr); return _sharding->avoidServers(); } bool LogicalCollection::isSatellite() const { TRI_ASSERT(_sharding != nullptr); return _sharding->isSatellite(); } bool LogicalCollection::usesDefaultShardKeys() const { TRI_ASSERT(_sharding != nullptr); return _sharding->usesDefaultShardKeys(); } std::vector const& LogicalCollection::shardKeys() const { TRI_ASSERT(_sharding != nullptr); return _sharding->shardKeys(); } std::shared_ptr LogicalCollection::shardIds() const { TRI_ASSERT(_sharding != nullptr); return _sharding->shardIds(); } void LogicalCollection::setShardMap(std::shared_ptr const& map) { TRI_ASSERT(_sharding != nullptr); _sharding->setShardMap(map); } int LogicalCollection::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete, std::string& shardID) { bool usesDefaultShardKeys; return getResponsibleShard(slice, docComplete, shardID, usesDefaultShardKeys); } int LogicalCollection::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete, std::string& shardID, bool& usesDefaultShardKeys, VPackStringRef const& key) { TRI_ASSERT(_sharding != nullptr); return _sharding->getResponsibleShard(slice, docComplete, shardID, usesDefaultShardKeys, key); } /// @briefs creates a new document key, the input slice is ignored here std::string LogicalCollection::createKey(VPackSlice) { return keyGenerator()->generate(); } void LogicalCollection::prepareIndexes(VPackSlice indexesSlice) { TRI_ASSERT(_physical != nullptr); if (!indexesSlice.isArray()) { // always point to an array indexesSlice = arangodb::velocypack::Slice::emptyArraySlice(); } _physical->prepareIndexes(indexesSlice); } std::unique_ptr LogicalCollection::getAllIterator(transaction::Methods* trx) { return _physical->getAllIterator(trx); } std::unique_ptr LogicalCollection::getAnyIterator(transaction::Methods* trx) { return _physical->getAnyIterator(trx); } // @brief Return the number of documents in this collection uint64_t LogicalCollection::numberDocuments(transaction::Methods* trx, transaction::CountType type) { // detailed results should have been handled in the levels above us TRI_ASSERT(type != transaction::CountType::Detailed); int64_t documents = transaction::CountCache::NotPopulated; if (type == transaction::CountType::ForceCache) { // always return from the cache, regardless what's in it documents = _countCache.get(); } else if (type == transaction::CountType::TryCache) { documents = _countCache.get(transaction::CountCache::Ttl); } if (documents == transaction::CountCache::NotPopulated) { documents = static_cast(getPhysical()->numberDocuments(trx)); _countCache.store(documents); } TRI_ASSERT(documents >= 0); return static_cast(documents); } uint32_t LogicalCollection::v8CacheVersion() const { return _v8CacheVersion; } TRI_col_type_e LogicalCollection::type() const { return _type; } TRI_vocbase_col_status_e LogicalCollection::status() const { return _status; } TRI_vocbase_col_status_e LogicalCollection::getStatusLocked() { READ_LOCKER(readLocker, _lock); return _status; } void LogicalCollection::executeWhileStatusWriteLocked(std::function const& callback) { WRITE_LOCKER_EVENTUAL(locker, _lock); callback(); } void LogicalCollection::executeWhileStatusLocked(std::function const& callback) { READ_LOCKER(locker, _lock); callback(); } bool LogicalCollection::tryExecuteWhileStatusLocked(std::function const& callback) { TRY_READ_LOCKER(readLocker, _lock); if (!readLocker.isLocked()) { return false; } callback(); return true; } TRI_vocbase_col_status_e LogicalCollection::tryFetchStatus(bool& didFetch) { TRY_READ_LOCKER(locker, _lock); if (locker.isLocked()) { didFetch = true; return _status; } didFetch = false; return TRI_VOC_COL_STATUS_CORRUPTED; } /// @brief returns a translation of a collection status std::string LogicalCollection::statusString() const { READ_LOCKER(readLocker, _lock); return ::translateStatus(_status); } // SECTION: Properties TRI_voc_rid_t LogicalCollection::revision(transaction::Methods* trx) const { // TODO CoordinatorCase TRI_ASSERT(!ServerState::instance()->isCoordinator()); return _physical->revision(trx); } std::unique_ptr const& LogicalCollection::followers() const { return _followers; } IndexEstMap LogicalCollection::clusterIndexEstimates(bool allowUpdating, TRI_voc_tid_t tid) { return getPhysical()->clusterIndexEstimates(allowUpdating, tid); } void LogicalCollection::setClusterIndexEstimates(IndexEstMap&& estimates) { getPhysical()->setClusterIndexEstimates(std::move(estimates)); } void LogicalCollection::flushClusterIndexEstimates() { getPhysical()->flushClusterIndexEstimates(); } std::vector> LogicalCollection::getIndexes() const { return getPhysical()->getIndexes(); } void LogicalCollection::getIndexesVPack( VPackBuilder& result, std::function::type&)> const& filter) const { getPhysical()->getIndexesVPack(result, filter); } bool LogicalCollection::allowUserKeys() const { return _allowUserKeys; } // SECTION: Modification Functions // asks the storage engine to rename the collection to the given name // and persist the renaming info. It is guaranteed by the server // that no other active collection with the same name and id exists in the same // database when this function is called. If this operation fails somewhere in // the middle, the storage engine is required to fully revert the rename // operation // and throw only then, so that subsequent collection creation/rename requests // will // not fail. the WAL entry for the rename will be written *after* the call // to "renameCollection" returns Result LogicalCollection::rename(std::string&& newName) { // Should only be called from inside vocbase. // Otherwise caching is destroyed. TRI_ASSERT(!ServerState::instance()->isCoordinator()); // NOT YET IMPLEMENTED if (!vocbase().server().hasFeature()) { return Result( TRI_ERROR_INTERNAL, "failed to find feature 'Database' while renaming collection"); } auto& databaseFeature = vocbase().server().getFeature(); // Check for illegal states. switch (_status) { case TRI_VOC_COL_STATUS_CORRUPTED: return TRI_ERROR_ARANGO_CORRUPTED_COLLECTION; case TRI_VOC_COL_STATUS_DELETED: return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; default: // Fall through intentional break; } switch (_status) { case TRI_VOC_COL_STATUS_UNLOADED: case TRI_VOC_COL_STATUS_LOADED: case TRI_VOC_COL_STATUS_UNLOADING: case TRI_VOC_COL_STATUS_LOADING: { break; } default: // Unknown status return TRI_ERROR_INTERNAL; } auto doSync = databaseFeature.forceSyncProperties(); std::string oldName = name(); // Okay we can finally rename safely try { StorageEngine& engine = vocbase().server().getFeature().engine(); name(std::move(newName)); engine.changeCollection(vocbase(), *this, doSync); } catch (basics::Exception const& ex) { // Engine Rename somehow failed. Reset to old name name(std::move(oldName)); return ex.code(); } catch (...) { // Engine Rename somehow failed. Reset to old name name(std::move(oldName)); return TRI_ERROR_INTERNAL; } // CHECK if this ordering is okay. Before change the version was increased // after swapping in vocbase mapping. increaseV8Version(); return TRI_ERROR_NO_ERROR; } int LogicalCollection::close() { // This was unload() in 3.0 return getPhysical()->close(); } void LogicalCollection::load() { _physical->load(); } void LogicalCollection::unload() { _physical->unload(); } arangodb::Result LogicalCollection::drop() { // make sure collection has been closed this->close(); TRI_ASSERT(!ServerState::instance()->isCoordinator()); StorageEngine* engine = EngineSelectorFeature::ENGINE; engine->destroyCollection(vocbase(), *this); deleted(true); _physical->drop(); return arangodb::Result(); } void LogicalCollection::setStatus(TRI_vocbase_col_status_e status) { _status = status; if (status == TRI_VOC_COL_STATUS_LOADED) { increaseV8Version(); } } void LogicalCollection::toVelocyPackForClusterInventory(VPackBuilder& result, bool useSystem, bool isReady, bool allInSync) const { if (system() && !useSystem) { return; } result.openObject(); result.add(VPackValue("parameters")); std::unordered_set ignoreKeys{ "allowUserKeys", "cid", "count", "statusString", "version", "distributeShardsLike", "objectId", "indexes"}; VPackBuilder params = toVelocyPackIgnore(ignoreKeys, Serialization::List); { VPackObjectBuilder guard(&result); for (auto const& p : VPackObjectIterator(params.slice())) { result.add(p.key); result.add(p.value); } if (!_sharding->distributeShardsLike().empty()) { CollectionNameResolver resolver(vocbase()); result.add("distributeShardsLike", VPackValue(resolver.getCollectionNameCluster(static_cast( basics::StringUtils::uint64(distributeShardsLike()))))); } } result.add(VPackValue("indexes")); getIndexesVPack(result, [](Index const* idx, uint8_t& flags) { // we have to exclude the primary and the edge index here, because otherwise // at least the MMFiles engine will try to create it // AND exclude hidden indexes switch (idx->type()) { case Index::TRI_IDX_TYPE_PRIMARY_INDEX: case Index::TRI_IDX_TYPE_EDGE_INDEX: return false; default: flags = Index::makeFlags(); return !idx->isHidden() && !idx->inProgress(); } }); result.add("planVersion", VPackValue(planVersion())); result.add("isReady", VPackValue(isReady)); result.add("allInSync", VPackValue(allInSync)); result.close(); // CollectionInfo } arangodb::Result LogicalCollection::appendVelocyPack(arangodb::velocypack::Builder& result, Serialization context) const { bool const forPersistence = (context == Serialization::Persistence || context == Serialization::PersistenceWithInProgress); bool const showInProgress = (context == Serialization::PersistenceWithInProgress); // We write into an open object TRI_ASSERT(result.isOpenObject()); // Collection Meta Information result.add("cid", VPackValue(std::to_string(id()))); result.add(StaticStrings::DataSourceType, VPackValue(static_cast(_type))); result.add("status", VPackValue(_status)); result.add("statusString", VPackValue(::translateStatus(_status))); result.add("version", VPackValue(static_cast(_version))); // Collection Flags result.add("waitForSync", VPackValue(_waitForSync)); if (!forPersistence) { // with 'forPersistence' added by LogicalDataSource::toVelocyPack // FIXME TODO is this needed in !forPersistence??? result.add(StaticStrings::DataSourceDeleted, VPackValue(deleted())); result.add(StaticStrings::DataSourceSystem, VPackValue(system())); } // TODO is this still releveant or redundant in keyGenerator? result.add("allowUserKeys", VPackValue(_allowUserKeys)); // keyoptions result.add("keyOptions", VPackValue(VPackValueType::Object)); if (_keyGenerator != nullptr) { _keyGenerator->toVelocyPack(result); } result.close(); // Physical Information getPhysical()->getPropertiesVPack(result); // Indexes result.add(VPackValue("indexes")); auto indexFlags = Index::makeFlags(); // hide hidden indexes. In effect hides unfinished indexes, // and iResearch links (only on a single-server and coordinator) if (forPersistence) { indexFlags = Index::makeFlags(Index::Serialize::Internals); } auto filter = [indexFlags, forPersistence, showInProgress](arangodb::Index const* idx, decltype(Index::makeFlags())& flags) { if ((forPersistence || !idx->isHidden()) && (showInProgress || !idx->inProgress())) { flags = indexFlags; return true; } return false; }; getIndexesVPack(result, filter); // Cluster Specific result.add(StaticStrings::IsSmart, VPackValue(isSmart())); if (hasSmartJoinAttribute()) { result.add(StaticStrings::SmartJoinAttribute, VPackValue(_smartJoinAttribute)); } if (!forPersistence) { // with 'forPersistence' added by LogicalDataSource::toVelocyPack // FIXME TODO is this needed in !forPersistence??? result.add(StaticStrings::DataSourcePlanId, VPackValue(std::to_string(planId()))); } _sharding->toVelocyPack(result, Serialization::List != context); includeVelocyPackEnterprise(result); TRI_ASSERT(result.isOpenObject()); // We leave the object open return arangodb::Result(); } void LogicalCollection::toVelocyPackIgnore(VPackBuilder& result, std::unordered_set const& ignoreKeys, Serialization context) const { TRI_ASSERT(result.isOpenObject()); VPackBuilder b = toVelocyPackIgnore(ignoreKeys, context); result.add(VPackObjectIterator(b.slice())); } VPackBuilder LogicalCollection::toVelocyPackIgnore(std::unordered_set const& ignoreKeys, Serialization context) const { VPackBuilder full; full.openObject(); properties(full, context); full.close(); if (ignoreKeys.empty()) { return full; } return VPackCollection::remove(full.slice(), ignoreKeys); } void LogicalCollection::includeVelocyPackEnterprise(VPackBuilder&) const { // We ain't no enterprise } void LogicalCollection::increaseV8Version() { ++_v8CacheVersion; } arangodb::Result LogicalCollection::properties(velocypack::Slice const& slice, bool partialUpdate) { // the following collection properties are intentionally not updated, // as updating them would be very complicated: // - _cid // - _name // - _type // - _isSystem // - _isVolatile // ... probably a few others missing here ... if (!vocbase().server().hasFeature()) { return Result( TRI_ERROR_INTERNAL, "failed to find feature 'Database' while updating collection"); } auto& databaseFeature = vocbase().server().getFeature(); if (!vocbase().server().hasFeature() || !vocbase().server().getFeature().selected()) { return Result(TRI_ERROR_INTERNAL, "failed to find a storage engine while updating collection"); } auto& engine = vocbase().server().getFeature().engine(); MUTEX_LOCKER(guard, _infoLock); // prevent simultaneous updates size_t rf = _sharding->replicationFactor(); size_t wc = _sharding->writeConcern(); VPackSlice rfSl = slice.get(StaticStrings::ReplicationFactor); VPackSlice wcSl = slice.get(StaticStrings::WriteConcern); if (wcSl.isNone()) { // deprecated in 3.6 wcSl = slice.get(StaticStrings::MinReplicationFactor); } if (!rfSl.isNone()) { if (rfSl.isInteger()) { int64_t rfTest = rfSl.getNumber(); if (rfTest < 0) { // negative value for replication factor... not good return Result(TRI_ERROR_BAD_PARAMETER, "bad value for replicationFactor"); } rf = rfSl.getNumber(); if ((!isSatellite() && rf == 0) || rf > 10) { return Result(TRI_ERROR_BAD_PARAMETER, "bad value for replicationFactor"); } if (ServerState::instance()->isCoordinator() && rf != _sharding->replicationFactor()) { // sanity checks if (!_sharding->distributeShardsLike().empty()) { return Result(TRI_ERROR_FORBIDDEN, "cannot change replicationFactor for a collection using 'distributeShardsLike'"); } else if (_type == TRI_COL_TYPE_EDGE && isSmart()) { return Result(TRI_ERROR_NOT_IMPLEMENTED, "changing replicationFactor is " "not supported for smart edge collections"); } else if (isSatellite()) { return Result(TRI_ERROR_FORBIDDEN, "cannot change replicationFactor of a satellite collection"); } } } else if (rfSl.isString()) { if (rfSl.compareString(StaticStrings::Satellite) != 0) { // only the string "satellite" is allowed here return Result(TRI_ERROR_BAD_PARAMETER, "bad value for satellite"); } // we got the string "satellite"... #ifdef USE_ENTERPRISE if (!isSatellite()) { // but the collection is not a satellite collection! return Result(TRI_ERROR_FORBIDDEN, "cannot change satellite collection status"); } #else return Result(TRI_ERROR_FORBIDDEN, "cannot use satellite collection status"); #endif // fallthrough here if we set the string "satellite" for a satellite // collection TRI_ASSERT(isSatellite() && _sharding->replicationFactor() == 0 && rf == 0); } else { return Result(TRI_ERROR_BAD_PARAMETER, "bad value for replicationFactor"); } } if (!wcSl.isNone()) { if (wcSl.isInteger()) { int64_t wcTest = wcSl.getNumber(); if (wcTest < 0) { // negative value for writeConcern... not good return Result(TRI_ERROR_BAD_PARAMETER, "bad value for writeConcern"); } wc = wcSl.getNumber(); if (wc > rf) { return Result(TRI_ERROR_BAD_PARAMETER, "bad value for writeConcern"); } if (ServerState::instance()->isCoordinator() && rf != _sharding->writeConcern()) { // sanity checks if (!_sharding->distributeShardsLike().empty()) { return Result(TRI_ERROR_FORBIDDEN, "Cannot change writeConcern, please change " + _sharding->distributeShardsLike()); } else if (_type == TRI_COL_TYPE_EDGE && isSmart()) { return Result(TRI_ERROR_NOT_IMPLEMENTED, "Changing writeConcern " "not supported for smart edge collections"); } else if (isSatellite()) { return Result(TRI_ERROR_FORBIDDEN, "Satellite collection, " "cannot change writeConcern"); } } } else { return Result(TRI_ERROR_BAD_PARAMETER, "bad value for writeConcern"); } TRI_ASSERT((wc <= rf && !isSatellite()) || (wc == 0 && isSatellite())); } auto doSync = !engine.inRecovery() && databaseFeature.forceSyncProperties(); // The physical may first reject illegal properties. // After this call it either has thrown or the properties are stored Result res = getPhysical()->updateProperties(slice, doSync); if (!res.ok()) { return res; } TRI_ASSERT(!isSatellite() || rf == 0); _waitForSync = Helper::getBooleanValue(slice, "waitForSync", _waitForSync); _sharding->setWriteConcernAndReplicationFactor(wc, rf); if (ServerState::instance()->isCoordinator()) { // We need to inform the cluster as well auto& ci = vocbase().server().getFeature().clusterInfo(); return ci.setCollectionPropertiesCoordinator(vocbase().name(), std::to_string(id()), this); } engine.changeCollection(vocbase(), *this, doSync); if (DatabaseFeature::DATABASE != nullptr && DatabaseFeature::DATABASE->versionTracker() != nullptr) { DatabaseFeature::DATABASE->versionTracker()->track("change collection"); } return {}; } /// @brief return the figures for a collection futures::Future LogicalCollection::figures() const { return getPhysical()->figures(); } /// @brief opens an existing collection void LogicalCollection::open(bool ignoreErrors) { getPhysical()->open(ignoreErrors); TRI_UpdateTickServer(id()); } /// SECTION Indexes std::shared_ptr LogicalCollection::lookupIndex(TRI_idx_iid_t idxId) const { return getPhysical()->lookupIndex(idxId); } std::shared_ptr LogicalCollection::lookupIndex(std::string const& idxName) const { return getPhysical()->lookupIndex(idxName); } std::shared_ptr LogicalCollection::lookupIndex(VPackSlice const& info) const { if (!info.isObject()) { // Compatibility with old v8-vocindex. THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); } return getPhysical()->lookupIndex(info); } std::shared_ptr LogicalCollection::createIndex(VPackSlice const& info, bool& created) { auto idx = _physical->createIndex(info, /*restore*/ false, created); if (idx) { if (DatabaseFeature::DATABASE != nullptr && DatabaseFeature::DATABASE->versionTracker() != nullptr) { DatabaseFeature::DATABASE->versionTracker()->track("create index"); } } return idx; } /// @brief drops an index, including index file removal and replication bool LogicalCollection::dropIndex(TRI_idx_iid_t iid) { TRI_ASSERT(!ServerState::instance()->isCoordinator()); #if USE_PLAN_CACHE arangodb::aql::PlanCache::instance()->invalidate(_vocbase); #endif arangodb::aql::QueryCache::instance()->invalidate(&vocbase(), guid()); bool result = _physical->dropIndex(iid); if (result) { if (DatabaseFeature::DATABASE != nullptr && DatabaseFeature::DATABASE->versionTracker() != nullptr) { DatabaseFeature::DATABASE->versionTracker()->track("drop index"); } } return result; } /// @brief Persist the connected physical collection. /// This should be called AFTER the collection is successfully /// created and only on Single/DBServer void LogicalCollection::persistPhysicalCollection() { // Coordinators are not allowed to have local collections! TRI_ASSERT(!ServerState::instance()->isCoordinator()); StorageEngine* engine = EngineSelectorFeature::ENGINE; auto path = engine->createCollection(vocbase(), *this); getPhysical()->setPath(path); } /// @brief Defer a callback to be executed when the collection /// can be dropped. The callback is supposed to drop /// the collection and it is guaranteed that no one is using /// it at that moment. void LogicalCollection::deferDropCollection(std::function const& callback) { _physical->deferDropCollection(callback); } /// @brief reads an element from the document collection Result LogicalCollection::read(transaction::Methods* trx, arangodb::velocypack::StringRef const& key, ManagedDocumentResult& result, bool lock) { TRI_IF_FAILURE("LogicalCollection::read") { return Result(TRI_ERROR_DEBUG); } return getPhysical()->read(trx, key, result, lock); } Result LogicalCollection::read(transaction::Methods* trx, arangodb::velocypack::Slice const& key, ManagedDocumentResult& result, bool lock) { TRI_IF_FAILURE("LogicalCollection::read") { return Result(TRI_ERROR_DEBUG); } return getPhysical()->read(trx, key, result, lock); } //////////////////////////////////////////////////////////////////////////////// /// @brief processes a truncate operation (note: currently this only clears /// the read-cache //////////////////////////////////////////////////////////////////////////////// Result LogicalCollection::truncate(transaction::Methods& trx, OperationOptions& options) { TRI_IF_FAILURE("LogicalCollection::truncate") { return Result(TRI_ERROR_DEBUG); } return getPhysical()->truncate(trx, options); } /// @brief compact-data operation Result LogicalCollection::compact() { return getPhysical()->compact(); } //////////////////////////////////////////////////////////////////////////////// /// @brief inserts a document or edge into the collection //////////////////////////////////////////////////////////////////////////////// Result LogicalCollection::insert(transaction::Methods* trx, VPackSlice const slice, ManagedDocumentResult& result, OperationOptions& options, bool lock, KeyLockInfo* keyLockInfo, std::function const& cbDuringLock) { TRI_IF_FAILURE("LogicalCollection::insert") { return Result(TRI_ERROR_DEBUG); } return getPhysical()->insert(trx, slice, result, options, lock, keyLockInfo, cbDuringLock); } /// @brief updates a document or edge in a collection Result LogicalCollection::update(transaction::Methods* trx, VPackSlice const newSlice, ManagedDocumentResult& result, OperationOptions& options, bool lock, ManagedDocumentResult& previous) { TRI_IF_FAILURE("LogicalCollection::update") { return Result(TRI_ERROR_DEBUG); } if (!newSlice.isObject()) { return Result(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); } return getPhysical()->update(trx, newSlice, result, options, lock, previous); } /// @brief replaces a document or edge in a collection Result LogicalCollection::replace(transaction::Methods* trx, VPackSlice const newSlice, ManagedDocumentResult& result, OperationOptions& options, bool lock, ManagedDocumentResult& previous) { TRI_IF_FAILURE("LogicalCollection::replace") { return Result(TRI_ERROR_DEBUG); } if (!newSlice.isObject()) { return Result(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); } return getPhysical()->replace(trx, newSlice, result, options, lock, previous); } /// @brief removes a document or edge Result LogicalCollection::remove(transaction::Methods& trx, velocypack::Slice const slice, OperationOptions& options, bool lock, ManagedDocumentResult& previous, KeyLockInfo* keyLockInfo, std::function const& cbDuringLock) { TRI_IF_FAILURE("LogicalCollection::remove") { return Result(TRI_ERROR_DEBUG); } return getPhysical()->remove(trx, slice, previous, options, lock, keyLockInfo, cbDuringLock); } bool LogicalCollection::readDocument(transaction::Methods* trx, LocalDocumentId const& token, ManagedDocumentResult& result) const { return getPhysical()->readDocument(trx, token, result); } bool LogicalCollection::readDocumentWithCallback(transaction::Methods* trx, LocalDocumentId const& token, IndexIterator::DocumentCallback const& cb) const { return getPhysical()->readDocumentWithCallback(trx, token, cb); } /// @brief a method to skip certain documents in AQL write operations, /// this is only used in the enterprise edition for smart graphs #ifndef USE_ENTERPRISE bool LogicalCollection::skipForAqlWrite(arangodb::velocypack::Slice document, std::string const& key) const { return false; } #endif // SECTION: Key Options VPackSlice LogicalCollection::keyOptions() const { if (_keyOptions == nullptr) { return arangodb::velocypack::Slice::nullSlice(); } return VPackSlice(_keyOptions->data()); }