1
0
Fork 0

Fix auth with dump

This commit is contained in:
Simon Grätzer 2019-11-29 18:03:08 +08:00
parent b610d99d9f
commit 223895d34d
No known key found for this signature in database
GPG Key ID: E4736AA091116E5C
4 changed files with 63 additions and 23 deletions

View File

@ -224,7 +224,6 @@ void ClusterInfo::cleanup() {
_plannedViews.clear();
_plannedCollections.clear();
_shards.clear();
_shardKeys.clear();
_shardIds.clear();
_currentCollections.clear();
}
@ -235,6 +234,7 @@ void ClusterInfo::triggerBackgroundGetIds() {
_uniqid._nextUpperValue = 0ULL;
try {
_idLock.assertLockedByCurrentThread();
if (_uniqid._backgroundJobIsRunning) {
return;
}
@ -598,7 +598,7 @@ void ClusterInfo::loadPlan() {
// >
decltype(_shards) newShards;
decltype(_shardServers) newShardServers;
decltype(_shardKeys) newShardKeys;
decltype(_shardToName) newShardToName;
bool swapDatabases = false;
bool swapCollections = false;
@ -957,17 +957,16 @@ void ClusterInfo::loadPlan() {
databaseCollections.try_emplace(collectionId, newCollection);
}
newShardKeys.try_emplace(collectionId, std::make_shared<std::vector<std::string>>(
newCollection->shardKeys()));
auto shardIDs = newCollection->shardIds();
auto shards = std::make_shared<std::vector<std::string>>();
shards->reserve(shardIDs->size());
newShardToName.reserve(shardIDs->size());
for (auto const& p : *shardIDs) {
TRI_ASSERT(p.first.size() >= 2);
shards->push_back(p.first);
newShardServers.try_emplace(p.first, p.second);
newShardToName.try_emplace(p.first, newCollection->name());
}
// Sort by the number in the shard ID ("s0000001" for example):
@ -1053,8 +1052,8 @@ void ClusterInfo::loadPlan() {
if (swapCollections) {
_plannedCollections.swap(newCollections);
_shards.swap(newShards);
_shardKeys.swap(newShardKeys);
_shardServers.swap(newShardServers);
_shardToName.swap(newShardToName);
}
if (swapViews) {
@ -4287,6 +4286,16 @@ arangodb::Result ClusterInfo::getShardServers(ShardID const& shardId,
return arangodb::Result(TRI_ERROR_FAILED);
}
CollectionID ClusterInfo::getCollectionNameForShard(ShardID const& shardId) {
READ_LOCKER(readLocker, _planProt.lock);
auto it = _shardToName.find(shardId);
if (it != _shardToName.end()) {
return it->second;
}
return StaticStrings::Empty;
}
arangodb::Result ClusterInfo::agencyDump(std::shared_ptr<VPackBuilder> body) {
AgencyCommResult dump = _agency.dump();

View File

@ -815,6 +815,9 @@ class ClusterInfo final {
* @return List of DB servers serving the shard
*/
arangodb::Result getShardServers(ShardID const& shardId, std::vector<ServerID>&);
/// @brief map shardId to collection name (not ID)
CollectionID getCollectionNameForShard(ShardID const& shardId);
/**
* @brief Lock agency's hot backup with TTL 60 seconds
@ -956,8 +959,7 @@ class ClusterInfo final {
ProtectionData _planProt;
uint64_t _planVersion; // This is the version in the Plan which underlies
// the data in _plannedCollections, _shards and
// _shardKeys
// the data in _plannedCollections and _shards
uint64_t _currentVersion; // This is the version in Current which underlies
// the data in _currentDatabases,
// _currentCollections and _shardsIds
@ -978,11 +980,10 @@ class ClusterInfo final {
std::shared_ptr<std::vector<std::string>>>
_shards; // from Plan/Collections/
// (may later come from Current/Collections/ )
std::unordered_map<CollectionID,
std::shared_ptr<std::vector<std::string>>>
_shardKeys; // from Plan/Collections/
// planned shard => servers map
std::unordered_map<ShardID, std::vector<ServerID>> _shardServers;
// planned shard ID => collection name
std::unordered_map<ShardID, CollectionID> _shardToName;
AllViews _plannedViews; // from Plan/Views/
AllViews _newPlannedViews; // views that have been created during `loadPlan`

View File

@ -140,8 +140,12 @@ futures::Future<Result> RestHandler::forwardRequest(bool& forwarded) {
std::map<std::string, std::string> headers{_request->headers().begin(),
_request->headers().end()};
if (headers.find(StaticStrings::Authorization) == headers.end()) {
// we never override auth headers for internal requests
bool const forwardAuth = serverId.compare(0, 5, "CRDN-") == 0;
if (!forwardAuth) {
headers.erase(StaticStrings::Authorization);
} else if (headers.find(StaticStrings::Authorization) == headers.end()) {
// No authorization header is set, this is in particular the case if this
// request is coming in with VelocyStream, where the authentication happens
// once at the beginning of the connection and not with every request.

View File

@ -592,24 +592,43 @@ BAD_CALL:
/// @brief returns the short id of the server which should handle this request
std::string RestReplicationHandler::forwardingTarget() {
if (!ServerState::instance()->isCoordinator()) {
return "";
return StaticStrings::Empty;
}
auto const& suffixes = _request->suffixes();
size_t const len = suffixes.size();
if (len >= 1) {
auto const type = _request->requestType();
std::string const& command = suffixes[0];
if ((command == Batch) || (command == Inventory && type == rest::RequestType::GET) ||
(command == Dump && type == rest::RequestType::GET)) {
ServerID const& DBserver = _request->value("DBserver");
if (!DBserver.empty()) {
return DBserver;
if (len < 1) {
return StaticStrings::Empty;
}
auto const type = _request->requestType();
std::string const& command = suffixes[0];
if ((command == Inventory && type == rest::RequestType::GET) ||
(command == Dump && type == rest::RequestType::GET) ||
(command == Batch)) {
ServerID const& DBserver = _request->value("DBserver");
if (DBserver.empty()) {
return StaticStrings::Empty;
}
if (command == Dump) { // check auth here
bool found = false;
std::string const& shard = _request->value("collection", found);
if (found) {
auto& ci = server().getFeature<ClusterFeature>().clusterInfo();
auto cname = ci.getCollectionNameForShard(shard);
auto& exec = ExecContext::current();
ExecContextSuperuserScope escope(exec.isAdminUser());
if (!exec.canUseCollection(cname, auth::Level::RO)) {
return StaticStrings::Empty;
}
}
}
return DBserver;
}
return "";
return StaticStrings::Empty;
}
////////////////////////////////////////////////////////////////////////////////
@ -711,9 +730,16 @@ void RestReplicationHandler::handleCommandClusterInventory() {
resultBuilder.add(VPackValue(StaticStrings::Properties));
vocbase->toVelocyPack(resultBuilder);
}
auto& exec = ExecContext::current();
ExecContextSuperuserScope escope(exec.isAdminUser());
resultBuilder.add("collections", VPackValue(VPackValueType::Array));
for (std::shared_ptr<LogicalCollection> const& c : cols) {
if (!exec.canUseCollection(vocbase->name(), c->name(), auth::Level::RO)) {
continue;
}
// We want to check if the collection is usable and all followers
// are in sync:
std::shared_ptr<ShardMap> shardMap = c->shardIds();