////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
/// @author Daniel H. Larkin
////////////////////////////////////////////////////////////////////////////////

#include "LogicalCollection.h"

#include "Aql/QueryCache.h"
#include "Basics/LocalTaskQueue.h"
#include "Basics/ReadLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Basics/Timers.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/encoding.h"
#include "Basics/process-utils.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/FollowerInfo.h"
#include "Cluster/ServerState.h"
#include "Indexes/Index.h"
#include "RestServer/DatabaseFeature.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Helpers.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/OperationOptions.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/DatafileStatisticsContainer.h"
#include "VocBase/KeyGenerator.h"
#include "VocBase/ManagedDocumentResult.h"
#include "VocBase/PhysicalCollection.h"
#include "VocBase/ticks.h"

#include <velocypack/Collection.h>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>

using namespace arangodb;
using Helper = arangodb::basics::VelocyPackHelper;

namespace {

static TRI_voc_cid_t ReadCid(VPackSlice info) {
  if (!info.isObject()) {
    // ERROR CASE
    return 0;
  }

  // Somehow the cid is now propagated to dbservers
  TRI_voc_cid_t cid = Helper::extractIdValue(info);

  if (cid == 0) {
    if (ServerState::instance()->isDBServer()) {
      cid = ClusterInfo::instance()->uniqid(1);
    } else if (ServerState::instance()->isCoordinator()) {
      cid = ClusterInfo::instance()->uniqid(1);
    } else {
      cid = TRI_NewTickServer();
    }
  }
  return cid;
}

static TRI_voc_cid_t ReadPlanId(VPackSlice info, TRI_voc_cid_t cid) {
  if (!info.isObject()) {
    // ERROR CASE
    return 0;
  }
  VPackSlice id = info.get("planId");
  if (id.isNone()) {
    return cid;
  }

  if (id.isString()) {
    // string cid, e.g. "9988488"
    return arangodb::basics::StringUtils::uint64(id.copyString());
  } else if (id.isNumber()) {
    // numeric cid, e.g. 9988488
    return id.getNumericValue<TRI_voc_cid_t>();
  }
  // TODO Throw error for invalid type?
  return cid;
}

static std::string const ReadStringValue(VPackSlice info,
                                         std::string const& name,
                                         std::string const& def) {
  if (!info.isObject()) {
    return def;
  }
  return Helper::getStringValue(info, name, def);
}

static std::shared_ptr<VPackBuffer<uint8_t> const> CopySliceValue(
    VPackSlice info, std::string const& name) {
  if (!info.isObject()) {
    return nullptr;
  }
  info = info.get(name);
  if (info.isNone()) {
    return nullptr;
  }
  return VPackBuilder::clone(info).steal();
}

}  // namespace

/// @brief This is the "copy" constructor used in the cluster.
/// It is required to create objects that survive plan
/// modifications and can be freed.
/// Can only be given to V8, cannot be used for functionality.
LogicalCollection::LogicalCollection(LogicalCollection const& other)
    : _internalVersion(0),
      _cid(other.cid()),
      _planId(other.planId()),
      _type(other.type()),
      _name(other.name()),
      _distributeShardsLike(other.distributeShardsLike()),
      _avoidServers(other.avoidServers()),
      _isSmart(other.isSmart()),
      _status(other.status()),
      _isLocal(false),
      _isDeleted(other._isDeleted),
      _isSystem(other.isSystem()),
      _isVolatile(other.isVolatile()),
      _waitForSync(other.waitForSync()),
      _keyOptions(other._keyOptions),
      _version(other._version),
      _indexBuckets(other.indexBuckets()),
      _indexes(),
      _replicationFactor(other.replicationFactor()),
      _numberOfShards(other.numberOfShards()),
      _allowUserKeys(other.allowUserKeys()),
      _shardIds(new ShardMap()),  // Not needed
      _vocbase(other.vocbase()),
      _cleanupIndexes(0),
      _persistentIndexes(0),
      _physical(other.getPhysical()->clone(this, other.getPhysical())),
      _keyGenerator() {
  _keyGenerator.reset(KeyGenerator::factory(other.keyOptions()));

  if (ServerState::instance()->isDBServer() ||
      !ServerState::instance()->isRunningInCluster()) {
    _followers.reset(new FollowerInfo(this));
  }

  // Copy over index definitions
  _indexes.reserve(other._indexes.size());
  for (auto const& idx : other._indexes) {
    _indexes.emplace_back(idx);
  }
}

// @brief Constructor used in coordinator case.
// The Slice contains the part of the plan that
// is relevant for this collection.
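//
// For orientation only: a rough, non-exhaustive sketch of the kind of `info`
// slice this constructor consumes. The attribute names are the keys actually
// read below; the concrete values are made-up examples, not taken from a real
// plan entry:
//
//   {
//     "id": "9988488",
//     "planId": "9988488",
//     "name": "customers",
//     "type": 2,                      // 2 = document collection, 3 = edge
//     "isSystem": false,
//     "waitForSync": false,
//     "numberOfShards": 4,
//     "replicationFactor": 2,
//     "shardKeys": ["_key"],
//     "shards": { "s100001": ["DBServer001", "DBServer002"] },
//     "keyOptions": { "type": "traditional", "allowUserKeys": true },
//     "indexes": [ { "id": "0", "type": "primary", "fields": ["_key"] } ]
//   }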
LogicalCollection::LogicalCollection(TRI_vocbase_t* vocbase,
                                     VPackSlice const& info, bool isPhysical)
    : _internalVersion(0),
      _cid(ReadCid(info)),
      _planId(ReadPlanId(info, _cid)),
      _type(Helper::readNumericValue(info, "type", TRI_COL_TYPE_UNKNOWN)),
      _name(ReadStringValue(info, "name", "")),
      _distributeShardsLike(ReadStringValue(info, "distributeShardsLike", "")),
      _isSmart(Helper::readBooleanValue(info, "isSmart", false)),
      _status(Helper::readNumericValue(info, "status",
                                       TRI_VOC_COL_STATUS_CORRUPTED)),
      _isLocal(!ServerState::instance()->isCoordinator()),
      _isDeleted(Helper::readBooleanValue(info, "deleted", false)),
      _isSystem(IsSystemName(_name) &&
                Helper::readBooleanValue(info, "isSystem", false)),
      _isVolatile(Helper::readBooleanValue(info, "isVolatile", false)),
      _waitForSync(Helper::readBooleanValue(info, "waitForSync", false)),
      _keyOptions(CopySliceValue(info, "keyOptions")),
      _version(Helper::readNumericValue(info, "version", currentVersion())),
      _indexBuckets(Helper::readNumericValue(
          info, "indexBuckets", DatabaseFeature::defaultIndexBuckets())),
      _replicationFactor(1),
      _numberOfShards(Helper::readNumericValue(info, "numberOfShards", 1)),
      _allowUserKeys(Helper::readBooleanValue(info, "allowUserKeys", true)),
      _shardIds(new ShardMap()),
      _vocbase(vocbase),
      _cleanupIndexes(0),
      _persistentIndexes(0),
      _physical(
          EngineSelectorFeature::ENGINE->createPhysicalCollection(this, info)),
      _keyGenerator() {
  getPhysical()->setPath(ReadStringValue(info, "path", ""));
  if (!IsAllowedName(info)) {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_NAME);
  }

  if (_version < minimumVersion()) {
    // collection is too "old"
    std::string errorMsg(std::string("collection '") + _name +
                         "' has a too old version. Please start the server "
                         "with the --database.auto-upgrade option.");

    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_FAILED, errorMsg);
  }

  if (_isVolatile && _waitForSync) {
    // Illegal collection configuration
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "volatile collections do not support the waitForSync option");
  }

  if (getPhysical()->journalSize() < TRI_JOURNAL_MINIMAL_SIZE) {
    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                   ".journalSize too small");
  }

  VPackSlice shardKeysSlice = info.get("shardKeys");

  bool const isCluster = ServerState::instance()->isRunningInCluster();
  // Cluster only tests
  if (ServerState::instance()->isCoordinator()) {
    if ((_numberOfShards == 0 && !_isSmart) || _numberOfShards > 1000) {
      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                     "invalid number of shards");
    }

    VPackSlice keyGenSlice = info.get("keyOptions");
    if (keyGenSlice.isObject()) {
      keyGenSlice = keyGenSlice.get("type");
      if (keyGenSlice.isString()) {
        StringRef tmp(keyGenSlice);
        if (!tmp.empty() && tmp != "traditional") {
          THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNSUPPORTED,
                                         "non-traditional key generators are "
                                         "not supported for sharded "
                                         "collections");
        }
      }
    }
  }

  auto replicationFactorSlice = info.get("replicationFactor");
  if (!replicationFactorSlice.isNone()) {
    bool isError = true;
    if (replicationFactorSlice.isNumber()) {
      _replicationFactor = replicationFactorSlice.getNumber<size_t>();
      // mop: only allow satellite collections to be created explicitly
      if (_replicationFactor > 0 && _replicationFactor <= 10) {
        isError = false;
#ifdef USE_ENTERPRISE
      } else if (_replicationFactor == 0) {
        isError = false;
#endif
      }
    }
#ifdef USE_ENTERPRISE
    else if (replicationFactorSlice.isString() &&
             replicationFactorSlice.copyString() == "satellite") {
      _replicationFactor = 0;
      _numberOfShards = 1;
      _distributeShardsLike = "";
      _avoidServers.clear();
      isError = false;
    }
#endif
    if (isError) {
      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                     "invalid replicationFactor");
    }
  }

  if (shardKeysSlice.isNone() || isSatellite()) {
    // Use default.
    _shardKeys.emplace_back(StaticStrings::KeyString);
  } else {
    if (shardKeysSlice.isArray()) {
      for (auto const& sk : VPackArrayIterator(shardKeysSlice)) {
        if (sk.isString()) {
          std::string key = sk.copyString();
          // remove : char at the beginning or end (for enterprise)
          std::string stripped;
          if (!key.empty()) {
            if (key.front() == ':') {
              stripped = key.substr(1);
            } else if (key.back() == ':') {
              stripped = key.substr(0, key.size() - 1);
            } else {
              stripped = key;
            }
          }
          // system attributes are not allowed (except _key)
          if (!stripped.empty() && stripped != StaticStrings::IdString &&
              stripped != StaticStrings::RevString) {
            _shardKeys.emplace_back(key);
          }
        }
      }
      if (_shardKeys.empty() && !isCluster) {
        // Compatibility. Old configs might store empty shard-keys locally.
        // This is translated to ["_key"]. In the cluster case this was always
        // forbidden.
        _shardKeys.emplace_back(StaticStrings::KeyString);
      }
    }
  }

  if (_shardKeys.empty() || _shardKeys.size() > 8) {
    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                   "invalid number of shard keys");
  }

  _keyGenerator.reset(KeyGenerator::factory(info.get("keyOptions")));

  auto shardsSlice = info.get("shards");
  if (shardsSlice.isObject()) {
    for (auto const& shardSlice : VPackObjectIterator(shardsSlice)) {
      if (shardSlice.key.isString() && shardSlice.value.isArray()) {
        ShardID shard = shardSlice.key.copyString();

        std::vector<ServerID> servers;
        for (auto const& serverSlice : VPackArrayIterator(shardSlice.value)) {
          servers.push_back(serverSlice.copyString());
        }
        _shardIds->emplace(shard, servers);
      }
    }
  }

  if (info.hasKey("avoidServers")) {
    auto avoidServersSlice = info.get("avoidServers");
    if (avoidServersSlice.isArray()) {
      for (const auto& i : VPackArrayIterator(avoidServersSlice)) {
        if (i.isString()) {
          _avoidServers.push_back(i.copyString());
        } else {
          LOG_TOPIC(ERR, arangodb::Logger::FIXME)
              << "avoidServers must be a vector of strings, but got "
              << avoidServersSlice.toJson() << ". discarding!";
          _avoidServers.clear();
          break;
        }
      }
    }
  }

  if (_indexes.empty()) {
    createInitialIndexes();
  }

  auto indexesSlice = info.get("indexes");
  if (indexesSlice.isArray()) {
    StorageEngine* engine = EngineSelectorFeature::ENGINE;
    IndexFactory const* idxFactory = engine->indexFactory();
    TRI_ASSERT(idxFactory != nullptr);
    for (auto const& v : VPackArrayIterator(indexesSlice)) {
      if (arangodb::basics::VelocyPackHelper::getBooleanValue(v, "error",
                                                              false)) {
        // We have an error here.
        // Do not add index.
        // TODO Handle Properly
        continue;
      }

      auto idx = idxFactory->prepareIndexFromSlice(v, false, this, true);

      // TODO Move IndexTypeCheck out
      if (idx->type() == Index::TRI_IDX_TYPE_PRIMARY_INDEX ||
          idx->type() == Index::TRI_IDX_TYPE_EDGE_INDEX) {
        continue;
      }

      if (isCluster) {
        addIndexCoordinator(idx, false);
      } else {
        addIndex(idx);
      }
    }
  }

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  if (_indexes[0]->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX) {
    LOG_TOPIC(ERR, arangodb::Logger::FIXME)
        << "got invalid indexes for collection '" << _name << "'";
    for (auto const& it : _indexes) {
      LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "- " << it.get();
    }
  }
#endif

  if (!ServerState::instance()->isCoordinator() && isPhysical) {
    // If we are not in the coordinator we need a path
    // to the physical data.
    StorageEngine* engine = EngineSelectorFeature::ENGINE;
    if (getPhysical()->path().empty()) {
      std::string path = engine->createCollection(_vocbase, _cid, this);
      getPhysical()->setPath(path);
    }
  }

  int64_t count = Helper::readNumericValue(info, "count", -1);
  if (count != -1) {
    _physical->updateCount(count);
  }

  if (ServerState::instance()->isDBServer() ||
      !ServerState::instance()->isRunningInCluster()) {
    _followers.reset(new FollowerInfo(this));
  }

  // update server's tick value
  TRI_UpdateTickServer(static_cast<TRI_voc_tick_t>(_cid));
}

LogicalCollection::~LogicalCollection() {}

bool LogicalCollection::IsAllowedName(VPackSlice parameters) {
  bool allowSystem = Helper::readBooleanValue(parameters, "isSystem", false);
  std::string name = ReadStringValue(parameters, "name", "");
  if (name.empty()) {
    return false;
  }

  bool ok;
  char const* ptr;
  size_t length = 0;

  // check allowed characters: must start with a letter, or with an underscore
  // if system collections are allowed
  for (ptr = name.c_str(); *ptr; ++ptr) {
    if (length == 0) {
      if (allowSystem) {
        ok = (*ptr == '_') || ('a' <= *ptr && *ptr <= 'z') ||
             ('A' <= *ptr && *ptr <= 'Z');
      } else {
        ok = ('a' <= *ptr && *ptr <= 'z') || ('A' <= *ptr && *ptr <= 'Z');
      }
    } else {
      ok = (*ptr == '_') || (*ptr == '-') || ('0' <= *ptr && *ptr <= '9') ||
           ('a' <= *ptr && *ptr <= 'z') || ('A' <= *ptr && *ptr <= 'Z');
    }

    if (!ok) {
      return false;
    }

    ++length;
  }

  // invalid name length
  if (length == 0 || length > TRI_COL_NAME_LENGTH) {
    return false;
  }

  return true;
}

/// @brief checks if a collection name is allowed
/// Returns true if the name is allowed and false otherwise
bool LogicalCollection::IsAllowedName(bool allowSystem,
                                      std::string const& name) {
  bool ok;
  char const* ptr;
  size_t length = 0;

  // check allowed characters: must start with a letter, or with an underscore
  // if system collections are allowed
  for (ptr = name.c_str(); *ptr; ++ptr) {
    if (length == 0) {
      if (allowSystem) {
        ok = (*ptr == '_') || ('a' <= *ptr && *ptr <= 'z') ||
             ('A' <= *ptr && *ptr <= 'Z');
      } else {
        ok = ('a' <= *ptr && *ptr <= 'z') || ('A' <= *ptr && *ptr <= 'Z');
      }
    } else {
      ok = (*ptr == '_') || (*ptr == '-') || ('0' <= *ptr && *ptr <= '9') ||
           ('a' <= *ptr && *ptr <= 'z') || ('A' <= *ptr && *ptr <= 'Z');
    }

    if (!ok) {
      return false;
    }

    ++length;
  }

  // invalid name length
  if (length == 0 || length > TRI_COL_NAME_LENGTH) {
    return false;
  }

  return true;
}
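// Illustrative examples (assumed, not an exhaustive test matrix) of how the
// character rules above classify candidate names:
//
//   IsAllowedName(false, "users")        -> true   (starts with a letter)
//   IsAllowedName(false, "user-data_2")  -> true   ('-', '_' and digits are
//                                                   fine after the first char)
//   IsAllowedName(false, "_users")       -> false  (leading '_' needs the
//                                                   system flag)
//   IsAllowedName(true,  "_users")       -> true
//   IsAllowedName(false, "1users")       -> false  (must not start with digit)
//   IsAllowedName(false, "")             -> false  (empty or over-long names
//                                                   fail the length check)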
// @brief Return the number of documents in this collection
uint64_t LogicalCollection::numberDocuments() const {
  return getPhysical()->numberDocuments();
}

uint32_t LogicalCollection::internalVersion() const { return _internalVersion; }

std::string LogicalCollection::cid_as_string() const {
  return basics::StringUtils::itoa(_cid);
}

TRI_voc_cid_t LogicalCollection::planId() const { return _planId; }

TRI_col_type_e LogicalCollection::type() const { return _type; }

std::string LogicalCollection::name() const {
  // TODO Activate this lock. Right now we have some locks outside.
  // READ_LOCKER(readLocker, _lock);
  return _name;
}

std::string const& LogicalCollection::distributeShardsLike() const {
  return _distributeShardsLike;
}

void LogicalCollection::distributeShardsLike(std::string const& cid) {
  _distributeShardsLike = cid;
}

std::vector<std::string> const& LogicalCollection::avoidServers() const {
  return _avoidServers;
}

void LogicalCollection::avoidServers(std::vector<std::string> const& a) {
  _avoidServers = a;
}

std::string LogicalCollection::dbName() const {
  TRI_ASSERT(_vocbase != nullptr);
  return _vocbase->name();
}

TRI_vocbase_col_status_e LogicalCollection::status() const { return _status; }

TRI_vocbase_col_status_e LogicalCollection::getStatusLocked() {
  READ_LOCKER(readLocker, _lock);
  return _status;
}

void LogicalCollection::executeWhileStatusLocked(
    std::function<void()> const& callback) {
  READ_LOCKER(readLocker, _lock);
  callback();
}

bool LogicalCollection::tryExecuteWhileStatusLocked(
    std::function<void()> const& callback) {
  TRY_READ_LOCKER(readLocker, _lock);
  if (!readLocker.isLocked()) {
    return false;
  }

  callback();
  return true;
}

TRI_vocbase_col_status_e LogicalCollection::tryFetchStatus(bool& didFetch) {
  TRY_READ_LOCKER(locker, _lock);
  if (locker.isLocked()) {
    didFetch = true;
    return _status;
  }
  didFetch = false;
  return TRI_VOC_COL_STATUS_CORRUPTED;
}

/// @brief returns a translation of a collection status
std::string LogicalCollection::statusString() {
  READ_LOCKER(readLocker, _lock);
  switch (_status) {
    case TRI_VOC_COL_STATUS_UNLOADED:
      return "unloaded";
    case TRI_VOC_COL_STATUS_LOADED:
      return "loaded";
    case TRI_VOC_COL_STATUS_UNLOADING:
      return "unloading";
    case TRI_VOC_COL_STATUS_DELETED:
      return "deleted";
    case TRI_VOC_COL_STATUS_LOADING:
      return "loading";
    case TRI_VOC_COL_STATUS_CORRUPTED:
    case TRI_VOC_COL_STATUS_NEW_BORN:
    default:
      return "unknown";
  }
}

// SECTION: Properties
TRI_voc_rid_t LogicalCollection::revision() const {
  // TODO CoordinatorCase
  return _physical->revision();
}

bool LogicalCollection::isLocal() const { return _isLocal; }

bool LogicalCollection::deleted() const { return _isDeleted; }

bool LogicalCollection::isSystem() const { return _isSystem; }

bool LogicalCollection::isVolatile() const { return _isVolatile; }

bool LogicalCollection::waitForSync() const { return _waitForSync; }

bool LogicalCollection::isSmart() const { return _isSmart; }

std::unique_ptr<FollowerInfo> const& LogicalCollection::followers() const {
  return _followers;
}

void LogicalCollection::setDeleted(bool newValue) { _isDeleted = newValue; }

// SECTION: Key Options
VPackSlice LogicalCollection::keyOptions() const {
  if (_keyOptions == nullptr) {
    return Helper::NullValue();
  }
  return VPackSlice(_keyOptions->data());
}

// SECTION: Indexes
uint32_t LogicalCollection::indexBuckets() const { return _indexBuckets; }

std::vector<std::shared_ptr<arangodb::Index>> const&
LogicalCollection::getIndexes() const {
  return _indexes;
}

void LogicalCollection::getIndexesVPack(VPackBuilder& result,
                                        bool withFigures) const {
  result.openArray();
  for (auto const& idx : _indexes) {
    result.openObject();
    idx->toVelocyPack(result, withFigures);
    result.close();
  }
  result.close();
}
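// A rough sketch (attribute names taken from the add() calls below, example
// values made up) of the object that getPropertiesVPack() contributes to an
// already-open builder:
//
//   "id": "9988488", "name": "customers", "type": 2, "status": 3,
//   "deleted": false, "doCompact": true, "isSystem": false,
//   "isVolatile": false, "waitForSync": false, "journalSize": 33554432,
//   "indexBuckets": 8, "replicationFactor": 2,
//   "keyOptions": { ... }, "shardKeys": ["_key"]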
void LogicalCollection::getPropertiesVPack(VPackBuilder& result,
                                           bool translateCids) const {
  TRI_ASSERT(result.isOpenObject());
  result.add("id", VPackValue(std::to_string(_cid)));
  result.add("name", VPackValue(_name));
  result.add("type", VPackValue(static_cast<int>(_type)));
  result.add("status", VPackValue(_status));
  result.add("deleted", VPackValue(_isDeleted));
  result.add("doCompact", VPackValue(getPhysical()->doCompact()));
  result.add("isSystem", VPackValue(_isSystem));
  result.add("isVolatile", VPackValue(_isVolatile));
  result.add("waitForSync", VPackValue(_waitForSync));

  // maybe add journalSize to the physical collection - problem: we need to
  // create one object, and we should not merge a slice created by the
  // physical collection with the one created in this place. or maybe just
  // split the info into a logical and a physical part. that would probably
  // result in bigger changes TODO FIXME
  result.add("journalSize", VPackValue(getPhysical()->journalSize()));
  result.add("indexBuckets", VPackValue(_indexBuckets));
  result.add("replicationFactor", VPackValue(_replicationFactor));

  if (!_distributeShardsLike.empty()) {
    if (translateCids) {
      CollectionNameResolver resolver(_vocbase);
      result.add("distributeShardsLike",
                 VPackValue(resolver.getCollectionNameCluster(
                     static_cast<TRI_voc_cid_t>(
                         basics::StringUtils::uint64(_distributeShardsLike)))));
    } else {
      result.add("distributeShardsLike", VPackValue(_distributeShardsLike));
    }
  }

  if (_keyGenerator != nullptr) {
    result.add(VPackValue("keyOptions"));
    result.openObject();
    _keyGenerator->toVelocyPack(result);
    result.close();
  }

  result.add(VPackValue("shardKeys"));
  result.openArray();
  for (auto const& key : _shardKeys) {
    result.add(VPackValue(key));
  }
  result.close();  // shardKeys
}

// SECTION: Replication
int LogicalCollection::replicationFactor() const {
  return static_cast<int>(_replicationFactor);
}

// SECTION: Sharding
int LogicalCollection::numberOfShards() const {
  return static_cast<int>(_numberOfShards);
}

bool LogicalCollection::allowUserKeys() const { return _allowUserKeys; }

#ifndef USE_ENTERPRISE
bool LogicalCollection::usesDefaultShardKeys() const {
  return (_shardKeys.size() == 1 && _shardKeys[0] == StaticStrings::KeyString);
}
#endif

std::vector<std::string> const& LogicalCollection::shardKeys() const {
  return _shardKeys;
}

std::shared_ptr<ShardMap> LogicalCollection::shardIds() const {
  // TODO make threadsafe update on the cache.
  return _shardIds;
}

void LogicalCollection::setShardMap(std::shared_ptr<ShardMap>& map) {
  _shardIds = map;
}

// SECTION: Modification Functions

// asks the storage engine to rename the collection to the given name
// and persist the renaming info. It is guaranteed by the server
// that no other active collection with the same name and id exists in the same
// database when this function is called. If this operation fails somewhere in
// the middle, the storage engine is required to fully revert the rename
// operation and throw only then, so that subsequent collection creation/rename
// requests will not fail. the WAL entry for the rename will be written *after*
// the call to "renameCollection" returns
int LogicalCollection::rename(std::string const& newName) {
  // Should only be called from inside vocbase.
  // Otherwise caching is destroyed.
  TRI_ASSERT(!ServerState::instance()->isCoordinator());  // NOT YET IMPLEMENTED

  WRITE_LOCKER_EVENTUAL(locker, _lock);

  // Check for illegal states.
  switch (_status) {
    case TRI_VOC_COL_STATUS_CORRUPTED:
      return TRI_ERROR_ARANGO_CORRUPTED_COLLECTION;
    case TRI_VOC_COL_STATUS_DELETED:
      return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
    default:
      // Fall through intentional
      break;
  }

  // Check for duplicate name
  auto other = _vocbase->lookupCollection(newName);
  if (other != nullptr) {
    return TRI_ERROR_ARANGO_DUPLICATE_NAME;
  }

  switch (_status) {
    case TRI_VOC_COL_STATUS_UNLOADED:
    case TRI_VOC_COL_STATUS_LOADED:
    case TRI_VOC_COL_STATUS_UNLOADING:
    case TRI_VOC_COL_STATUS_LOADING: {
      break;
    }
    default:
      // Unknown status
      return TRI_ERROR_INTERNAL;
  }

  std::string oldName = _name;
  _name = newName;
  // Okay, we can finally rename safely
  try {
    StorageEngine* engine = EngineSelectorFeature::ENGINE;
    bool const doSync =
        application_features::ApplicationServer::getFeature<DatabaseFeature>(
            "Database")
            ->forceSyncProperties();
    engine->changeCollection(_vocbase, _cid, this, doSync);
  } catch (basics::Exception const& ex) {
    // Engine rename somehow failed. Reset to old name
    _name = oldName;
    return ex.code();
  } catch (...) {
    // Engine rename somehow failed. Reset to old name
    _name = oldName;
    return TRI_ERROR_INTERNAL;
  }

  // CHECK if this ordering is okay. Before the change, the version was
  // increased after swapping in the vocbase mapping.
  increaseInternalVersion();
  return TRI_ERROR_NO_ERROR;
}

int LogicalCollection::close() {
  // This was unload() in 3.0
  return getPhysical()->close();
}

void LogicalCollection::unload() {}

void LogicalCollection::drop() {
  // make sure collection has been closed
  this->close();

  TRI_ASSERT(!ServerState::instance()->isCoordinator());
  StorageEngine* engine = EngineSelectorFeature::ENGINE;
  engine->dropCollection(_vocbase, this);
  _isDeleted = true;

  // save some memory by freeing the indexes
  _indexes.clear();
  try {
    // close collection. this will also invalidate the revisions cache
    _physical->close();
  } catch (...) {
    // don't throw from here... dropping should succeed
  }
}

void LogicalCollection::setStatus(TRI_vocbase_col_status_e status) {
  _status = status;

  if (status == TRI_VOC_COL_STATUS_LOADED) {
    increaseInternalVersion();
  }
}

void LogicalCollection::toVelocyPackForAgency(VPackBuilder& result) {
  _status = TRI_VOC_COL_STATUS_LOADED;
  result.openObject();
  toVelocyPackInObject(result, false);
  result.close();  // Base Object
}

void LogicalCollection::toVelocyPackForClusterInventory(VPackBuilder& result,
                                                        bool useSystem) const {
  if (_isSystem && !useSystem) {
    return;
  }
  result.openObject();
  result.add(VPackValue("parameters"));
  result.openObject();
  toVelocyPackInObject(result, true);
  result.close();
  result.add(VPackValue("indexes"));
  getIndexesVPack(result, false);
  result.close();  // CollectionInfo
}

void LogicalCollection::toVelocyPack(VPackBuilder& result,
                                     bool withPath) const {
  result.openObject();
  toVelocyPackInObject(result, false);
  result.add("cid",
             VPackValue(std::to_string(_cid)));  // export cid for compatibility, too
  result.add("planId",
             VPackValue(std::to_string(_planId)));  // export planId for cluster
  result.add("version", VPackValue(_version));
  result.add("count", VPackValue(_physical->initialCount()));
  if (withPath) {
    result.add("path", VPackValue(getPhysical()->path()));
  }
  result.add("allowUserKeys", VPackValue(_allowUserKeys));

  result.close();
}

// Internal helper that inserts VPack info into an existing object and leaves
// the object open
void LogicalCollection::toVelocyPackInObject(VPackBuilder& result,
                                             bool translateCids) const {
  getPropertiesVPack(result, translateCids);

  result.add("numberOfShards", VPackValue(_numberOfShards));
  if (!_avoidServers.empty()) {
    result.add(VPackValue("avoidServers"));
    VPackArrayBuilder b(&result);
    for (auto const& i : _avoidServers) {
      result.add(VPackValue(i));
    }
  }

  result.add(VPackValue("shards"));
  result.openObject();
  for (auto const& shards : *_shardIds) {
    result.add(VPackValue(shards.first));
    result.openArray();
    for (auto const& servers : shards.second) {
      result.add(VPackValue(servers));
    }
    result.close();  // server array
  }
  result.close();  // shards

  result.add(VPackValue("indexes"));
  getIndexesVPack(result, false);
}

void LogicalCollection::toVelocyPack(VPackBuilder& builder, bool includeIndexes,
                                     TRI_voc_tick_t maxTick) {
  TRI_ASSERT(!builder.isClosed());
  StorageEngine* engine = EngineSelectorFeature::ENGINE;
  engine->getCollectionInfo(_vocbase, _cid, builder, includeIndexes, maxTick);
}

void LogicalCollection::increaseInternalVersion() { ++_internalVersion; }
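// For reference, a hedged example (made-up values, not taken from the docs)
// of a properties slice that updateProperties() below will accept; only the
// attributes read directly in this file are shown, everything else in the
// slice is handed through to the physical collection:
//
//   { "waitForSync": true, "indexBuckets": 16, "count": 1000 }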
int LogicalCollection::updateProperties(VPackSlice const& slice, bool doSync) {
  // the following collection properties are intentionally not updated,
  // as updating them would be very complicated:
  // - _cid
  // - _name
  // - _type
  // - _isSystem
  // - _isVolatile
  // ... probably a few others missing here ...

  WRITE_LOCKER(writeLocker, _infoLock);

  // some basic validation...
  if (isVolatile() &&
      arangodb::basics::VelocyPackHelper::getBooleanValue(
          slice, "waitForSync", waitForSync())) {
    // the combination of waitForSync and isVolatile makes no sense
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "volatile collections do not support the waitForSync option");
  }

  if (isVolatile() != arangodb::basics::VelocyPackHelper::getBooleanValue(
                          slice, "isVolatile", isVolatile())) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "isVolatile option cannot be changed at runtime");
  }

  uint32_t tmp = arangodb::basics::VelocyPackHelper::getNumericValue(
      slice, "indexBuckets",
      2 /*Just for validation, this default value passes*/);
  if (tmp == 0 || tmp > 1024) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "indexBuckets must be a two-power between 1 and 1024");
  }
  // end of validation

  _waitForSync = Helper::getBooleanValue(slice, "waitForSync", _waitForSync);
  getPhysical()->updateProperties(slice, doSync);

  _indexBuckets = Helper::getNumericValue(slice, "indexBuckets", _indexBuckets);

  if (!_isLocal) {
    // We need to inform the cluster as well
    return ClusterInfo::instance()->setCollectionPropertiesCoordinator(
        _vocbase->name(), cid_as_string(), this);
  }

  int64_t count = arangodb::basics::VelocyPackHelper::getNumericValue(
      slice, "count", _physical->initialCount());
  if (count != _physical->initialCount()) {
    _physical->updateCount(count);
  }

  StorageEngine* engine = EngineSelectorFeature::ENGINE;
  engine->changeCollection(_vocbase, _cid, this, doSync);

  return TRI_ERROR_NO_ERROR;
}

/// @brief return the figures for a collection
std::shared_ptr<VPackBuilder> LogicalCollection::figures() {
  auto builder = std::make_shared<VPackBuilder>();

  if (ServerState::instance()->isCoordinator()) {
    builder->openObject();
    builder->close();
    int res = figuresOnCoordinator(dbName(), cid_as_string(), builder);

    if (res != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(res);
    }
  } else {
    builder->openObject();

    // add index information
    size_t sizeIndexes = getPhysical()->memory();
    size_t numIndexes = 0;
    for (auto const& idx : _indexes) {
      sizeIndexes += static_cast<size_t>(idx->memory());
      ++numIndexes;
    }

    builder->add("indexes", VPackValue(VPackValueType::Object));
    builder->add("count", VPackValue(numIndexes));
    builder->add("size", VPackValue(sizeIndexes));
    builder->close();  // indexes

    // add engine-specific figures
    getPhysical()->figures(builder);
    builder->close();
  }

  return builder;
}

/// @brief opens an existing collection
void LogicalCollection::open(bool ignoreErrors) {
  getPhysical()->open(ignoreErrors);
  TRI_UpdateTickServer(_cid);
}

/// SECTION Indexes

std::shared_ptr<Index> LogicalCollection::lookupIndex(
    TRI_idx_iid_t idxId) const {
  for (auto const& idx : _indexes) {
    if (idx->id() == idxId) {
      return idx;
    }
  }
  return nullptr;
}

std::shared_ptr<Index> LogicalCollection::lookupIndex(
    VPackSlice const& info) const {
  if (!info.isObject()) {
    // Compatibility with old v8-vocindex.
    THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
  }

  // extract type
  VPackSlice value = info.get("type");

  if (!value.isString()) {
    // Compatibility with old v8-vocindex.
    THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
  }

  std::string tmp = value.copyString();
  arangodb::Index::IndexType const type = arangodb::Index::type(tmp.c_str());

  for (auto const& idx : _indexes) {
    if (idx->type() == type) {
      // Only check relevant indices
      if (idx->matchesDefinition(info)) {
        // We found an index for this definition.
        return idx;
      }
    }
  }
  return nullptr;
}

std::shared_ptr<Index> LogicalCollection::createIndex(
    transaction::Methods* trx, VPackSlice const& info, bool& created) {
  // TODO Get LOCK for the vocbase
  auto idx = lookupIndex(info);
  if (idx != nullptr) {
    created = false;
    // We already have this index.
    // Should we throw instead?
    return idx;
  }

  StorageEngine* engine = EngineSelectorFeature::ENGINE;
  IndexFactory const* idxFactory = engine->indexFactory();
  TRI_ASSERT(idxFactory != nullptr);

  // We are sure that we do not have an index of this type.
  // We also hold the lock.
  // Create it
  idx = idxFactory->prepareIndexFromSlice(info, true, this, false);
  TRI_ASSERT(idx != nullptr);
  if (ServerState::instance()->isCoordinator()) {
    // In the coordinator case we do not fill the index
    // We only inform the others.
    addIndexCoordinator(idx, true);
    created = true;
    return idx;
  }

  int res = getPhysical()->saveIndex(trx, idx);

  if (res != TRI_ERROR_NO_ERROR) {
    THROW_ARANGO_EXCEPTION(res);
  }

  // Until here no harm is done if something fails. The shared_ptr will
  // clean up, if we leave before the following call.
  addIndex(idx);
  {
    VPackBuilder builder;
    bool const doSync =
        application_features::ApplicationServer::getFeature<DatabaseFeature>(
            "Database")
            ->forceSyncProperties();
    toVelocyPack(builder, false);
    updateProperties(builder.slice(), doSync);
  }
  created = true;
  return idx;
}

/// @brief removes an index by id
bool LogicalCollection::removeIndex(TRI_idx_iid_t iid) {
  size_t const n = _indexes.size();

  for (size_t i = 0; i < n; ++i) {
    auto idx = _indexes[i];

    if (!idx->canBeDropped()) {
      continue;
    }

    if (idx->id() == iid) {
      // found!
      idx->drop();

      _indexes.erase(_indexes.begin() + i);

      // update statistics
      if (idx->type() == arangodb::Index::TRI_IDX_TYPE_FULLTEXT_INDEX) {
        --_cleanupIndexes;
      }
      if (idx->isPersistent()) {
        --_persistentIndexes;
      }

      return true;
    }
  }

  // not found
  return false;
}

/// @brief drops an index, including index file removal and replication
bool LogicalCollection::dropIndex(TRI_idx_iid_t iid, bool writeMarker) {
  TRI_ASSERT(!ServerState::instance()->isCoordinator());
  return _physical->dropIndex(iid, writeMarker);
}

/// @brief creates the initial indexes for the collection
void LogicalCollection::createInitialIndexes() {
  // TODO Properly fix this. The outside should make sure that only NEW
  // collections try to create the indexes.
  if (!_indexes.empty()) {
    return;
  }

  std::vector<std::shared_ptr<arangodb::Index>> systemIndexes;
  StorageEngine* engine = EngineSelectorFeature::ENGINE;
  IndexFactory const* idxFactory = engine->indexFactory();
  TRI_ASSERT(idxFactory != nullptr);
  idxFactory->fillSystemIndexes(this, systemIndexes);
  for (auto const& it : systemIndexes) {
    addIndex(it);
  }
}

std::vector<std::shared_ptr<arangodb::Index>> const*
LogicalCollection::indexList() const {
  return &_indexes;
}

void LogicalCollection::addIndex(std::shared_ptr<arangodb::Index> idx) {
  // primary index must be added at position 0
  TRI_ASSERT(idx->type() != arangodb::Index::TRI_IDX_TYPE_PRIMARY_INDEX ||
             _indexes.empty());

  auto const id = idx->id();
  for (auto const& it : _indexes) {
    if (it->id() == id) {
      // already have this particular index. do not add it again
      return;
    }
  }

  TRI_UpdateTickServer(static_cast<TRI_voc_tick_t>(id));

  _indexes.emplace_back(idx);

  // update statistics
  if (idx->type() == arangodb::Index::TRI_IDX_TYPE_FULLTEXT_INDEX) {
    ++_cleanupIndexes;
  }
  if (idx->isPersistent()) {
    ++_persistentIndexes;
  }
}

void LogicalCollection::addIndexCoordinator(
    std::shared_ptr<arangodb::Index> idx, bool distribute) {
  auto const id = idx->id();
  for (auto const& it : _indexes) {
    if (it->id() == id) {
      // already have this particular index. do not add it again
      return;
    }
  }
  _indexes.emplace_back(idx);
  if (distribute) {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
  }
}

/// @brief reads an element from the document collection
int LogicalCollection::read(transaction::Methods* trx, std::string const& key,
                            ManagedDocumentResult& result, bool lock) {
  return read(trx, StringRef(key.c_str(), key.size()), result, lock);
}

int LogicalCollection::read(transaction::Methods* trx, StringRef const& key,
                            ManagedDocumentResult& result, bool lock) {
  transaction::BuilderLeaser builder(trx);
  builder->add(VPackValuePair(key.data(), key.size(), VPackValueType::String));
  return getPhysical()->read(trx, builder->slice(), result, lock);
}

////////////////////////////////////////////////////////////////////////////////
/// @brief processes a truncate operation (note: currently this only clears
/// the read-cache)
////////////////////////////////////////////////////////////////////////////////

void LogicalCollection::truncate(transaction::Methods* trx,
                                 OperationOptions& options) {
  getPhysical()->truncate(trx, options);
}

////////////////////////////////////////////////////////////////////////////////
/// @brief inserts a document or edge into the collection
////////////////////////////////////////////////////////////////////////////////

int LogicalCollection::insert(transaction::Methods* trx, VPackSlice const slice,
                              ManagedDocumentResult& result,
                              OperationOptions& options,
                              TRI_voc_tick_t& resultMarkerTick, bool lock) {
  resultMarkerTick = 0;
  return getPhysical()->insert(trx, slice, result, options, resultMarkerTick,
                               lock);
}

/// @brief updates a document or edge in a collection
int LogicalCollection::update(transaction::Methods* trx,
                              VPackSlice const newSlice,
                              ManagedDocumentResult& result,
                              OperationOptions& options,
                              TRI_voc_tick_t& resultMarkerTick, bool lock,
                              TRI_voc_rid_t& prevRev,
                              ManagedDocumentResult& previous) {
  resultMarkerTick = 0;

  if (!newSlice.isObject()) {
    return TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID;
  }

  prevRev = 0;

  TRI_voc_rid_t revisionId = 0;
  if (options.isRestore) {
    VPackSlice oldRev = TRI_ExtractRevisionIdAsSlice(newSlice);
    if (!oldRev.isString()) {
      return TRI_ERROR_ARANGO_DOCUMENT_REV_BAD;
    }
    bool isOld;
    VPackValueLength l;
    char const* p = oldRev.getString(l);
    revisionId = TRI_StringToRid(p, l, isOld, false);
    if (isOld) {
      // Do not tolerate old revision IDs
      revisionId = TRI_HybridLogicalClock();
    }
  } else {
    revisionId = TRI_HybridLogicalClock();
  }

  VPackSlice key = newSlice.get(StaticStrings::KeyString);
  if (key.isNone()) {
    return TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD;
  }

  return getPhysical()->update(trx, newSlice, result, options,
                               resultMarkerTick, lock, prevRev, previous,
                               revisionId, key);
}

/// @brief replaces a document or edge in a collection
int LogicalCollection::replace(transaction::Methods* trx,
                               VPackSlice const newSlice,
                               ManagedDocumentResult& result,
                               OperationOptions& options,
                               TRI_voc_tick_t& resultMarkerTick, bool lock,
                               TRI_voc_rid_t& prevRev,
                               ManagedDocumentResult& previous) {
  resultMarkerTick = 0;

  if (!newSlice.isObject()) {
    return TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID;
  }

  prevRev = 0;
  VPackSlice fromSlice;
  VPackSlice toSlice;

  if (type() == TRI_COL_TYPE_EDGE) {
    fromSlice = newSlice.get(StaticStrings::FromString);
    if (!fromSlice.isString()) {
      return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
    }
    toSlice = newSlice.get(StaticStrings::ToString);
    if (!toSlice.isString()) {
      return TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE;
    }
  }

  TRI_voc_rid_t revisionId = 0;
  if (options.isRestore) {
    VPackSlice oldRev = TRI_ExtractRevisionIdAsSlice(newSlice);
    if (!oldRev.isString()) {
      return TRI_ERROR_ARANGO_DOCUMENT_REV_BAD;
    }
    bool isOld;
    VPackValueLength l;
    char const* p = oldRev.getString(l);
    revisionId = TRI_StringToRid(p, l, isOld, false);
    if (isOld || revisionId == UINT64_MAX) {
      // Do not tolerate old revision ticks or invalid ones:
      revisionId = TRI_HybridLogicalClock();
    }
  } else {
    revisionId = TRI_HybridLogicalClock();
  }

  return getPhysical()->replace(trx, newSlice, result, options,
                                resultMarkerTick, lock, prevRev, previous,
                                revisionId, fromSlice, toSlice);
}

/// @brief removes a document or edge
int LogicalCollection::remove(transaction::Methods* trx,
                              VPackSlice const slice, OperationOptions& options,
                              TRI_voc_tick_t& resultMarkerTick, bool lock,
                              TRI_voc_rid_t& prevRev,
                              ManagedDocumentResult& previous) {
  resultMarkerTick = 0;

  // create remove marker
  TRI_voc_rid_t revisionId = 0;
  if (options.isRestore) {
    VPackSlice oldRev = TRI_ExtractRevisionIdAsSlice(slice);
    if (!oldRev.isString()) {
      revisionId = TRI_HybridLogicalClock();
    } else {
      bool isOld;
      VPackValueLength l;
      char const* p = oldRev.getString(l);
      revisionId = TRI_StringToRid(p, l, isOld, false);
      if (isOld || revisionId == UINT64_MAX) {
        // Do not tolerate old revisions or illegal ones
        revisionId = TRI_HybridLogicalClock();
      }
    }
  } else {
    revisionId = TRI_HybridLogicalClock();
  }

  return getPhysical()->remove(trx, slice, previous, options, resultMarkerTick,
                               lock, revisionId, prevRev);
}

void LogicalCollection::sizeHint(transaction::Methods* trx, int64_t hint) {
  if (hint <= 0) {
    return;
  }
  getPhysical()->sizeHint(trx, hint);
}

bool LogicalCollection::readDocument(transaction::Methods* trx,
                                     DocumentIdentifierToken const& token,
                                     ManagedDocumentResult& result) {
  return getPhysical()->readDocument(trx, token, result);
}

bool LogicalCollection::readDocumentConditional(
    transaction::Methods* trx, DocumentIdentifierToken const& token,
    TRI_voc_tick_t maxTick, ManagedDocumentResult& result) {
  return getPhysical()->readDocumentConditional(trx, token, maxTick, result);
}

/// @brief a method to skip certain documents in AQL write operations,
/// this is only used in the enterprise edition for smart graphs
#ifndef USE_ENTERPRISE
bool LogicalCollection::skipForAqlWrite(arangodb::velocypack::Slice document,
                                        std::string const& key) const {
  return false;
}
#endif

bool LogicalCollection::isSatellite() const { return _replicationFactor == 0; }