1
0
Fork 0

restore perms

This commit is contained in:
hkernbach 2019-12-02 15:35:16 +01:00
parent 4ad2ff1f75
commit c830838aa5
5 changed files with 171 additions and 33 deletions

View File

@ -287,7 +287,7 @@ void ClusterInfo::cleanup() {
theInstance->_plannedViews.clear();
theInstance->_plannedCollections.clear();
theInstance->_shards.clear();
theInstance->_shardKeys.clear();
theInstance->_shardToName.clear();
theInstance->_shardIds.clear();
theInstance->_currentCollections.clear();
}
@ -613,7 +613,7 @@ void ClusterInfo::loadPlan() {
// >
decltype(_shards) newShards;
decltype(_shardServers) newShardServers;
decltype(_shardKeys) newShardKeys;
decltype(_shardToName) newShardToName;
bool swapDatabases = false;
bool swapCollections = false;
@ -926,15 +926,15 @@ void ClusterInfo::loadPlan() {
databaseCollections.emplace(collectionId, newCollection);
}
auto shardKeys =
std::make_shared<std::vector<std::string>>(newCollection->shardKeys());
newShardKeys.emplace(collectionId, std::move(shardKeys));
auto shardIDs = newCollection->shardIds();
auto shards = std::make_shared<std::vector<std::string>>();
shards->reserve(shardIDs->size());
newShardToName.reserve(shardIDs->size());
for (auto const& p : *shardIDs) {
shards->push_back(p.first);
newShardServers.emplace(p.first, p.second);
newShardToName.emplace(p.first, newCollection->name());
}
// Sort by the number in the shard ID ("s0000001" for example):
std::sort(shards->begin(), shards->end(),
@ -990,7 +990,7 @@ void ClusterInfo::loadPlan() {
if (swapCollections) {
_plannedCollections.swap(newCollections);
_shards.swap(newShards);
_shardKeys.swap(newShardKeys);
_shardToName.swap(newShardToName);
_shardServers.swap(newShardServers);
}
if (swapViews) {
@ -3887,6 +3887,16 @@ arangodb::Result ClusterInfo::getShardServers(ShardID const& shardId,
return arangodb::Result(TRI_ERROR_FAILED);
}
CollectionID ClusterInfo::getCollectionNameForShard(ShardID const& shardId) {
READ_LOCKER(readLocker, _planProt.lock);
auto it = _shardToName.find(shardId);
if (it != _shardToName.end()) {
return it->second;
}
return StaticStrings::Empty;
}
arangodb::Result ClusterInfo::agencyDump(std::shared_ptr<VPackBuilder> body) {
AgencyCommResult dump = _agency.dump();
body->add(dump.slice());

View File

@ -648,6 +648,9 @@ class ClusterInfo {
*/
arangodb::Result getShardServers(ShardID const& shardId, std::vector<ServerID>&);
/// @brief map shardId to collection name (not ID)
CollectionID getCollectionNameForShard(ShardID const& shardId);
//////////////////////////////////////////////////////////////////////////////
/// @brief get an operation timeout
//////////////////////////////////////////////////////////////////////////////
@ -751,8 +754,7 @@ class ClusterInfo {
ProtectionData _planProt;
uint64_t _planVersion; // This is the version in the Plan which underlies
// the data in _plannedCollections, _shards and
// _shardKeys
// the data in _plannedCollections and _shards.
uint64_t _currentVersion; // This is the version in Current which underlies
// the data in _currentDatabases,
// _currentCollections and _shardsIds
@ -761,7 +763,7 @@ class ClusterInfo {
// We need information about collections, again we have
// data from Plan and from Current.
// The information for _shards and _shardKeys are filled from the
// The information for _shards and _shardToName are filled from the
// Plan (since they are fixed for the lifetime of the collection).
// _shardIds is filled from Current, since we have to be able to
// move shards between servers, and Plan contains who ought to be
@ -773,11 +775,10 @@ class ClusterInfo {
std::shared_ptr<std::vector<std::string>>>
_shards; // from Plan/Collections/
// (may later come from Current/Collections/ )
std::unordered_map<CollectionID,
std::shared_ptr<std::vector<std::string>>>
_shardKeys; // from Plan/Collections/
// planned shard => servers map
std::unordered_map<ShardID, std::vector<ServerID>> _shardServers;
// planned shard ID => collection name
std::unordered_map<ShardID, CollectionID> _shardToName;
AllViews _plannedViews; // from Plan/Views/
AllViews _newPlannedViews; // views that have been created during `loadPlan` execution

View File

@ -112,15 +112,12 @@ static bool ignoreHiddenEnterpriseCollection(std::string const& name, bool force
static Result checkPlanLeaderDirect(std::shared_ptr<LogicalCollection> const& col,
std::string const& claimLeaderId) {
std::vector<std::string> agencyPath = {
"Plan",
"Collections",
col->vocbase().name(),
std::to_string(col->planId()),
"shards",
col->name()
};
std::vector<std::string> agencyPath = {"Plan",
"Collections",
col->vocbase().name(),
std::to_string(col->planId()),
"shards",
col->name()};
std::string shardAgencyPathString = StringUtils::join(agencyPath, '/');
@ -245,6 +242,12 @@ bool RestReplicationHandler::isCoordinatorError() {
// main function that dispatches the different routes and commands
RestStatus RestReplicationHandler::execute() {
auto res = testPermissions();
if (!res.ok()) {
generateError(res);
return RestStatus::DONE;
}
// extract the request type
auto const type = _request->requestType();
auto const& suffixes = _request->suffixes();
@ -547,9 +550,97 @@ BAD_CALL:
return RestStatus::DONE;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief update the leader of a shard
//////////////////////////////////////////////////////////////////////////////
Result RestReplicationHandler::testPermissions() {
if (!_request->authenticated()) {
return TRI_ERROR_NO_ERROR;
}
std::string user = _request->user();
auto const& suffixes = _request->suffixes();
size_t const len = suffixes.size();
if (len >= 1) {
auto const type = _request->requestType();
std::string const& command = suffixes[0];
if ((command == "batch") || (command == "inventory" && type == rest::RequestType::GET) ||
(command == "dump" && type == rest::RequestType::GET) ||
(command == "restore-collection" && type == rest::RequestType::PUT)) {
if (command == "dump") {
// check dump collection permissions (at least ro needed)
std::string collectionName = _request->value("collection");
if (ServerState::instance()->isCoordinator()) {
// We have a shard id, need to translate
auto ci = ClusterInfo::instance();
collectionName = ci->getCollectionNameForShard(collectionName);
}
if (!collectionName.empty()) {
auto& exec = ExecContext::CURRENT;
ExecContextSuperuserScope escope(exec->isAdminUser());
if (!exec->isAdminUser() &&
!exec->canUseCollection(collectionName, auth::Level::RO)) {
// not enough rights
return Result(TRI_ERROR_FORBIDDEN);
}
} else {
// not found, return 404
return Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
}
} else if (command == "restore-collection") {
VPackSlice const slice = _request->payload();
VPackSlice const parameters = slice.get("parameters");
if (parameters.isObject()) {
if (parameters.get("name").isString()) {
std::string collectionName = parameters.get("name").copyString();
if (!collectionName.empty()) {
std::string dbName = _request->databaseName();
TRI_vocbase_t* vocbase = DatabaseFeature::DATABASE->lookupDatabase(dbName);
if (vocbase == nullptr) {
return Result(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
std::string const& overwriteCollection =
_request->value("overwrite");
auto& exec = ExecContext::CURRENT;
ExecContextSuperuserScope escope(exec->isSuperuser());
if (overwriteCollection == "true" ||
vocbase->lookupCollection(collectionName) == nullptr) {
// 1.) re-create collection, means: overwrite=true (rw database)
// OR 2.) not existing, new collection (rw database)
if (!exec->isAdminUser() && !exec->canUseDatabase(dbName, auth::Level::RW)) {
return Result(TRI_ERROR_FORBIDDEN);
}
} else {
// 3u) Existing collection (ro database, rw collection)
// no overwrite. restoring into an existing collection
if (!exec->isAdminUser() &&
!exec->canUseCollection(collectionName, auth::Level::RW)) {
return Result(TRI_ERROR_FORBIDDEN);
}
}
} else {
return Result(TRI_ERROR_HTTP_BAD_PARAMETER,
"empty collection name");
}
}
} else {
return Result(TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid collection name type");
}
} else {
return Result(TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid collection parameter type");
}
}
}
return Result(TRI_ERROR_NO_ERROR);
}
//////////////////////////////////////////////////////////////////////////////
/// @brief update the leader of a shard
//////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandSetTheLeader() {
TRI_ASSERT(ServerState::instance()->isDBServer());
@ -596,7 +687,8 @@ void RestReplicationHandler::handleCommandSetTheLeader() {
}
if (!oldLeaderIdSlice.isEqualString(currentLeader)) {
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN, "old leader not as expected");
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN,
"old leader not as expected");
return;
}
@ -887,9 +979,9 @@ void RestReplicationHandler::handleCommandRestoreCollection() {
if (name.isObject()) {
name = name.get("name");
if (name.isString() && !name.copyString().empty() && name.copyString()[0] == '_') {
// system collection... it may have been created in the meantime by a background action
// collection should be there now - as long as the collection is there in the end
// we are satisfied
// system collection... it may have been created in the meantime by a
// background action collection should be there now - as long as the
// collection is there in the end we are satisfied
res.reset();
}
}
@ -1250,15 +1342,22 @@ Result RestReplicationHandler::processRestoreCollectionCoordinator(
s = parameters.get(StaticStrings::ReplicationFactor);
if (s.isString() && s.copyString() == "satellite") {
// set "satellite" replicationFactor to the default replication factor
ClusterFeature* cl = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster");
ClusterFeature* cl = application_features::ApplicationServer::getFeature<ClusterFeature>(
"Cluster");
uint32_t replicationFactor = cl->systemReplicationFactor();
toMerge.add(StaticStrings::ReplicationFactor, VPackValue(replicationFactor));
changes.push_back(std::string("changed 'replicationFactor' attribute value to ") + std::to_string(replicationFactor));;
changes.push_back(
std::string("changed 'replicationFactor' attribute value to ") +
std::to_string(replicationFactor));
;
}
if (!changes.empty()) {
LOG_TOPIC(INFO, Logger::CLUSTER) << "rewrote info for collection '" << name << "' on restore for usage with the community version. the following changes were applied: " << basics::StringUtils::join(changes, ". ");
LOG_TOPIC(INFO, Logger::CLUSTER)
<< "rewrote info for collection '"
<< name << "' on restore for usage with the community version. the following changes were applied: "
<< basics::StringUtils::join(changes, ". ");
}
#endif
@ -2384,7 +2483,7 @@ void RestReplicationHandler::handleCommandAddFollower() {
col->followers()->add(followerId);
{ // untrack the (async) replication client, so the WAL may be cleaned
{ // untrack the (async) replication client, so the WAL may be cleaned
TRI_server_id_t const serverId = StringUtils::uint64(
basics::VelocyPackHelper::getStringValue(body, "serverId", ""));
SyncerId const syncerId = SyncerId{StringUtils::uint64(

View File

@ -458,6 +458,13 @@ class RestReplicationHandler : public RestVocbaseBaseHandler {
//////////////////////////////////////////////////////////////////////////////
ResultT<bool> cancelBlockingTransaction(aql::QueryId id) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief Validate that the requesting user has access rights to this route
/// Will return TRI_ERROR_NO_ERROR if user has access
/// Will return error code otherwise.
//////////////////////////////////////////////////////////////////////////////
Result testPermissions();
};
} // namespace arangodb
#endif

View File

@ -39,6 +39,7 @@ class Methods;
/// context for convencience
class ExecContext : public RequestContext {
protected:
friend struct ExecContextSuperuserScope;
enum class Type { Default, Internal };
ExecContext(ExecContext::Type type, std::string const& user,
@ -143,6 +144,7 @@ class ExecContext : public RequestContext {
/// level of current database
auth::Level _databaseAuthLevel;
private:
static ExecContext SUPERUSER;
};
@ -158,6 +160,25 @@ struct ExecContextScope {
private:
ExecContext const* _old;
};
struct ExecContextSuperuserScope {
explicit ExecContextSuperuserScope()
: _old(ExecContext::CURRENT) {
ExecContext::CURRENT = &ExecContext::SUPERUSER;
}
explicit ExecContextSuperuserScope(bool cond) : _old(ExecContext::CURRENT) {
if (cond) {
ExecContext::CURRENT = &ExecContext::SUPERUSER;
}
}
~ExecContextSuperuserScope() { ExecContext::CURRENT = _old; }
private:
ExecContext const* _old;
};
} // namespace arangodb
#endif