1
0
Fork 0

restore perms

This commit is contained in:
hkernbach 2019-12-02 14:42:19 +01:00
parent 61b5723185
commit e06f5b4e1f
4 changed files with 130 additions and 16 deletions

View File

@ -241,7 +241,6 @@ void ClusterInfo::cleanup() {
theInstance->_plannedViews.clear();
theInstance->_plannedCollections.clear();
theInstance->_shards.clear();
theInstance->_shardKeys.clear();
theInstance->_shardIds.clear();
theInstance->_currentCollections.clear();
}
@ -605,7 +604,7 @@ void ClusterInfo::loadPlan() {
// >
decltype(_shards) newShards;
decltype(_shardServers) newShardServers;
decltype(_shardKeys) newShardKeys;
decltype(_shardToName) newShardToName;
bool swapDatabases = false;
bool swapCollections = false;
@ -958,16 +957,15 @@ void ClusterInfo::loadPlan() {
databaseCollections.emplace(collectionId, newCollection);
}
newShardKeys.emplace(collectionId, std::make_shared<std::vector<std::string>>(
newCollection->shardKeys()));
auto shardIDs = newCollection->shardIds();
auto shards = std::make_shared<std::vector<std::string>>();
shards->reserve(shardIDs->size());
newShardToName.reserve(shardIDs->size());
for (auto const& p : *shardIDs) {
shards->push_back(p.first);
newShardServers.emplace(p.first, p.second);
newShardToName.emplace(p.first, newCollection->name());
}
// Sort by the number in the shard ID ("s0000001" for example):
@ -1028,7 +1026,7 @@ void ClusterInfo::loadPlan() {
if (swapCollections) {
_plannedCollections.swap(newCollections);
_shards.swap(newShards);
_shardKeys.swap(newShardKeys);
_shardToName.swap(newShardToName);
_shardServers.swap(newShardServers);
}
@ -4197,6 +4195,16 @@ arangodb::Result ClusterInfo::getShardServers(ShardID const& shardId,
return arangodb::Result(TRI_ERROR_FAILED);
}
CollectionID ClusterInfo::getCollectionNameForShard(ShardID const& shardId) {
READ_LOCKER(readLocker, _planProt.lock);
auto it = _shardToName.find(shardId);
if (it != _shardToName.end()) {
return it->second;
}
return StaticStrings::Empty;
}
arangodb::Result ClusterInfo::agencyDump(std::shared_ptr<VPackBuilder> body) {
AgencyCommResult dump = _agency.dump();

View File

@ -818,6 +818,9 @@ class ClusterInfo final {
*/
arangodb::Result getShardServers(ShardID const& shardId, std::vector<ServerID>&);
/// @brief map shardId to collection name (not ID)
CollectionID getCollectionNameForShard(ShardID const& shardId);
/**
* @brief Lock agency's hot backup with TTL 60 seconds
*
@ -947,8 +950,7 @@ class ClusterInfo final {
ProtectionData _planProt;
uint64_t _planVersion; // This is the version in the Plan which underlies
// the data in _plannedCollections, _shards and
// _shardKeys
// the data in _plannedCollections and _shards and
uint64_t _currentVersion; // This is the version in Current which underlies
// the data in _currentDatabases,
// _currentCollections and _shardsIds
@ -957,7 +959,7 @@ class ClusterInfo final {
// We need information about collections, again we have
// data from Plan and from Current.
// The information for _shards and _shardKeys are filled from the
// The information for _shards and _shardToName are filled from the
// Plan (since they are fixed for the lifetime of the collection).
// _shardIds is filled from Current, since we have to be able to
// move shards between servers, and Plan contains who ought to be
@ -969,11 +971,11 @@ class ClusterInfo final {
std::shared_ptr<std::vector<std::string>>>
_shards; // from Plan/Collections/
// (may later come from Current/Collections/ )
std::unordered_map<CollectionID,
std::shared_ptr<std::vector<std::string>>>
_shardKeys; // from Plan/Collections/
// from Plan/Collections/
// planned shard => servers map
std::unordered_map<ShardID, std::vector<ServerID>> _shardServers;
// planned shard ID => collection name
std::unordered_map<ShardID, CollectionID> _shardToName;
AllViews _plannedViews; // from Plan/Views/
AllViews _newPlannedViews; // views that have been created during `loadPlan`

View File

@ -273,6 +273,12 @@ std::string const RestReplicationHandler::HoldReadLockCollection =
// main function that dispatches the different routes and commands
RestStatus RestReplicationHandler::execute() {
auto res = testPermissions();
if (!res.ok()) {
generateError(res);
return RestStatus::DONE;
}
// extract the request type
auto const type = _request->requestType();
auto const& suffixes = _request->suffixes();
@ -583,6 +589,94 @@ BAD_CALL:
return RestStatus::DONE;
}
Result RestReplicationHandler::testPermissions() {
if (!_request->authenticated()) {
return TRI_ERROR_NO_ERROR;
}
std::string user = _request->user();
auto const& suffixes = _request->suffixes();
size_t const len = suffixes.size();
if (len >= 1) {
auto const type = _request->requestType();
std::string const& command = suffixes[0];
if ((command == Batch) || (command == Inventory && type == rest::RequestType::GET) ||
(command == Dump && type == rest::RequestType::GET) ||
(command == RestoreCollection && type == rest::RequestType::PUT)) {
if (command == Dump) {
// check dump collection permissions (at least ro needed)
std::string collectionName = _request->value("collection");
if (ServerState::instance()->isCoordinator()) {
// We have a shard id, need to translate
auto ci = ClusterInfo::instance();
collectionName = ci->getCollectionNameForShard(collectionName);
}
if (!collectionName.empty()) {
auto& exec = ExecContext::CURRENT;
ExecContextSuperuserScope escope(exec->isAdminUser());
if (!exec->isAdminUser() &&
!exec->canUseCollection(collectionName, auth::Level::RO)) {
// not enough rights
return Result(TRI_ERROR_FORBIDDEN);
}
} else {
// not found, return 404
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
}
} else if (command == RestoreCollection) {
VPackSlice const slice = _request->payload();
VPackSlice const parameters = slice.get("parameters");
if (parameters.isObject()) {
if (parameters.get("name").isString()) {
std::string collectionName = parameters.get("name").copyString();
if (!collectionName.empty()) {
std::string dbName = _request->databaseName();
TRI_vocbase_t* vocbase = DatabaseFeature::DATABASE->lookupDatabase(dbName);
if (vocbase == nullptr) {
return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
std::string const& overwriteCollection =
_request->value("overwrite");
auto& exec = ExecContext::CURRENT;
ExecContextSuperuserScope escope(exec->isSuperuser());
if (overwriteCollection == "true" ||
vocbase->lookupCollection(collectionName) == nullptr) {
// 1.) re-create collection, means: overwrite=true (rw database)
// OR 2.) not existing, new collection (rw database)
if (!exec->isAdminUser() && !exec->canUseDatabase(dbName, auth::Level::RW)) {
return Result(TRI_ERROR_FORBIDDEN);
}
} else {
// 3u) Existing collection (ro database, rw collection)
// no overwrite. restoring into an existing collection
if (!exec->isAdminUser() &&
!exec->canUseCollection(collectionName, auth::Level::RW)) {
return Result(TRI_ERROR_FORBIDDEN);
}
}
} else {
return Result(TRI_ERROR_HTTP_BAD_PARAMETER,
"empty collection name");
}
}
} else {
return Result(TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid collection name type");
}
} else {
return Result(TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid collection parameter type");
}
}
}
return Result(TRI_ERROR_NO_ERROR);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock JSF_put_api_replication_makeSlave
////////////////////////////////////////////////////////////////////////////////
@ -1190,7 +1284,10 @@ Result RestReplicationHandler::processRestoreCollectionCoordinator(
if (!isValidReplFactorSlice) {
if (replicationFactor == 0) {
replicationFactor = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster")->defaultReplicationFactor();
replicationFactor =
application_features::ApplicationServer::getFeature<ClusterFeature>(
"Cluster")
->defaultReplicationFactor();
if (replicationFactor == 0) {
replicationFactor = 1;
}
@ -1970,9 +2067,9 @@ void RestReplicationHandler::handleCommandRestoreView() {
if (view) {
if (!overwrite) {
generateError(GeneralResponse::responseCode(TRI_ERROR_ARANGO_DUPLICATE_NAME),
TRI_ERROR_ARANGO_DUPLICATE_NAME,
std::string("unable to restore view '") + nameSlice.copyString() + ": " +
TRI_errno_string(TRI_ERROR_ARANGO_DUPLICATE_NAME));
TRI_ERROR_ARANGO_DUPLICATE_NAME,
std::string("unable to restore view '") + nameSlice.copyString() +
": " + TRI_errno_string(TRI_ERROR_ARANGO_DUPLICATE_NAME));
return;
}

View File

@ -505,6 +505,13 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
//////////////////////////////////////////////////////////////////////////////
ResultT<bool> cancelBlockingTransaction(aql::QueryId id) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Validate that the requesting user has access rights to this route
/// Will return TRI_ERROR_NO_ERROR if user has access
/// Will return error code otherwise.
//////////////////////////////////////////////////////////////////////////////
Result testPermissions();
};
} // namespace arangodb
#endif