////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "Collections.h"
#include "Basics/Common.h"
#include "Aql/Query.h"
#include "Aql/QueryRegistry.h"
#include "Basics/fasthash.h"
#include "Basics/LocalTaskQueue.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringBuffer.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/ServerState.h"
#include "Futures/Utilities.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Sharding/ShardingFeature.h"
#include "Sharding/ShardingInfo.h"
#include "StorageEngine/PhysicalCollection.h"
#include "Transaction/Helpers.h"
#include "Transaction/V8Context.h"
#include "Utils/Events.h"
#include "Utils/ExecContext.h"
#include "Utils/OperationCursor.h"
#include "Utils/SingleCollectionTransaction.h"
#include "V8/JavaScriptSecurityContext.h"
#include "V8/v8-conv.h"
#include "V8/v8-utils.h"
#include "V8Server/V8Context.h"
#include "V8Server/V8DealerFeature.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/Methods/CollectionCreationInfo.h"
#include "VocBase/vocbase.h"
#include <velocypack/Builder.h>
#include <velocypack/Collection.h>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::methods;
Collections::Context::Context(TRI_vocbase_t& vocbase, LogicalCollection& coll)
: _vocbase(vocbase), _coll(coll), _trx(nullptr), _responsibleForTrx(true) {}
Collections::Context::Context(TRI_vocbase_t& vocbase, LogicalCollection& coll,
transaction::Methods* trx)
: _vocbase(vocbase), _coll(coll), _trx(trx), _responsibleForTrx(false) {
TRI_ASSERT(_trx != nullptr);
}
Collections::Context::~Context() {
if (_responsibleForTrx && _trx != nullptr) {
delete _trx;
}
}
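////////////////////////////////////////////////////////////////////////////////
/// @brief lazily creates and begins a transaction on the managed collection
/// if this context is responsible for it; otherwise returns the transaction
/// that was supplied externally
////////////////////////////////////////////////////////////////////////////////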
transaction::Methods* Collections::Context::trx(AccessMode::Type const& type, bool embeddable,
bool forceLoadCollection) {
if (_responsibleForTrx && _trx == nullptr) {
auto ctx = transaction::V8Context::CreateWhenRequired(_vocbase, embeddable);
auto trx = std::make_unique<SingleCollectionTransaction>(ctx, _coll, type);
if (!trx) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_OUT_OF_MEMORY,
"Cannot create Transaction");
}
if (!forceLoadCollection) {
// we actually need this hint here, so that the collection is not
// loaded if it has status unloaded.
trx->addHint(transaction::Hints::Hint::NO_USAGE_LOCK);
}
Result res = trx->begin();
if (res.fail()) {
THROW_ARANGO_EXCEPTION(res);
}
_trx = trx.release();
}
// ADD asserts for running state and locking issues!
return _trx;
}
TRI_vocbase_t& Collections::Context::vocbase() const { return _vocbase; }
LogicalCollection* Collections::Context::coll() const { return &_coll; }
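////////////////////////////////////////////////////////////////////////////////
/// @brief invokes the callback for every non-deleted collection in the
/// database, using ClusterInfo on coordinators and the local vocbase otherwise
////////////////////////////////////////////////////////////////////////////////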
void Collections::enumerate(TRI_vocbase_t* vocbase,
std::function<void(std::shared_ptr<LogicalCollection> const&)> const& func) {
if (ServerState::instance()->isCoordinator()) {
auto& ci = vocbase->server().getFeature<ClusterFeature>().clusterInfo();
std::vector<std::shared_ptr<LogicalCollection>> colls =
ci.getCollections(vocbase->name());
for (std::shared_ptr<LogicalCollection> const& c : colls) {
if (!c->deleted()) {
func(c);
}
}
} else {
for (auto& c : vocbase->collections(false)) {
if (!c->deleted()) {
func(c);
}
}
}
}
/*static*/ arangodb::Result methods::Collections::lookup( // find collection
TRI_vocbase_t const& vocbase, // vocbase to search
std::string const& name, // collection name
FuncCallback func // invoke on found collection
) {
if (name.empty()) {
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
}
if (ServerState::instance()->isCoordinator()) {
try {
if (!vocbase.server().hasFeature<ClusterFeature>()) {
return arangodb::Result( // result
TRI_ERROR_INTERNAL, // code
"failure to find 'ClusterInfo' instance while searching for "
"collection" // message
);
}
auto& ci = vocbase.server().getFeature<ClusterFeature>().clusterInfo();
auto coll = ci.getCollectionNT(vocbase.name(), name);
if (coll) {
// check authentication after ensuring the collection exists
if (!ExecContext::current().canUseCollection(vocbase.name(), coll->name(), auth::Level::RO)) {
return Result(TRI_ERROR_FORBIDDEN,
"No access to collection '" + name + "'");
}
func(coll);
return Result();
} else {
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
}
} catch (basics::Exception const& ex) {
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {
return Result(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
return Result(TRI_ERROR_INTERNAL);
}
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
}
auto coll = vocbase.lookupCollection(name);
if (coll != nullptr) {
// check authentication after ensuring the collection exists
if (!ExecContext::current().canUseCollection(vocbase.name(), coll->name(), auth::Level::RO)) {
return Result(TRI_ERROR_FORBIDDEN,
"No access to collection '" + name + "'");
}
try {
func(coll);
} catch (basics::Exception const& ex) {
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {
return Result(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
return Result(TRI_ERROR_INTERNAL);
}
return Result();
}
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
}
/*static*/ arangodb::Result Collections::create( // create collection
TRI_vocbase_t& vocbase, // collection vocbase
std::string const& name, // collection name
TRI_col_type_e collectionType, // collection type
arangodb::velocypack::Slice const& properties, // collection properties
bool createWaitsForSyncReplication, // replication wait flag
bool enforceReplicationFactor, // replication factor flag
bool isNewDatabase,
FuncCallback func) { // invoke on collection creation
if (name.empty()) {
events::CreateCollection(vocbase.name(), name, TRI_ERROR_ARANGO_ILLEGAL_NAME);
return TRI_ERROR_ARANGO_ILLEGAL_NAME;
} else if (collectionType != TRI_col_type_e::TRI_COL_TYPE_DOCUMENT &&
collectionType != TRI_col_type_e::TRI_COL_TYPE_EDGE) {
events::CreateCollection(vocbase.name(), name, TRI_ERROR_ARANGO_COLLECTION_TYPE_INVALID);
return TRI_ERROR_ARANGO_COLLECTION_TYPE_INVALID;
}
std::vector<CollectionCreationInfo> infos{{name, collectionType, properties}};
return create(vocbase, infos, createWaitsForSyncReplication,
enforceReplicationFactor, isNewDatabase, nullptr,
[&func](std::vector<std::shared_ptr<LogicalCollection>> const& cols) {
TRI_ASSERT(cols.size() == 1);
func(cols[0]);
});
}
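////////////////////////////////////////////////////////////////////////////////
/// @brief creates one or more collections: checks database permissions,
/// merges cluster defaults (replication factor, write concern, sharding) into
/// the given properties, creates the collections on the coordinator or
/// locally, and finally grants the creating user RW access on the new
/// non-system collections
////////////////////////////////////////////////////////////////////////////////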
Result Collections::create(TRI_vocbase_t& vocbase,
std::vector<CollectionCreationInfo> const& infos,
bool createWaitsForSyncReplication,
bool enforceReplicationFactor, bool isNewDatabase,
std::shared_ptr<LogicalCollection> const& colToDistributeShardsLike,
MultiFuncCallback const& func) {
ExecContext const& exec = ExecContext::current();
if (!exec.canUseDatabase(vocbase.name(), auth::Level::RW)) {
for (auto const& info : infos) {
events::CreateCollection(vocbase.name(), info.name, TRI_ERROR_FORBIDDEN);
}
return arangodb::Result( // result
TRI_ERROR_FORBIDDEN, // code
"cannot create collection in " + vocbase.name() // message
);
}
TRI_ASSERT(!vocbase.isDangling());
bool haveShardingFeature = ServerState::instance()->isCoordinator() &&
vocbase.server().hasFeature<ShardingFeature>();
VPackBuilder builder;
VPackBuilder helper;
builder.openArray();
for (auto const& info : infos) {
TRI_ASSERT(builder.isOpenArray());
if (ServerState::instance()->isCoordinator()) {
Result res = ShardingInfo::validateShardsAndReplicationFactor(info.properties, vocbase.server());
if (res.fail()) {
return res;
}
}
if (info.name.empty()) {
events::CreateCollection(vocbase.name(), info.name, TRI_ERROR_ARANGO_ILLEGAL_NAME);
return TRI_ERROR_ARANGO_ILLEGAL_NAME;
}
if (info.collectionType != TRI_col_type_e::TRI_COL_TYPE_DOCUMENT &&
info.collectionType != TRI_col_type_e::TRI_COL_TYPE_EDGE) {
events::CreateCollection(vocbase.name(), info.name, TRI_ERROR_ARANGO_COLLECTION_TYPE_INVALID);
return TRI_ERROR_ARANGO_COLLECTION_TYPE_INVALID;
}
TRI_ASSERT(info.properties.isObject());
helper.clear();
helper.openObject();
helper.add(arangodb::StaticStrings::DataSourceType,
arangodb::velocypack::Value(static_cast<int>(info.collectionType)));
helper.add(arangodb::StaticStrings::DataSourceName,
arangodb::velocypack::Value(info.name));
if (ServerState::instance()->isCoordinator()) {
auto replicationFactorSlice = info.properties.get(StaticStrings::ReplicationFactor);
if (replicationFactorSlice.isNone()) {
auto factor = vocbase.replicationFactor();
if (factor > 0 && vocbase.IsSystemName(info.name)) {
auto& cl = vocbase.server().getFeature<ClusterFeature>();
factor = std::max(vocbase.replicationFactor(), cl.systemReplicationFactor());
}
helper.add(StaticStrings::ReplicationFactor, VPackValue(factor));
}
if (!vocbase.IsSystemName(info.name)) {
uint64_t numberOfShards = arangodb::basics::VelocyPackHelper::readNumericValue<uint64_t>(info.properties, StaticStrings::NumberOfShards, 0);
// system collections will be sharded normally; only user collections will get
// the forced sharding
if (vocbase.server().getFeature<ClusterFeature>().forceOneShard()) {
// force one shard, and force distributeShardsLike to be "_graphs"
helper.add(StaticStrings::NumberOfShards, VPackValue(1));
helper.add(StaticStrings::DistributeShardsLike, VPackValue(vocbase.shardingPrototypeName()));
} else if (vocbase.sharding() == "single" && numberOfShards <= 1) {
auto distributeSlice = info.properties.get(StaticStrings::DistributeShardsLike);
if (distributeSlice.isNone()) {
helper.add(StaticStrings::DistributeShardsLike, VPackValue(vocbase.shardingPrototypeName()));
} else if (distributeSlice.isString() && distributeSlice.compareString("") == 0) {
helper.add(StaticStrings::DistributeShardsLike, VPackSlice::nullSlice()); //delete empty string from info slice
}
}
}
// not an error: for historical reasons the write concern is read from the
// variable "minReplicationFactor"
auto writeConcernSlice = info.properties.get(StaticStrings::MinReplicationFactor);
if (writeConcernSlice.isNone()) {
helper.add(StaticStrings::MinReplicationFactor, VPackValue(vocbase.writeConcern()));
}
} else { // single server
helper.add(StaticStrings::DistributeShardsLike, VPackSlice::nullSlice()); //delete empty string from info slice
helper.add(StaticStrings::ReplicationFactor, VPackSlice::nullSlice());
helper.add(StaticStrings::MinReplicationFactor, VPackSlice::nullSlice());
}
helper.close();
VPackBuilder merged =
VPackCollection::merge(info.properties, helper.slice(), false, true);
if (haveShardingFeature && !info.properties.get("shardingStrategy").isString()) {
// NOTE: We need to do this in a second merge as the feature call requires the
// DataSourceType to be set in the JSON, which has just been done by the call above.
helper.clear();
helper.openObject();
TRI_ASSERT(ServerState::instance()->isCoordinator());
helper.add("shardingStrategy",
VPackValue(vocbase.server().getFeature<ShardingFeature>().getDefaultShardingStrategyForNewCollection(
merged.slice())));
helper.close();
merged = VPackCollection::merge(merged.slice(), helper.slice(), false, true);
}
builder.add(merged.slice());
}
TRI_ASSERT(builder.isOpenArray());
builder.close();
VPackSlice const infoSlice = builder.slice();
std::vector<std::shared_ptr<LogicalCollection>> collections;
TRI_ASSERT(infoSlice.isArray());
TRI_ASSERT(infoSlice.length() >= 1);
TRI_ASSERT(infoSlice.length() == infos.size());
collections.reserve(infoSlice.length());
try {
if (ServerState::instance()->isCoordinator()) {
collections =
ClusterMethods::createCollectionOnCoordinator(vocbase, infoSlice, false,
createWaitsForSyncReplication,
enforceReplicationFactor,
isNewDatabase, colToDistributeShardsLike);
if (collections.empty()) {
for (auto const& info : infos) {
events::CreateCollection(vocbase.name(), info.name, TRI_ERROR_INTERNAL);
}
return Result(TRI_ERROR_INTERNAL, "createCollectionsOnCoordinator");
}
} else {
for (auto slice : VPackArrayIterator(infoSlice)) {
// Single server does not yet have a multi collection implementation
auto col = vocbase.createCollection(slice);
TRI_ASSERT(col != nullptr);
collections.emplace_back(col);
}
}
} catch (basics::Exception const& ex) {
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {
return Result(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
return Result(TRI_ERROR_INTERNAL, "cannot create collection");
}
// Grant access to the collections.
// This is identical on cluster and SingleServer
try {
// in case of success we grant the creating user RW access
auth::UserManager* um = AuthenticationFeature::instance()->userManager();
if (um != nullptr && !exec.isSuperuser()) {
// this should not fail, we cannot get here without database RW access
// however, there may be races for updating the users account, so we try
// a few times in case of a conflict
int tries = 0;
while (true) {
Result r = um->updateUser(exec.user(), [&](auth::User& entry) {
for (auto const& col : collections) {
// do not grant rights on system collections
if (!col->system()) {
entry.grantCollection(vocbase.name(), col->name(), auth::Level::RW);
events::CreateCollection(vocbase.name(), col->name(), TRI_ERROR_NO_ERROR);
}
}
return TRI_ERROR_NO_ERROR;
});
if (r.ok() || r.is(TRI_ERROR_USER_NOT_FOUND) || r.is(TRI_ERROR_USER_EXTERNAL)) {
// it seems to be allowed to create collections with an unknown user
break;
}
if (!r.is(TRI_ERROR_ARANGO_CONFLICT) || ++tries == 10) {
LOG_TOPIC("116bb", WARN, Logger::FIXME)
<< "Updating user failed with error: " << r.errorMessage() << ". giving up!";
for (auto const& col : collections) {
events::CreateCollection(vocbase.name(), col->name(), r.errorNumber());
}
return r;
}
// try again in case of conflict
LOG_TOPIC("ff123", TRACE, Logger::FIXME)
<< "Updating user failed with error: " << r.errorMessage() << ". trying again";
}
}
func(collections);
} catch (basics::Exception const& ex) {
for (auto const& info : infos) {
events::CreateCollection(vocbase.name(), info.name, ex.code());
}
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {
for (auto const& info : infos) {
events::CreateCollection(vocbase.name(), info.name, TRI_ERROR_INTERNAL);
}
return Result(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
for (auto const& info : infos) {
events::CreateCollection(vocbase.name(), info.name, TRI_ERROR_INTERNAL);
}
return Result(TRI_ERROR_INTERNAL, "cannot create collection");
}
for (auto const& info : infos) {
events::CreateCollection(vocbase.name(), info.name, TRI_ERROR_NO_ERROR);
}
return TRI_ERROR_NO_ERROR;
}
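////////////////////////////////////////////////////////////////////////////////
/// @brief fills the builder with the default properties used for system
/// collections (isSystem, waitForSync, journalSize, replicationFactor,
/// writeConcern and distributeShardsLike)
////////////////////////////////////////////////////////////////////////////////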
void Collections::createSystemCollectionProperties(std::string const& collectionName,
VPackBuilder& bb, TRI_vocbase_t const& vocbase) {
uint32_t defaultReplicationFactor = vocbase.replicationFactor();
uint32_t defaultWriteConcern = vocbase.writeConcern();
auto& server = application_features::ApplicationServer::server();
if (server.hasFeature<ClusterFeature>()) {
defaultReplicationFactor = std::max(defaultReplicationFactor, server.getFeature<ClusterFeature>().systemReplicationFactor());
}
{
VPackObjectBuilder scope(&bb);
bb.add(StaticStrings::DataSourceSystem, VPackSlice::trueSlice());
bb.add(StaticStrings::WaitForSyncString, VPackSlice::falseSlice());
bb.add(StaticStrings::JournalSize, VPackValue(1024 * 1024));
bb.add(StaticStrings::ReplicationFactor, VPackValue(defaultReplicationFactor));
bb.add(StaticStrings::MinReplicationFactor, VPackValue(defaultWriteConcern));
// that forces all collections to be on the same physical DBserver
if (vocbase.isSystem()) {
if (collectionName != StaticStrings::UsersCollection) {
bb.add(StaticStrings::DistributeShardsLike, VPackValue(StaticStrings::UsersCollection));
}
} else {
if (collectionName != StaticStrings::GraphsCollection) {
bb.add(StaticStrings::DistributeShardsLike, VPackValue(StaticStrings::GraphsCollection));
}
}
}
}
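////////////////////////////////////////////////////////////////////////////////
/// @brief looks up a system collection by name and creates it with the
/// default system-collection properties if it does not exist yet; returns
/// the result together with the collection (nullptr on error)
////////////////////////////////////////////////////////////////////////////////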
/*static*/ std::pair<Result, std::shared_ptr<LogicalCollection>> Collections::createSystem(
TRI_vocbase_t& vocbase, std::string const& name, bool isNewDatabase) {
std::shared_ptr<LogicalCollection> createdCollection;
FuncCallback const returnColPtr =
[&createdCollection](std::shared_ptr<LogicalCollection> const& col) -> void {
TRI_ASSERT(col != nullptr);
createdCollection = col;
};
Result res = methods::Collections::lookup(vocbase, name, returnColPtr);
if (res.ok()) {
// Collection lookup worked and we have a pointer to the collection
TRI_ASSERT(createdCollection != nullptr);
return {res, createdCollection};
} else if (res.is(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) {
VPackBuilder bb;
createSystemCollectionProperties(name, bb, vocbase);
res = Collections::create(vocbase, // vocbase to create in
name, // collection name to create
TRI_COL_TYPE_DOCUMENT, // collection type to create
bb.slice(), // collection definition to create
true, // waitsForSyncReplication
true, // enforceReplicationFactor
isNewDatabase,
returnColPtr); // callback
if (res.ok()) {
TRI_ASSERT(createdCollection != nullptr);
return {res, createdCollection};
}
}
// Something went wrong, we return res and nullptr
TRI_ASSERT(!res.ok());
return {res, nullptr};
}
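////////////////////////////////////////////////////////////////////////////////
/// @brief loads a collection: on coordinators the collection status is set
/// via ClusterInfo, otherwise a read transaction is started and finished to
/// force the collection to be loaded
////////////////////////////////////////////////////////////////////////////////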
Result Collections::load(TRI_vocbase_t& vocbase, LogicalCollection* coll) {
TRI_ASSERT(coll != nullptr);
if (ServerState::instance()->isCoordinator()) {
#ifdef USE_ENTERPRISE
auto& feature = vocbase.server().getFeature<ClusterFeature>();
return ULColCoordinatorEnterprise(feature, coll->vocbase().name(),
std::to_string(coll->id()), TRI_VOC_COL_STATUS_LOADED);
#else
auto& ci = vocbase.server().getFeature<ClusterFeature>().clusterInfo();
return ci.setCollectionStatusCoordinator(coll->vocbase().name(),
std::to_string(coll->id()),
TRI_VOC_COL_STATUS_LOADED);
#endif
}
auto ctx = transaction::V8Context::CreateWhenRequired(vocbase, true);
SingleCollectionTransaction trx(ctx, *coll, AccessMode::Type::READ);
Result res = trx.begin();
if (res.fail()) {
return res;
}
return trx.finish(res);
}
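////////////////////////////////////////////////////////////////////////////////
/// @brief unloads a collection: on coordinators the collection status is set
/// via ClusterInfo, otherwise the collection is unloaded locally
////////////////////////////////////////////////////////////////////////////////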
Result Collections::unload(TRI_vocbase_t* vocbase, LogicalCollection* coll) {
if (ServerState::instance()->isCoordinator()) {
#ifdef USE_ENTERPRISE
auto& feature = vocbase->server().getFeature<ClusterFeature>();
return ULColCoordinatorEnterprise(feature, vocbase->name(),
std::to_string(coll->id()),
TRI_VOC_COL_STATUS_UNLOADED);
#else
auto& ci = vocbase->server().getFeature<ClusterFeature>().clusterInfo();
return ci.setCollectionStatusCoordinator(vocbase->name(), std::to_string(coll->id()),
TRI_VOC_COL_STATUS_UNLOADED);
#endif
}
return vocbase->unloadCollection(coll, false);
}
Result Collections::properties(Context& ctxt, VPackBuilder& builder) {
LogicalCollection* coll = ctxt.coll();
TRI_ASSERT(coll != nullptr);
ExecContext const& exec = ExecContext::current();
bool canRead = exec.canUseCollection(coll->name(), auth::Level::RO);
if (!canRead || exec.databaseAuthLevel() == auth::Level::NONE) {
return Result(TRI_ERROR_FORBIDDEN, "cannot access " + coll->name());
}
std::unordered_set<std::string> ignoreKeys{
"allowUserKeys", "cid", "count", "deleted", "id", "indexes", "name",
"path", "planId", "shards", "status", "type", "version"};
if (!ServerState::instance()->isCoordinator()) {
// These are only relevant for cluster
ignoreKeys.insert({StaticStrings::DistributeShardsLike, StaticStrings::IsSmart,
StaticStrings::NumberOfShards, StaticStrings::ReplicationFactor,
StaticStrings::MinReplicationFactor,
StaticStrings::ShardKeys, StaticStrings::ShardingStrategy});
// this transaction is held longer than the following if...
auto trx = ctxt.trx(AccessMode::Type::READ, true, false);
TRI_ASSERT(trx != nullptr);
}
// note that we have an ongoing transaction here if we are in single-server
// case
VPackBuilder props = coll->toVelocyPackIgnore(ignoreKeys, LogicalDataSource::Serialization::Properties);
TRI_ASSERT(builder.isOpenObject());
builder.add(VPackObjectIterator(props.slice()));
return TRI_ERROR_NO_ERROR;
}
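////////////////////////////////////////////////////////////////////////////////
/// @brief updates the collection's properties after checking write
/// permissions; in the cluster the change goes through ClusterInfo, on a
/// single server it is applied and persisted inside an exclusive transaction
////////////////////////////////////////////////////////////////////////////////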
Result Collections::updateProperties(LogicalCollection& collection,
velocypack::Slice const& props, bool partialUpdate) {
ExecContext const& exec = ExecContext::current();
bool canModify = exec.canUseCollection(collection.name(), auth::Level::RW);
if (!canModify || !exec.canUseDatabase(auth::Level::RW)) {
return TRI_ERROR_FORBIDDEN;
}
if (ServerState::instance()->isCoordinator()) {
ClusterInfo& ci =
collection.vocbase().server().getFeature<ClusterFeature>().clusterInfo();
auto info = ci.getCollection(collection.vocbase().name(),
std::to_string(collection.id()));
Result res = ShardingInfo::validateShardsAndReplicationFactor(props, collection.vocbase().server());
if (res.fail()) {
return res;
}
return info->properties(props, partialUpdate);
} else {
auto ctx = transaction::V8Context::CreateWhenRequired(collection.vocbase(), false);
SingleCollectionTransaction trx(ctx, collection, AccessMode::Type::EXCLUSIVE);
Result res = trx.begin();
if (!res.ok()) {
return res;
}
// try to write new parameter to file
auto updateRes = collection.properties(props, partialUpdate);
if (!updateRes.ok()) {
return updateRes;
}
auto physical = collection.getPhysical();
TRI_ASSERT(physical != nullptr);
return physical->persistProperties();
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief helper function to rename collections in _graphs as well
////////////////////////////////////////////////////////////////////////////////
static int RenameGraphCollections(TRI_vocbase_t* vocbase, std::string const& oldName,
std::string const& newName) {
V8DealerFeature* dealer = V8DealerFeature::DEALER;
if (dealer == nullptr || !dealer->isEnabled()) {
return TRI_ERROR_NO_ERROR; // V8 might be disabled
}
StringBuffer buffer(true);
buffer.appendText(
"require('@arangodb/general-graph-common')._renameCollection(");
buffer.appendJsonEncoded(oldName.c_str(), oldName.size());
buffer.appendChar(',');
buffer.appendJsonEncoded(newName.c_str(), newName.size());
buffer.appendText(");");
JavaScriptSecurityContext securityContext =
JavaScriptSecurityContext::createInternalContext();
V8ContextGuard guard(vocbase, securityContext);
auto isolate = guard.isolate();
v8::HandleScope scope(isolate);
TRI_ExecuteJavaScriptString(isolate, isolate->GetCurrentContext(),
TRI_V8_ASCII_PAIR_STRING(isolate, buffer.c_str(),
buffer.length()),
TRI_V8_ASCII_STRING(isolate, "collection rename"), false);
return TRI_ERROR_NO_ERROR;
}
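////////////////////////////////////////////////////////////////////////////////
/// @brief renames a collection (single server only) and also renames it
/// inside the _graphs collection
////////////////////////////////////////////////////////////////////////////////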
Result Collections::rename(LogicalCollection& collection,
std::string const& newName, bool doOverride) {
if (ServerState::instance()->isCoordinator()) {
// renaming a collection in a cluster is unsupported
return TRI_ERROR_CLUSTER_UNSUPPORTED;
}
if (newName.empty()) {
return Result(TRI_ERROR_BAD_PARAMETER, "<name> must be non-empty");
}
ExecContext const& exec = ExecContext::current();
if (!exec.canUseDatabase(auth::Level::RW) ||
!exec.canUseCollection(collection.name(), auth::Level::RW)) {
return TRI_ERROR_FORBIDDEN;
}
// check required to pass
// shell-collection-rocksdb-noncluster.js::testSystemSpecial
if (collection.system()) {
return TRI_set_errno(TRI_ERROR_FORBIDDEN);
}
if (!doOverride) {
auto isSystem = TRI_vocbase_t::IsSystemName(collection.name());
if (isSystem && !TRI_vocbase_t::IsSystemName(newName)) {
// a system collection shall not be renamed to a non-system collection
// name
return arangodb::Result(TRI_ERROR_ARANGO_ILLEGAL_NAME,
"a system collection shall not be renamed to a "
"non-system collection name");
} else if (!isSystem && TRI_vocbase_t::IsSystemName(newName)) {
return arangodb::Result(TRI_ERROR_ARANGO_ILLEGAL_NAME,
"a non-system collection shall not be renamed to "
"a system collection name");
}
if (!TRI_vocbase_t::IsAllowedName(isSystem, arangodb::velocypack::StringRef(newName))) {
return TRI_ERROR_ARANGO_ILLEGAL_NAME;
}
}
std::string const oldName(collection.name());
auto res = collection.vocbase().renameCollection(collection.id(), newName);
if (!res.ok()) {
return res;
}
// rename collection inside _graphs as well
return RenameGraphCollections(&(collection.vocbase()), oldName, newName);
}
#ifndef USE_ENTERPRISE
////////////////////////////////////////////////////////////////////////////////
/// @brief drops a collection when running on a coordinator in a cluster
////////////////////////////////////////////////////////////////////////////////
static Result DropVocbaseColCoordinator(arangodb::LogicalCollection* collection,
bool allowDropSystem) {
if (collection->system() && !allowDropSystem) {
return TRI_ERROR_FORBIDDEN;
}
auto& databaseName = collection->vocbase().name();
auto cid = std::to_string(collection->id());
ClusterInfo& ci =
collection->vocbase().server().getFeature<ClusterFeature>().clusterInfo();
auto res = ci.dropCollectionCoordinator(databaseName, cid, 300.0);
if (!res.ok()) {
return res;
}
collection->setStatus(TRI_VOC_COL_STATUS_DELETED);
return TRI_ERROR_NO_ERROR;
}
#endif
/*static*/ arangodb::Result Collections::drop( // drop collection
arangodb::LogicalCollection& coll, // collection to drop
bool allowDropSystem, // allow dropping system collection
double timeout // single-server drop timeout
) {
ExecContext const& exec = ExecContext::current();
if (!exec.canUseDatabase(coll.vocbase().name(), auth::Level::RW) || // vocbase modifiable
!exec.canUseCollection(coll.name(), auth::Level::RW)) { // collection modifiable
events::DropCollection(coll.vocbase().name(), coll.name(), TRI_ERROR_FORBIDDEN);
return arangodb::Result( // result
TRI_ERROR_FORBIDDEN, // code
"Insufficient rights to drop collection " + coll.name() // message
);
}
auto const& dbname = coll.vocbase().name();
std::string const collName = coll.name();
Result res;
// If we are a coordinator in a cluster, we have to behave differently:
if (ServerState::instance()->isCoordinator()) {
#ifdef USE_ENTERPRISE
res = DropColCoordinatorEnterprise(&coll, allowDropSystem);
#else
res = DropVocbaseColCoordinator(&coll, allowDropSystem);
#endif
} else {
res = coll.vocbase().dropCollection(coll.id(), allowDropSystem, timeout);
}
auth::UserManager* um = AuthenticationFeature::instance()->userManager();
if (res.ok() && um != nullptr) {
res = um->enumerateUsers(
[&](auth::User& entry) -> bool {
return entry.removeCollection(dbname, collName);
},
/*retryOnConflict*/ true);
}
events::DropCollection(coll.vocbase().name(), coll.name(), res.errorNumber());
return res;
}
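////////////////////////////////////////////////////////////////////////////////
/// @brief warms up all indexes of the collection by dispatching warmup tasks
/// to the scheduler; forwarded to the DB servers when called on a coordinator
////////////////////////////////////////////////////////////////////////////////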
futures::Future<Result> Collections::warmup(TRI_vocbase_t& vocbase,
LogicalCollection const& coll) {
ExecContext const& exec = ExecContext::current(); // disallow expensive ops
if (!exec.canUseCollection(coll.name(), auth::Level::RO)) {
return futures::makeFuture(Result(TRI_ERROR_FORBIDDEN));
}
if (ServerState::instance()->isCoordinator()) {
auto cid = std::to_string(coll.id());
auto& feature = vocbase.server().getFeature<ClusterFeature>();
return warmupOnCoordinator(feature, vocbase.name(), cid);
}
auto ctx = transaction::V8Context::CreateWhenRequired(vocbase, false);
SingleCollectionTransaction trx(ctx, coll, AccessMode::Type::READ);
Result res = trx.begin();
if (res.fail()) {
return futures::makeFuture(res);
}
auto poster = [](std::function<void()> fn) -> bool {
return SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, fn);
};
auto queue = std::make_shared<basics::LocalTaskQueue>(poster);
auto idxs = coll.getIndexes();
for (auto& idx : idxs) {
idx->warmup(&trx, queue);
}
queue->dispatchAndWait();
if (queue->status() == TRI_ERROR_NO_ERROR) {
res = trx.commit();
} else {
return futures::makeFuture(Result(queue->status()));
}
return futures::makeFuture(res);
}
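////////////////////////////////////////////////////////////////////////////////
/// @brief returns the collection's revision id, fetched from the DB servers
/// when called on a coordinator
////////////////////////////////////////////////////////////////////////////////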
futures::Future<OperationResult> Collections::revisionId(Context& ctxt) {
if (ServerState::instance()->isCoordinator()) {
auto& databaseName = ctxt.coll()->vocbase().name();
auto cid = std::to_string(ctxt.coll()->id());
auto& feature = ctxt.coll()->vocbase().server().getFeature<ClusterFeature>();
return revisionOnCoordinator(feature, databaseName, cid);
}
TRI_voc_rid_t rid = ctxt.coll()->revision(ctxt.trx(AccessMode::Type::READ, true, true));
VPackBuilder builder;
builder.add(VPackValue(rid));
return futures::makeFuture(OperationResult(Result(), builder.steal()));
}
/// @brief Helper implementation similar to ArangoCollection.all() in v8
/*static*/ arangodb::Result Collections::all(TRI_vocbase_t& vocbase, std::string const& cname,
DocCallback const& cb) {
// Implement it like this to stay close to the original
if (ServerState::instance()->isCoordinator()) {
auto empty = std::make_shared<VPackBuilder>();
std::string q = "FOR r IN @@coll RETURN r";
auto binds = std::make_shared<VPackBuilder>();
binds->openObject();
binds->add("@coll", VPackValue(cname));
binds->close();
arangodb::aql::Query query(false, vocbase, aql::QueryString(q), binds,
std::make_shared<VPackBuilder>(), arangodb::aql::PART_MAIN);
auto queryRegistry = QueryRegistryFeature::registry();
TRI_ASSERT(queryRegistry != nullptr);
aql::QueryResult queryResult = query.executeSync(queryRegistry);
Result res = queryResult.result;
if (queryResult.result.ok()) {
VPackSlice array = queryResult.data->slice();
for (VPackSlice doc : VPackArrayIterator(array)) {
cb(doc.resolveExternal());
}
}
return res;
} else {
auto ctx = transaction::V8Context::CreateWhenRequired(vocbase, true);
SingleCollectionTransaction trx(ctx, cname, AccessMode::Type::READ);
Result res = trx.begin();
if (res.fail()) {
return res;
}
// We directly read the entire cursor, so batchsize == limit
OperationCursor opCursor(trx.indexScan(cname, transaction::Methods::CursorType::ALL));
opCursor.allDocuments([&](LocalDocumentId const& token,
VPackSlice doc) { cb(doc.resolveExternal()); },
1000);
return trx.finish(res);
}
}
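////////////////////////////////////////////////////////////////////////////////
/// @brief computes a checksum over all documents of the collection (document
/// keys, optionally revisions and attribute data); not implemented on
/// coordinators
////////////////////////////////////////////////////////////////////////////////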
arangodb::Result Collections::checksum(LogicalCollection& collection,
bool withRevisions, bool withData,
uint64_t& checksum, TRI_voc_rid_t& revId) {
if (ServerState::instance()->isCoordinator()) {
return Result(TRI_ERROR_NOT_IMPLEMENTED);
}
auto ctx = transaction::V8Context::CreateWhenRequired(collection.vocbase(), true);
SingleCollectionTransaction trx(ctx, collection, AccessMode::Type::READ);
Result res = trx.begin();
if (res.fail()) {
return res;
}
revId = collection.revision(&trx);
checksum = 0;
// We directly read the entire cursor, so batchsize == limit
OperationCursor opCursor(trx.indexScan(collection.name(), transaction::Methods::CursorType::ALL));
opCursor.allDocuments([&](LocalDocumentId const& token, VPackSlice slice) {
uint64_t localHash = transaction::helpers::extractKeyFromDocument(slice).hashString();
if (withRevisions) {
localHash += transaction::helpers::extractRevSliceFromDocument(slice).hash();
}
if (withData) {
// with data
uint64_t const n = slice.length() ^ 0xf00ba44ba5;
uint64_t seed = fasthash64_uint64(n, 0xdeadf054);
for (auto it : VPackObjectIterator(slice, false)) {
// loop over all attributes, but exclude _rev, _id and _key
// _id is different for each collection anyway, _rev is covered by
// withRevisions, and _key was already handled before
VPackValueLength keyLength;
char const* key = it.key.getString(keyLength);
if (keyLength >= 3 && key[0] == '_' &&
((keyLength == 3 && memcmp(key, "_id", 3) == 0) ||
(keyLength == 4 &&
(memcmp(key, "_key", 4) == 0 || memcmp(key, "_rev", 4) == 0)))) {
// exclude attribute
continue;
}
localHash ^= it.key.hash(seed) ^ 0xba5befd00d;
localHash += it.value.normalizedHash(seed) ^ 0xd4129f526421;
}
}
checksum ^= localHash;
}, 1000);
return trx.finish(res);
}