1
0
Fork 0
This commit is contained in:
Frank Celler 2016-06-01 15:57:28 +02:00
parent 963449d5f7
commit 3e61cb433d
11 changed files with 101 additions and 213 deletions

View File

@ -445,81 +445,6 @@ bool shardKeysChanged(std::string const& dbname, std::string const& collname,
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns users
////////////////////////////////////////////////////////////////////////////////
int usersOnCoordinator(std::string const& dbname, VPackBuilder& result,
double timeout) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<CollectionInfo> collinfo =
ci->getCollection(dbname, TRI_COL_NAME_USERS);
if (collinfo->empty()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
auto shards = collinfo->shardIds();
CoordTransactionID coordTransactionID = TRI_NewTickServer();
for (auto const& p : *shards) {
auto headers = std::make_unique<std::unordered_map<std::string, std::string>>();
// set collection name (shard id)
auto body = std::make_shared<std::string>();
body->append("{\"collection\":\"");
body->append(p.first);
body->append("\"}");
cc->asyncRequest(
"", coordTransactionID, "shard:" + p.first,
arangodb::GeneralRequest::RequestType::PUT,
"/_db/" + StringUtils::urlEncode(dbname) + "/_api/simple/all", body,
headers, nullptr, 10.0);
}
try {
// Now listen to the results:
int count;
int nrok = 0;
for (count = (int)shards->size(); count > 0; count--) {
auto res = cc->wait("", coordTransactionID, 0, "", timeout);
if (res.status == CL_COMM_RECEIVED) {
if (res.answer_code == arangodb::GeneralResponse::ResponseCode::OK ||
res.answer_code == arangodb::GeneralResponse::ResponseCode::CREATED) {
std::shared_ptr<VPackBuilder> answerBuilder = ExtractAnswer(res);
VPackSlice answer = answerBuilder->slice();
if (answer.isObject()) {
VPackSlice r = answer.get("result");
if (r.isArray()) {
for (auto const& p : VPackArrayIterator(r)) {
if (p.isObject()) {
result.add(p);
}
}
}
nrok++;
}
}
}
}
if (nrok != (int)shards->size()) {
return TRI_ERROR_INTERNAL;
}
} catch (...) {
return TRI_ERROR_OUT_OF_MEMORY;
}
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
// the DBserver could have reported an error.
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns revision for a sharded collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -63,13 +63,6 @@ bool shardKeysChanged(std::string const& dbname, std::string const& collname,
VPackSlice const& oldValue, VPackSlice const& newValue,
bool isPatch);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns users
////////////////////////////////////////////////////////////////////////////////
int usersOnCoordinator(std::string const& dbname,
arangodb::velocypack::Builder& result, double timeout);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns revision for a sharded collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -63,7 +63,7 @@ HeartbeatThread::HeartbeatThread(TRI_server_t* server,
_statusLock(),
_agency(),
_condition(),
_refetchUsers(),
_refetchUsers(true),
_myId(ServerState::instance()->getId()),
_interval(interval),
_maxFailsBeforeWarning(maxFailsBeforeWarning),
@ -379,10 +379,9 @@ void HeartbeatThread::runCoordinator() {
<< "Reloading users for database " << vocbase->_name
<< ".";
if (!fetchUsers(vocbase)) {
if (!fetchUsers()) {
// something is wrong... probably the database server
// with the _users collection is not yet available
RestServerFeature::AUTH_INFO->insertInitial();
allOK = false;
// we will not set oldUserVersion such that we will try this
// very same exercise again in the next heartbeat
@ -562,16 +561,16 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
HasRunOnce = 1;
// insert initial user(s) for database
if (!fetchUsers(vocbase)) {
if (!fetchUsers()) {
TRI_ReleaseVocBase(vocbase);
return false; // We give up, we will try again in the
// next heartbeat
}
}
} else if (TRI_EqualString(vocbase->_name, TRI_VOC_SYSTEM_DATABASE)) {
if (_refetchUsers.find(vocbase) != _refetchUsers.end()) {
if (_refetchUsers) {
// must re-fetch users for an existing database
if (!fetchUsers(vocbase)) {
if (!fetchUsers()) {
fetchingUsersFailed = true;
}
}
@ -734,52 +733,23 @@ bool HeartbeatThread::sendState() {
/// @brief fetch users for a database (run on coordinator only)
////////////////////////////////////////////////////////////////////////////////
bool HeartbeatThread::fetchUsers(TRI_vocbase_t* vocbase) {
bool result = false;
bool HeartbeatThread::fetchUsers() {
VPackBuilder builder;
builder.openArray();
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "fetching users for database '" << vocbase->_name << "'";
<< "fetching users for database";
int res = usersOnCoordinator(std::string(vocbase->_name), builder, 10.0);
if (res == TRI_ERROR_NO_ERROR) {
builder.close();
VPackSlice users = builder.slice();
// we were able to read from the _users collection
TRI_ASSERT(users.isArray());
if (users.length() == 0) {
// no users found, now insert initial default user
RestServerFeature::AUTH_INFO->insertInitial();
} else {
// users found in collection, insert them into cache
RestServerFeature::AUTH_INFO->populate(users);
}
result = true;
} else if (res == TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND) {
// could not access _users collection, probably the cluster
// was just created... insert initial default user
RestServerFeature::AUTH_INFO->insertInitial();
result = true;
} else if (res == TRI_ERROR_INTERNAL) {
// something is wrong... probably the database server with the
// _users collection is not yet available
// try again next time
result = false;
}
bool result = RestServerFeature::AUTH_INFO.reload();
if (result) {
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "fetching users for database '" << vocbase->_name << "' successful";
_refetchUsers.erase(vocbase);
<< "fetching users successful";
_refetchUsers = false;
} else {
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "fetching users for database '" << vocbase->_name
<< "' failed with error: " << TRI_errno_string(res);
_refetchUsers.insert(vocbase);
<< "fetching users failed";
_refetchUsers = true;
}
return result;

View File

@ -140,7 +140,7 @@ class HeartbeatThread : public Thread {
/// @brief fetch users for a database (run on coordinator only)
//////////////////////////////////////////////////////////////////////////////
bool fetchUsers(TRI_vocbase_t*);
bool fetchUsers();
//////////////////////////////////////////////////////////////////////////////
/// @brief bring the db server in sync with the desired state
@ -180,11 +180,10 @@ class HeartbeatThread : public Thread {
arangodb::basics::ConditionVariable _condition;
//////////////////////////////////////////////////////////////////////////////
/// @brief users for these databases will be re-fetched the next time the
/// heartbeat thread runs
/// @brief users will be re-fetched the next time the heartbeat thread runs
//////////////////////////////////////////////////////////////////////////////
std::unordered_set<TRI_vocbase_t*> _refetchUsers;
bool _refetchUsers;
//////////////////////////////////////////////////////////////////////////////
/// @brief this server's id

View File

@ -75,7 +75,7 @@ using namespace arangodb;
using namespace arangodb::rest;
using namespace arangodb::options;
AuthInfo* RestServerFeature::AUTH_INFO = nullptr;
AuthInfo RestServerFeature::AUTH_INFO;
RestServerFeature* RestServerFeature::RESTSERVER = nullptr;
RestServerFeature::RestServerFeature(
@ -258,10 +258,13 @@ void RestServerFeature::start() {
<< (_authenticationUnixSockets ? "on" : "off");
#endif
}
// populate the authentication cache. otherwise no one can access the new
// database
RestServerFeature::AUTH_INFO.reload();
}
void RestServerFeature::stop() {
RESTSERVER = nullptr;
for (auto& server : _servers) {
server->stopListening();
}
@ -275,6 +278,8 @@ void RestServerFeature::stop() {
}
_httpOptions._vocbase = nullptr;
RESTSERVER = nullptr;
}
void RestServerFeature::buildServers() {

View File

@ -40,7 +40,7 @@ class RestServerThread;
class RestServerFeature final
: public application_features::ApplicationFeature {
public:
static AuthInfo* AUTH_INFO;
static AuthInfo AUTH_INFO;
public:
static bool authenticationEnabled() {

View File

@ -29,9 +29,9 @@
#include "Endpoint/ConnectionInfo.h"
#include "Logger/Logger.h"
#include "RestServer/RestServerFeature.h"
#include "VocBase/AuthInfo.h"
#include "VocBase/server.h"
#include "VocBase/vocbase.h"
#include "Vocbase/AuthInfo.h"
using namespace arangodb;
using namespace arangodb::basics;
@ -147,7 +147,7 @@ GeneralResponse::ResponseCode VocbaseContext::authenticate() {
}
AuthResult result =
RestServerFeature::AUTH_INFO->checkAuthentication(auth, _vocbase->_name);
RestServerFeature::AUTH_INFO.checkAuthentication(auth, _vocbase->_name);
if (!result._authorized) {
return GeneralResponse::ResponseCode::UNAUTHORIZED;

View File

@ -927,24 +927,6 @@ static void JS_ParseDatetime(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
/// @brief reloads the authentication info, coordinator case
////////////////////////////////////////////////////////////////////////////////
static bool ReloadAuthCoordinator(TRI_vocbase_t* vocbase) {
VPackBuilder builder;
builder.openArray();
int res = usersOnCoordinator(TRI_VOC_SYSTEM_DATABASE, builder, 60.0);
if (res == TRI_ERROR_NO_ERROR) {
builder.close();
return RestServerFeature::AUTH_INFO->populate(builder.slice());
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief reloads the authentication info from collection _users
////////////////////////////////////////////////////////////////////////////////
@ -963,15 +945,12 @@ static void JS_ReloadAuth(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_USAGE("RELOAD_AUTH()");
}
bool result;
if (ServerState::instance()->isCoordinator()) {
result = ReloadAuthCoordinator(vocbase);
} else {
result = RestServerFeature::AUTH_INFO->reload();
}
bool result = RestServerFeature::AUTH_INFO.reload();
if (result) {
TRI_V8_RETURN_TRUE();
}
TRI_V8_RETURN_FALSE();
TRI_V8_TRY_CATCH_END
}
@ -3198,10 +3177,6 @@ static void JS_CreateDatabase(v8::FunctionCallbackInfo<v8::Value> const& args) {
// and switch back
v8g->_vocbase = orig;
// populate the authentication cache. otherwise no one can access the new
// database
RestServerFeature::AUTH_INFO->reload();
// finally decrease the reference-counter
TRI_ReleaseVocBase(database);

View File

@ -23,6 +23,8 @@
#include "AuthInfo.h"
#include <iostream>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
@ -38,6 +40,7 @@
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::velocypack;
static AuthEntry CreateAuthEntry(VPackSlice const& slice) {
if (slice.isNone() || !slice.isObject()) {
@ -91,6 +94,14 @@ static AuthEntry CreateAuthEntry(VPackSlice const& slice) {
bool mustChange =
VelocyPackHelper::getBooleanValue(slice, "changePassword", false);
std::cout
<< "user: " << userSlice.copyString() << "\n"
<< "method: " << methodSlice.copyString() << "\n"
<< "salt: " << saltSlice.copyString() << "\n"
<< "hash: " << hashSlice.copyString() << "\n"
<< "active: " << active << "\n"
<< "must change: " << mustChange << "\n";
return AuthEntry(userSlice.copyString(), methodSlice.copyString(),
saltSlice.copyString(), hashSlice.copyString(), active,
mustChange);
@ -99,27 +110,25 @@ static AuthEntry CreateAuthEntry(VPackSlice const& slice) {
void AuthInfo::clear() {
_authInfo.clear();
_authCache.clear();
_authInfoLoaded = false;
}
bool AuthInfo::reload() {
insertInitial();
TRI_vocbase_t* vocbase = DatabaseFeature::DATABASE->vocbase();
if (vocbase == nullptr) {
LOG(DEBUG) << "system database is unknown, cannot load authentication "
<< "and authorization information";
return false;
}
LOG(DEBUG) << "starting to load authentication and authorization information";
WRITE_LOCKER(writeLocker, _authInfoLock);
TRI_vocbase_col_t* collection =
TRI_LookupCollectionByNameVocBase(vocbase, TRI_COL_NAME_USERS);
if (collection == nullptr) {
LOG(INFO) << "collection '_users' does not exist, no authentication will "
"be available";
return false;
}
SingleCollectionTransaction trx(StandaloneTransactionContext::Create(vocbase),
collection->_cid, TRI_TRANSACTION_READ);
TRI_COL_NAME_USERS, TRI_TRANSACTION_READ);
int res = trx.begin();
@ -127,40 +136,37 @@ bool AuthInfo::reload() {
return false;
}
auto work = [&](TRI_doc_mptr_t const* ptr) {
VPackSlice slice(ptr->vpack());
AuthEntry auth = CreateAuthEntry(slice);
OperationResult users =
trx.all(TRI_COL_NAME_USERS, 0, UINT64_MAX, OperationOptions());
if (auth.isActive()) {
_authInfo[auth.username()] = auth;
}
trx.finish(users.code);
if (users.failed()) {
LOG(ERR) << "cannot read users from _users collection";
return false;
}
auto usersSlice = users.slice();
if (!usersSlice.isArray()) {
LOG(ERR) << "cannot read users from _users collection";
return false;
}
return true;
};
clear();
trx.invokeOnAllElements(collection->_name, work);
if (usersSlice.length() == 0) {
insertInitial();
} else {
trx.finish(TRI_ERROR_NO_ERROR);
for (VPackSlice const& userSlice : VPackArrayIterator(usersSlice)) {
AuthEntry auth = CreateAuthEntry(userSlice);
_authInfoLoaded = true;
return true;
}
bool AuthInfo::populate(VPackSlice const& slice) {
TRI_ASSERT(slice.isArray());
WRITE_LOCKER(writeLocker, _authInfoLock);
clear();
for (VPackSlice const& authSlice : VPackArrayIterator(slice)) {
AuthEntry auth = CreateAuthEntry(authSlice);
if (auth.isActive()) {
_authInfo.emplace(auth.username(), auth);
}
if (auth.isActive()) {
_authInfo[auth.username()] = auth;
}
};
}
return true;
@ -206,7 +212,29 @@ AuthResult AuthInfo::checkAuthentication(std::string const& authorizationField,
return AuthResult();
}
bool AuthInfo::insertInitial() {
bool AuthInfo::populate(VPackSlice const& slice) {
TRI_ASSERT(slice.isArray());
WRITE_LOCKER(writeLocker, _authInfoLock);
clear();
for (VPackSlice const& authSlice : VPackArrayIterator(slice)) {
AuthEntry auth = CreateAuthEntry(authSlice);
if (auth.isActive()) {
_authInfo.emplace(auth.username(), auth);
}
}
return true;
}
void AuthInfo::insertInitial() {
if (!_authInfo.empty()) {
return;
}
try {
VPackBuilder builder;
builder.openArray();
@ -234,7 +262,7 @@ bool AuthInfo::insertInitial() {
builder.add("active", VPackValue(true));
builder.add("databases", VPackValue(VPackValueType::Array));
builder.add(VPackValue(TRI_VOC_SYSTEM_DATABASE));
builder.add(VPackValue("*"));
builder.close();
builder.close(); // authData
@ -242,13 +270,9 @@ bool AuthInfo::insertInitial() {
builder.close(); // The Array
populate(builder.slice());
return true;
} catch (...) {
// No action
}
// We get here only through error
return false;
}
#if 0

View File

@ -103,11 +103,13 @@ class AuthInfo {
char const* databaseName);
bool reload();
bool insertInitial();
bool populate(velocypack::Slice const& slice);
private:
void clear();
bool populate(velocypack::Slice const& slice);
void insertInitial();
std::string checkCache(std::string const& authorizationField,
bool* mustChange);
@ -115,7 +117,6 @@ class AuthInfo {
std::unordered_map<std::string, arangodb::AuthEntry> _authInfo;
std::unordered_map<std::string, arangodb::AuthCache> _authCache;
basics::ReadWriteLock _authInfoLock;
bool _authInfoLoaded = false;
};
}

View File

@ -1363,9 +1363,6 @@ int TRI_InitDatabasesServer(TRI_server_t* server) {
TRI_ASSERT(vocbase != nullptr);
TRI_ASSERT(vocbase->_type == TRI_VOCBASE_TYPE_NORMAL);
// initialize the authentication data for the database
RestServerFeature::AUTH_INFO->reload();
// start the compactor for the database
TRI_StartCompactorVocBase(vocbase);
@ -1618,7 +1615,6 @@ int TRI_CreateDatabaseServer(TRI_server_t* server, TRI_voc_tick_t databaseId,
CreateApplicationDirectory(vocbase->_name, appPath.c_str());
if (!arangodb::wal::LogfileManager::instance()->isInRecovery()) {
RestServerFeature::AUTH_INFO->reload();
TRI_StartCompactorVocBase(vocbase);
// start the replication applier
@ -1986,7 +1982,7 @@ int TRI_GetUserDatabasesServer(TRI_server_t* server, char const* username,
char const* dbName = p.second->_name;
TRI_ASSERT(dbName != nullptr);
if (!RestServerFeature::AUTH_INFO->canUseDatabase(username, dbName)) {
if (!RestServerFeature::AUTH_INFO.canUseDatabase(username, dbName)) {
// user cannot see database
continue;
}