
Add load balancing support to cursor API in 3.3 (#5797)

Dan Larkin-York 2018-07-24 03:56:40 -04:00 committed by Jan
parent 9a639e0d48
commit 1897fd029f
27 changed files with 1354 additions and 244 deletions


@ -1,6 +1,13 @@
v3.3.13 (XXXX-XX-XX)
--------------------
* Added load balancer support and user-restriction to cursor API.
If a cursor is accessed on a different coordinator than where it was created,
the requests will be forwarded to the correct coordinator. If a cursor is
accessed by a different user than the one who created it, the request will
be denied.
* keep failed follower in followers list in Plan.
This increases the chances of a failed follower getting back into sync if the
@ -21,7 +28,7 @@ v3.3.13 (XXXX-XX-XX)
* fixed issue #5827: Batch request handling incompatible with .NET's default
ContentType format
* fixed agency's log compaction for internal issue #2249
* inspector collects additionally disk data size and storage engine statistics


@ -11,20 +11,20 @@ can optionally be SSL-encrypted.
ArangoDB uses the standard HTTP methods (e.g. *GET*, *POST*, *PUT*, *DELETE*) plus
the *PATCH* method described in [RFC 5789](http://tools.ietf.org/html/rfc5789).
Most server APIs expect clients to send any payload data in [JSON](http://www.json.org)
format. Details on the expected format and JSON attributes can be found in the
documentation of the individual server methods.
Clients sending requests to ArangoDB must use either HTTP 1.0 or HTTP 1.1.
Other HTTP versions are not supported by ArangoDB and any attempt to send
a different HTTP version signature will result in the server responding with
an HTTP 505 (HTTP version not supported) error.
ArangoDB will always respond to client requests with HTTP 1.1. Clients
should therefore support HTTP version 1.1.
Clients are required to include the *Content-Length* HTTP header with the
correct content length in every request that can have a body (e.g. *POST*,
*PUT* or *PATCH*). ArangoDB will not process requests without a
*Content-Length* header - thus chunked transfer encoding for POST-documents
is not supported.
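The Content-Length rule above can be sketched as follows (a minimal illustration, not ArangoDB client code; the JSON payload is made up):

```python
# Sketch: every body-carrying request (POST, PUT, PATCH) must declare the
# exact byte length of its payload; chunked transfer encoding is rejected.
body = b'{"name": "example"}'  # hypothetical JSON payload

headers = {
    "Content-Type": "application/json",
    # Content-Length must match the payload byte count exactly:
    "Content-Length": str(len(body)),
}
```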
@ -33,10 +33,10 @@ HTTP Keep-Alive
---------------
ArangoDB supports HTTP keep-alive. If the client does not send a *Connection*
header in its request, and the client uses HTTP version 1.1, ArangoDB will assume
the client wants to keep alive the connection.
If clients do not wish to use the keep-alive feature, they should
explicitly indicate that by sending a *Connection: Close* HTTP header in
the request.
ArangoDB will close connections automatically for clients that send requests
@ -58,24 +58,24 @@ Blocking vs. Non-blocking HTTP Requests
ArangoDB supports both blocking and non-blocking HTTP requests.
ArangoDB is a multi-threaded server, allowing the processing of multiple
client requests at the same time. Request/response handling and the actual
work are performed on the server in parallel by multiple worker threads.
Still, clients need to wait for their requests to be processed by the server,
and thus keep one connection of a pool occupied.
By default, the server will fully process an incoming request and then return
the result to the client when the operation is finished. The client must
wait for the server's HTTP response before it can send additional requests over
the same connection. For clients that are single-threaded and/or are
blocking on I/O themselves, waiting idle for the server response may be
non-optimal.
To reduce blocking on the client side, ArangoDB offers a generic mechanism for
non-blocking, asynchronous execution: clients can add the
HTTP header *x-arango-async: true* to any of their requests, marking
them for asynchronous execution on the server. ArangoDB will put such
requests into an in-memory task queue and return an *HTTP 202* (accepted)
response to the client instantly, thus finishing the HTTP request.
The server will execute the tasks from the queue asynchronously as fast
as possible, while clients can continue to do other work.
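The client side of this mechanism can be sketched like so (the header name is taken from the text above; the helper function is illustrative, not part of any driver):

```python
# Sketch: mark a request for asynchronous ("fire and forget") execution by
# adding the x-arango-async header; the server then queues the work and
# answers immediately with HTTP 202 (accepted).
def make_async(headers):
    out = dict(headers)  # do not mutate the caller's headers
    out["x-arango-async"] = "true"
    return out

async_headers = make_async({"Content-Type": "application/json"})
```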
@ -84,23 +84,23 @@ option ["--scheduler.maximal-queue-size"](../../Manual/Administration/Configurat
then the request will be rejected instantly with an *HTTP 500* (internal
server error) response.
Asynchronous execution decouples the request/response handling from the actual
work to be performed, allowing fast server responses and greatly reducing wait
time for clients. Overall this allows for much higher throughput than if
clients would always wait for the server's response.
Keep in mind that the asynchronous execution is just "fire and forget".
Clients will get any of their asynchronous requests answered with a generic
HTTP 202 response. At the time the server sends this response, it does not
know whether the requested operation can be carried out successfully (the
actual operation execution will happen at some later point). Clients therefore
cannot make a decision based on the server response and must rely on their
requests being valid and processable by the server.
Additionally, the server's asynchronous task queue is an in-memory data
structure, meaning not-yet processed tasks from the queue might be lost in
case of a crash. Clients should therefore not use the asynchronous feature
when they have strict durability requirements or if they rely on the immediate
result of the request they send.
For details on the subsequent processing
@ -124,17 +124,17 @@ The response to an HTTP OPTIONS request will be generic and not expose any priva
There is an additional option to control authentication for custom Foxx apps. The option
[--server.authentication-system-only](../../Manual/Administration/Configuration/GeneralArangod.html)
controls whether authentication is required only for requests to the internal database APIs and the admin interface.
It is turned on by default, meaning that other APIs (this includes custom Foxx apps) do not require authentication.
The default values allow exposing a public custom Foxx API built with ArangoDB to the outside
world without the need for HTTP authentication, but still protecting the usage of the
internal database APIs (i.e. */_api/*, */_admin/*) with HTTP authentication.
If the server is started with the *--server.authentication-system-only* option set
to *false*, all incoming requests will need HTTP authentication if the server is configured
to require HTTP authentication (i.e. *--server.authentication true*).
Setting the option to *true* will make the server require authentication only for requests to the
internal database APIs and will allow unauthenticated requests to all other URLs.
Here's a short summary:
@ -146,12 +146,12 @@ Here's a short summary:
authentication for all requests (including custom Foxx apps).
* `--server.authentication false`: authentication disabled for all requests
Whenever authentication is required and the client has not yet authenticated,
ArangoDB will return *HTTP 401* (Unauthorized). It will also send the *WWW-Authenticate*
response header, indicating that the client should prompt the user for username and
password if supported. If the client is a browser, then sending back this header will
normally trigger the display of the browser-side HTTP authentication dialog.
As showing the browser HTTP authentication dialog is undesired in AJAX requests,
ArangoDB can be told to not send the *WWW-Authenticate* header back to the client.
Whenever a client sends the *X-Omit-WWW-Authenticate* HTTP header (with an arbitrary value)
to ArangoDB, ArangoDB will only send status code 401, but no *WWW-Authenticate* header.
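The header rule just described can be modeled as a small sketch (the realm string is illustrative, not the server's actual value):

```python
# Sketch: on a 401 response, ArangoDB includes the WWW-Authenticate header
# unless the client sent X-Omit-WWW-Authenticate (with an arbitrary value).
def headers_for_401(request_headers):
    omit = any(k.lower() == "x-omit-www-authenticate" for k in request_headers)
    if omit:
        return {}  # status 401 only, no WWW-Authenticate header
    return {"WWW-Authenticate": 'Basic realm="ArangoDB"'}  # realm illustrative
```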
@ -194,7 +194,7 @@ Error Handling
The following should be noted about how ArangoDB handles client errors in its
HTTP layer:
* client requests using an HTTP version signature different than *HTTP/1.0* or
*HTTP/1.1* will get an *HTTP 505* (HTTP version not supported) error in return.
* ArangoDB will reject client requests with a negative value in the
*Content-Length* request header with *HTTP 411* (Length Required).
@ -356,11 +356,35 @@ PUT, DELETE, PATCH) of a request using one of the following custom HTTP headers:
* *x-http-method*
* *x-method-override*
This allows using HTTP clients that do not support all "common" HTTP methods such as
PUT, PATCH and DELETE. It also allows bypassing proxies and tools that would otherwise
just let certain types of requests (e.g. GET and POST) pass through.
Enabling this option may impose a security risk, so it should only be used in very
controlled environments. Thus the default value for this option is *false* (no method
overriding allowed). You need to enable it explicitly if you want to use this
feature.
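A sketch of how such an override could be resolved on the server side, assuming only the two header names listed above (the helper is illustrative, not ArangoDB's implementation):

```python
# Sketch: when method overriding is enabled, the effective HTTP method is
# taken from an override header if one is present, else the real method.
OVERRIDE_HEADERS = ("x-http-method", "x-method-override")

def effective_method(real_method, headers):
    # header names are case-insensitive, so normalize before lookup
    lowered = {k.lower(): v for k, v in headers.items()}
    for name in OVERRIDE_HEADERS:
        if name in lowered:
            return lowered[name].upper()
    return real_method
```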
Load-balancer support
---------------------
When running in cluster mode, ArangoDB exposes some APIs which store request
state data on specific coordinator nodes, and thus subsequent requests which
require access to this state must be served by the coordinator node which owns
this state data. To function behind a load balancer, ArangoDB
can transparently forward requests within the cluster to the correct node. If a
request is forwarded, the response will contain the following custom HTTP header
whose value will be the ID of the node which actually answered the request:
* *x-arango-request-served-by*
The following APIs may use request forwarding:
* `/_api/cursor`
Note: since forwarding such requests requires an additional cluster-internal HTTP
request, it should be avoided when possible for best performance. Typically
this is accomplished either by directing the requests to the correct coordinator
at the client level or by enabling request "stickiness" on the load balancer. Since
these approaches are not always possible in a given environment, request
forwarding is supported as a fallback solution.
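Client-side stickiness as suggested above could be sketched like this (the endpoint map and server IDs are hypothetical):

```python
# Sketch: after a response arrives, check whether it was forwarded; if so,
# pin follow-up cursor requests to the coordinator that actually served it.
def pick_endpoint(response_headers, endpoints, default):
    served_by = response_headers.get("x-arango-request-served-by")
    if served_by in endpoints:
        return endpoints[served_by]  # talk to the owning coordinator directly
    return default                   # not forwarded (or unknown ID): keep default

endpoints = {"CRDN-1": "http://coord1:8529", "CRDN-2": "http://coord2:8529"}
```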


@ -636,7 +636,7 @@ ClusterCommResult const ClusterComm::wait(
ClusterCommTimeout endTime = TRI_microtime() + timeout;
TRI_ASSERT(timeout >= 0.0);
// if we cannot find the sought operation, we will return the status
// DROPPED. if we get into the timeout while waiting, we will still return
// CL_COMM_TIMEOUT.
@ -1214,8 +1214,9 @@ std::pair<ClusterCommResult*, HttpRequest*> ClusterComm::prepareRequest(std::str
}
void ClusterComm::addAuthorization(std::unordered_map<std::string, std::string>* headers) {
  if (_authenticationEnabled &&
      headers->find(StaticStrings::Authorization) == headers->end()) {
    headers->emplace(StaticStrings::Authorization, _jwtAuthorization);
  }
}


@ -223,6 +223,7 @@ void ClusterInfo::flush() {
loadServers();
loadCurrentDBServers();
loadCurrentCoordinators();
loadCurrentMappings();
loadPlan();
loadCurrent();
}
@ -629,7 +630,7 @@ void ClusterInfo::loadPlan() {
_shardServers.swap(newShardServers);
}
_planProt.doneVersion = storedVersion;
_planProt.isValid = true;
} else {
LOG_TOPIC(ERR, Logger::CLUSTER) << "\"Plan\" is not an object in agency";
}
@ -770,7 +771,7 @@ void ClusterInfo::loadCurrent() {
_shardIds.swap(newShardIds);
}
_currentProt.doneVersion = storedVersion;
_currentProt.isValid = true;
} else {
LOG_TOPIC(ERR, Logger::CLUSTER) << "Current is not an object!";
}
@ -1145,7 +1146,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
std::string const name =
arangodb::basics::VelocyPackHelper::getStringValue(json, "name", "");
{
// check if a collection with the same name is already planned
loadPlan();
@ -1207,7 +1208,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
// wait that all followers have created our new collection
if (tmpError.empty() && waitForReplication) {
std::vector<ServerID> plannedServers;
{
READ_LOCKER(readLocker, _planProt.lock);
@ -1260,7 +1260,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
}
return true;
};
// ATTENTION: The following callback calls the above closure in a
// different thread. Nevertheless, the closure accesses some of our
// local variables. Therefore we have to protect all accesses to them
@ -1350,7 +1350,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
// Update our cache:
loadPlan();
}
bool isSmart = false;
VPackSlice smartSlice = json.get("isSmart");
if (smartSlice.isBool() && smartSlice.getBool()) {
@ -2387,7 +2387,7 @@ void ClusterInfo::loadServers() {
_servers.swap(newServers);
_serverAliases.swap(newAliases);
_serversProt.doneVersion = storedVersion;
_serversProt.isValid = true;
}
return;
}
@ -2528,7 +2528,7 @@ void ClusterInfo::loadCurrentCoordinators() {
WRITE_LOCKER(writeLocker, _coordinatorsProt.lock);
_coordinators.swap(newCoordinators);
_coordinatorsProt.doneVersion = storedVersion;
_coordinatorsProt.isValid = true;
}
return;
}
@ -2542,6 +2542,73 @@ void ClusterInfo::loadCurrentCoordinators() {
<< " body: " << result.body();
}
static std::string const prefixMappings = "Target/MapUniqueToShortID";

void ClusterInfo::loadCurrentMappings() {
  ++_mappingsProt.wantedVersion;  // Indicate that after *NOW* somebody
                                  // has to reread from the agency!
  MUTEX_LOCKER(mutexLocker, _mappingsProt.mutex);
  uint64_t storedVersion = _mappingsProt.wantedVersion;  // this is the
                                                         // version we will
                                                         // set in the end
  if (_mappingsProt.doneVersion == storedVersion) {
    // Somebody else did what we intended to do, so just return
    return;
  }

  // Now contact the agency:
  AgencyCommResult result = _agency.getValues(prefixMappings);

  if (result.successful()) {
    velocypack::Slice mappings =
        result.slice()[0].get(std::vector<std::string>(
            {AgencyCommManager::path(), "Target", "MapUniqueToShortID"}));

    if (mappings.isObject()) {
      decltype(_coordinatorIdMap) newCoordinatorIdMap;
      decltype(_dbserverIdMap) newDBServerIdMap;
      decltype(_nameMap) newNameMap;

      for (auto const& mapping : VPackObjectIterator(mappings)) {
        ServerID fullId = mapping.key.copyString();
        auto mapObject = mapping.value;
        if (mapObject.isObject()) {
          ServerShortName shortName = mapObject.get("ShortName").copyString();
          newNameMap.emplace(shortName, fullId);

          ServerShortID shortId =
              mapObject.get("TransactionID").getNumericValue<ServerShortID>();
          static std::string const expectedPrefix{"Coordinator"};
          if (shortName.size() > expectedPrefix.size() &&
              shortName.substr(0, expectedPrefix.size()) == expectedPrefix) {
            newCoordinatorIdMap.emplace(shortId, fullId);
          } else {
            newDBServerIdMap.emplace(shortId, fullId);
          }
        }
      }

      // Now set the new value:
      {
        WRITE_LOCKER(writeLocker, _mappingsProt.lock);
        _nameMap.swap(newNameMap);
        _coordinatorIdMap.swap(newCoordinatorIdMap);
        _dbserverIdMap.swap(newDBServerIdMap);
        _mappingsProt.doneVersion = storedVersion;
        _mappingsProt.isValid = true;
      }
      return;
    }
  }

  LOG_TOPIC(DEBUG, Logger::CLUSTER)
      << "Error while loading " << prefixMappings
      << " httpCode: " << result.httpCode()
      << " errorCode: " << result.errorCode()
      << " errorMessage: " << result.errorMessage()
      << " body: " << result.body();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about all DBservers from the agency
/// Usually one does not have to call this directly.
@ -2623,7 +2690,7 @@ void ClusterInfo::loadCurrentDBServers() {
WRITE_LOCKER(writeLocker, _DBServersProt.lock);
_DBServers.swap(newDBServers);
_DBServersProt.doneVersion = storedVersion;
_DBServersProt.isValid = true;
}
return;
}
@ -2847,6 +2914,72 @@ std::vector<ServerID> ClusterInfo::getCurrentCoordinators() {
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief lookup full coordinator ID from short ID
////////////////////////////////////////////////////////////////////////////////
ServerID ClusterInfo::getCoordinatorByShortID(ServerShortID shortId) {
  ServerID result;

  if (!_mappingsProt.isValid) {
    loadCurrentMappings();
  }

  // return a consistent state of servers
  READ_LOCKER(readLocker, _mappingsProt.lock);

  auto it = _coordinatorIdMap.find(shortId);
  if (it != _coordinatorIdMap.end()) {
    result = it->second;
  }

  return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief lookup full dbserver ID from short ID
////////////////////////////////////////////////////////////////////////////////
ServerID ClusterInfo::getDBServerByShortID(ServerShortID shortId) {
  ServerID result;

  if (!_mappingsProt.isValid) {
    loadCurrentMappings();
  }

  // return a consistent state of servers
  READ_LOCKER(readLocker, _mappingsProt.lock);

  auto it = _dbserverIdMap.find(shortId);
  if (it != _dbserverIdMap.end()) {
    result = it->second;
  }

  return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief lookup full server ID from short name
////////////////////////////////////////////////////////////////////////////////
ServerID ClusterInfo::getServerByShortName(ServerShortName const& shortName) {
  ServerID result;

  if (!_mappingsProt.isValid) {
    loadCurrentMappings();
  }

  // return a consistent state of servers
  READ_LOCKER(readLocker, _mappingsProt.lock);

  auto it = _nameMap.find(shortName);
  if (it != _nameMap.end()) {
    result = it->second;
  }

  return result;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate plan
//////////////////////////////////////////////////////////////////////////////
@ -2873,6 +3006,17 @@ void ClusterInfo::invalidateCurrentCoordinators() {
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate current mappings
//////////////////////////////////////////////////////////////////////////////
void ClusterInfo::invalidateCurrentMappings() {
  {
    WRITE_LOCKER(writeLocker, _mappingsProt.lock);
    _mappingsProt.isValid = false;
  }
}
//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate current
//////////////////////////////////////////////////////////////////////////////
@ -2891,6 +3035,7 @@ void ClusterInfo::invalidateCurrent() {
_currentProt.isValid = false;
}
invalidateCurrentCoordinators();
invalidateCurrentMappings();
}
//////////////////////////////////////////////////////////////////////////////


@ -47,10 +47,12 @@ class Slice;
class ClusterInfo;
class LogicalCollection;
typedef std::string ServerID; // ID of a server
typedef std::string DatabaseID; // ID/name of a database
typedef std::string CollectionID; // ID of a collection
typedef std::string ShardID; // ID of a shard
typedef uint32_t ServerShortID; // Short ID of a server
typedef std::string ServerShortName; // Short name of a server
class CollectionInfoCurrent {
friend class ClusterInfo;
@ -255,7 +257,7 @@ class ClusterInfo {
//////////////////////////////////////////////////////////////////////////////
static ClusterInfo* instance();
//////////////////////////////////////////////////////////////////////////////
/// @brief cleanup method which frees cluster-internal shared ptrs on shutdown
//////////////////////////////////////////////////////////////////////////////
@ -306,7 +308,7 @@ class ClusterInfo {
/// @brief ask about a collection
/// If it is not found in the cache, the cache is reloaded once. The second
/// argument can be a collection ID or a collection name (both cluster-wide).
/// if the collection is not found afterwards, this method will throw an
/// exception
//////////////////////////////////////////////////////////////////////////////
@ -438,6 +440,13 @@ class ClusterInfo {
void loadCurrentCoordinators();
//////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the mappings between different IDs/names from the agency
/// Usually one does not have to call this directly.
//////////////////////////////////////////////////////////////////////////////
void loadCurrentMappings();
//////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about all DBservers from the agency
/// Usually one does not have to call this directly.
@ -484,6 +493,24 @@ class ClusterInfo {
std::vector<ServerID> getCurrentCoordinators();
//////////////////////////////////////////////////////////////////////////////
/// @brief lookup a full coordinator ID by short ID
//////////////////////////////////////////////////////////////////////////////
ServerID getCoordinatorByShortID(ServerShortID);
//////////////////////////////////////////////////////////////////////////////
/// @brief lookup a full dbserver ID by short ID
//////////////////////////////////////////////////////////////////////////////
ServerID getDBServerByShortID(ServerShortID);
//////////////////////////////////////////////////////////////////////////////
/// @brief lookup a full server ID by short name
//////////////////////////////////////////////////////////////////////////////
ServerID getServerByShortName(ServerShortName const&);
//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate planned
//////////////////////////////////////////////////////////////////////////////
@ -502,6 +529,12 @@ class ClusterInfo {
void invalidateCurrentCoordinators();
//////////////////////////////////////////////////////////////////////////////
/// @brief invalidate current id mappings
//////////////////////////////////////////////////////////////////////////////
void invalidateCurrentMappings();
//////////////////////////////////////////////////////////////////////////////
/// @brief get current "Plan" structure
//////////////////////////////////////////////////////////////////////////////
@ -520,7 +553,7 @@ class ClusterInfo {
std::unordered_map<ServerID, std::string> getServers();
virtual std::unordered_map<ServerID, std::string> getServerAliases();
private:
void loadClusterId();
@ -609,6 +642,12 @@ class ClusterInfo {
_coordinators; // from Current/Coordinators
ProtectionData _coordinatorsProt;
// Mappings between short names/IDs and full server IDs
std::unordered_map<ServerShortID, ServerID> _coordinatorIdMap;
std::unordered_map<ServerShortID, ServerID> _dbserverIdMap;
std::unordered_map<ServerShortName, ServerID> _nameMap;
ProtectionData _mappingsProt;
std::shared_ptr<VPackBuilder> _plan;
std::shared_ptr<VPackBuilder> _current;
@ -692,8 +731,8 @@ class ClusterInfo {
//////////////////////////////////////////////////////////////////////////////
static double const reloadServerListTimeout;
arangodb::Mutex _failedServersMutex;
std::vector<std::string> _failedServers;
};


@ -205,7 +205,7 @@ ServerState::StateEnum ServerState::stringToState(std::string const& value) {
return STATE_SHUTDOWN;
}
// TODO MAX: do we need to understand other states, too?
return STATE_UNDEFINED;
}
@ -316,17 +316,17 @@ static int LookupLocalInfoToId(std::string const& localInfo,
std::string& description) {
// fetch value at Plan/DBServers
// we need to do this to determine the server's role
std::string const key = "Target/MapLocalToID";
int count = 0;
while (++count <= 600) {
AgencyComm comm;
AgencyCommResult result = comm.getValues(key);
if (!result.successful()) {
std::string const endpoints = AgencyCommManager::MANAGER->endpointsString();
LOG_TOPIC(DEBUG, Logger::STARTUP)
<< "Could not fetch configuration from agency endpoints ("
<< endpoints << "): got status code " << result._statusCode
@ -335,7 +335,7 @@ static int LookupLocalInfoToId(std::string const& localInfo,
VPackSlice slice = result.slice()[0].get(
std::vector<std::string>(
{AgencyCommManager::path(), "Target", "MapLocalToID"}));
if (!slice.isObject()) {
LOG_TOPIC(DEBUG, Logger::STARTUP) << "Target/MapLocalToID corrupt: "
<< "no object.";
@ -377,7 +377,7 @@ bool ServerState::integrateIntoCluster(ServerState::RoleEnum role,
// lookup in agency
// if (found) {
// persist id
// }
// }
// }
// if (id still not set) {
// generate and persist new id
@ -392,7 +392,7 @@ bool ServerState::integrateIntoCluster(ServerState::RoleEnum role,
if (res == TRI_ERROR_NO_ERROR) {
writePersistedId(id);
setId(id);
}
}
}
if (id.empty()) {
@ -414,7 +414,7 @@ bool ServerState::integrateIntoCluster(ServerState::RoleEnum role,
Logger::setRole(roleToString(role)[0]);
_role.store(role, std::memory_order_release);
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "We successfully announced ourselves as "
<< roleToString(role) << " and our id is "
<< id;
@ -437,7 +437,7 @@ std::string ServerState::roleToAgencyKey(ServerState::RoleEnum role) {
return "Coordinator";
case ROLE_SINGLE:
return "Single";
case ROLE_UNDEFINED:
case ROLE_AGENT: {
TRI_ASSERT(false);
@ -466,7 +466,7 @@ std::string ServerState::getUuidFilename() {
}
bool ServerState::hasPersistedId() {
std::string uuidFilename = getUuidFilename();
return FileUtils::exists(uuidFilename);
}
@ -493,7 +493,7 @@ std::string ServerState::generatePersistedId(RoleEnum const& role) {
}
std::string ServerState::getPersistedId() {
std::string uuidFilename = getUuidFilename();
std::ifstream ifs(uuidFilename);
std::string id;
@ -562,7 +562,7 @@ bool ServerState::registerAtAgency(AgencyComm& comm,
AgencyReadTransaction readValueTrx(std::vector<std::string>{AgencyCommManager::path(targetIdStr),
AgencyCommManager::path(targetUrl)});
AgencyCommResult result = comm.sendTransactionWithFailover(readValueTrx, 0.0);
if (!result.successful()) {
LOG_TOPIC(WARN, Logger::CLUSTER) << "Couldn't fetch " << targetIdStr
<< " and " << targetUrl;
@ -624,6 +624,7 @@ bool ServerState::registerAtAgency(AgencyComm& comm,
result = comm.sendTransactionWithFailover(trx, 0.0);
if (result.successful()) {
setShortId(num + 1); // save short ID for generating server-specific ticks
return true;
}
sleep(1);
@ -633,6 +634,26 @@ bool ServerState::registerAtAgency(AgencyComm& comm,
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the short server id
////////////////////////////////////////////////////////////////////////////////
uint32_t ServerState::getShortId() {
  return _shortId.load(std::memory_order_relaxed);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the short server id
////////////////////////////////////////////////////////////////////////////////
void ServerState::setShortId(uint32_t id) {
  if (id == 0) {
    return;
  }
  _shortId.store(id, std::memory_order_relaxed);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the server role
////////////////////////////////////////////////////////////////////////////////
@ -846,7 +867,7 @@ Result ServerState::propagateClusterServerMode(Mode mode) {
builder.add(VPackValue(true));
}
operations.push_back(AgencyOperation("Readonly", AgencyValueOperationType::SET, builder.slice()));
AgencyWriteTransaction readonlyMode(operations);
AgencyComm comm;
AgencyCommResult r = comm.sendTransactionWithFailover(readonlyMode);


@ -53,7 +53,7 @@ class ServerState {
STATE_STOPPED, // primary only
STATE_SHUTDOWN // used by all roles
};
enum class Mode : uint8_t {
DEFAULT = 0,
/// reject all requests
@ -78,12 +78,12 @@ class ServerState {
/// @brief get the string representation of a role
static std::string roleToString(RoleEnum);
static std::string roleToShortString(RoleEnum);
/// @brief get the key for lists of a role in the agency
static std::string roleToAgencyListKey(RoleEnum);
/// @brief get the key for a role in the agency
static std::string roleToAgencyKey(RoleEnum role);
@ -95,25 +95,25 @@ class ServerState {
/// @brief convert a string representation to a state
static StateEnum stringToState(std::string const&);
/// @brief get the string representation of a mode
static std::string modeToString(Mode);
/// @brief convert a string representation to a mode
static Mode stringToMode(std::string const&);
/// @brief sets server mode, returns previously held
/// value (performs atomic read-modify-write operation)
static Mode setServerMode(Mode mode);
/// @brief atomically load current server mode
static Mode serverMode();
/// @brief checks maintenance mode
static bool isMaintenance() {
return serverMode() == Mode::MAINTENANCE;
}
/// @brief should not allow DDL operations / transactions
static bool writeOpsEnabled() {
Mode mode = serverMode();
@@ -129,9 +129,9 @@ class ServerState {
/// @brief flush the server state (used for testing)
void flush();
bool isSingleServer() { return isSingleServer(loadRole()); }
static bool isSingleServer(ServerState::RoleEnum role) {
return (role == ServerState::ROLE_SINGLE);
}
@@ -153,14 +153,14 @@ class ServerState {
static bool isDBServer(ServerState::RoleEnum role) {
return (role == ServerState::ROLE_PRIMARY);
}
/// @brief whether or not the role is a cluster-related role
static bool isClusterRole(ServerState::RoleEnum role) {
return (role == ServerState::ROLE_PRIMARY ||
role == ServerState::ROLE_COORDINATOR);
}
/// @brief check whether the server is an agent
bool isAgent() { return isAgent(loadRole()); }
/// @brief check whether the server is an agent
@@ -170,12 +170,12 @@ class ServerState {
/// @brief check whether the server is running in a cluster
bool isRunningInCluster() { return isClusterRole(loadRole()); }
/// @brief check whether the server is running in a cluster
static bool isRunningInCluster(ServerState::RoleEnum role) {
return isClusterRole(role);
}
/// @brief check whether the server is a single server or coordinator
bool isSingleServerOrCoordinator() {
RoleEnum role = loadRole();
@@ -184,10 +184,10 @@ class ServerState {
/// @brief get the server role
RoleEnum getRole();
bool integrateIntoCluster(RoleEnum role, std::string const& myAddr,
std::string const& myLocalInfo);
bool unregister();
/// @brief set the server role
@@ -195,10 +195,16 @@ class ServerState {
/// @brief get the server id
std::string getId();
/// @brief set the server id
void setId(std::string const&);
/// @brief get the short id
uint32_t getShortId();
/// @brief set the server short id
void setShortId(uint32_t);
/// @brief get the server address
std::string getAddress();
@@ -235,7 +241,7 @@ class ServerState {
void setFoxxmaster(std::string const&);
void setFoxxmasterQueueupdate(bool);
bool getFoxxmasterQueueupdate();
std::string getPersistedId();
@@ -257,18 +263,21 @@ private:
/// @brief validate a state transition for a coordinator server
bool checkCoordinatorState(StateEnum);
/// @brief register at agency
bool registerAtAgency(AgencyComm&, const RoleEnum&, std::string const&);
/// @brief register shortname for an id
bool registerShortName(std::string const& id, const RoleEnum&);
/// file where the server persists its UUID
std::string getUuidFilename();
/// @brief the server's id, can be set just once
std::string _id;
/// @brief the server's short id, can be set just once
std::atomic<uint32_t> _shortId;
/// @brief the JavaScript startup path, can be set just once
std::string _javaScriptStartupPath;
@@ -291,7 +300,7 @@ private:
bool _initialized;
std::string _foxxmaster;
bool _foxxmasterQueueupdate;
};
}

View File

@@ -231,6 +231,13 @@ void GeneralCommTask::executeRequest(
return;
}
// forward to correct server if necessary
bool forwarded = handler->forwardRequest();
if (forwarded) {
addResponse(*handler->response(), handler->stealStatistics());
return;
}
// asynchronous request
if (found && (asyncExec == "true" || asyncExec == "store")) {
RequestStatistics::SET_ASYNC(statistics(messageId));

View File

@@ -103,7 +103,7 @@ void HttpCommTask::addResponse(GeneralResponse& baseResponse,
finishExecution(baseResponse);
resetKeepAlive();
// response has been queued, allow further requests
_requestPending = false;
@@ -202,7 +202,7 @@ void HttpCommTask::addResponse(GeneralResponse& baseResponse,
// caller must hold the _lock
bool HttpCommTask::processRead(double startTime) {
TRI_ASSERT(_peer->strand.running_in_this_thread());
cancelKeepAlive();
TRI_ASSERT(_readBuffer.c_str() != nullptr);
@@ -272,7 +272,7 @@ bool HttpCommTask::processRead(double startTime) {
(std::memcmp(_readBuffer.c_str(), "VST/1.0\r\n\r\n", 11) == 0 ||
std::memcmp(_readBuffer.c_str(), "VST/1.1\r\n\r\n", 11) == 0)) {
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "switching from HTTP to VST";
ProtocolVersion protocolVersion = _readBuffer.c_str()[6] == '0'
? ProtocolVersion::VST_1_0 : ProtocolVersion::VST_1_1;
// mark task as abandoned, no more reads will happen on _peer
@@ -282,7 +282,7 @@ bool HttpCommTask::processRead(double startTime) {
std::shared_ptr<GeneralCommTask> commTask = std::make_shared<VstCommTask>(
_loop, _server, std::move(_peer), std::move(_connectionInfo),
GeneralServerFeature::keepAliveTimeout(),
protocolVersion, /*skipSocketInit*/ true);
commTask->addToReadBuffer(_readBuffer.c_str() + 11,
_readBuffer.length() - 11);
@@ -558,7 +558,7 @@ bool HttpCommTask::processRead(double startTime) {
// .............................................................................
// CORS
// .............................................................................
// OPTIONS requests currently go unauthenticated
if (isOptionsRequest) {
// handle HTTP OPTIONS requests directly
@@ -566,23 +566,23 @@ bool HttpCommTask::processRead(double startTime) {
_incompleteRequest.reset(nullptr);
return true;
}
// .............................................................................
// authenticate
// .............................................................................
// first scrape the auth headers and try to determine and authenticate the user
rest::ResponseCode authResult = handleAuthHeader(_incompleteRequest.get());
// authenticated
if (authResult != rest::ResponseCode::SERVER_ERROR) {
// prepare execution will send an error message
RequestFlow cont = prepareExecution(*_incompleteRequest.get());
if (cont == RequestFlow::Continue) {
processRequest(std::move(_incompleteRequest));
}
} else {
std::string realm = "Bearer token_type=\"JWT\", realm=\"ArangoDB\"";
HttpResponse resp(rest::ResponseCode::UNAUTHORIZED, leaseStringBuffer(0));
@@ -612,7 +612,7 @@ void HttpCommTask::processRequest(std::unique_ptr<HttpRequest> request) {
<< (StringUtils::escapeUnicode(body)) << "\"";
}
}
// create a handler and execute
auto resp = std::make_unique<HttpResponse>(rest::ResponseCode::SERVER_ERROR,
leaseStringBuffer(1024));
@@ -763,22 +763,24 @@ ResponseCode HttpCommTask::handleAuthHeader(HttpRequest* request) const {
bool found;
std::string const& authStr =
request->header(StaticStrings::Authorization, found);
if (!found) {
events::CredentialsMissing(request);
return rest::ResponseCode::UNAUTHORIZED;
}
size_t methodPos = authStr.find_first_of(' ');
if (methodPos != std::string::npos) {
// skip over authentication method
char const* auth = authStr.c_str() + methodPos;
while (*auth == ' ') {
++auth;
}
LOG_TOPIC(DEBUG, arangodb::Logger::REQUESTS) << "Authorization header: " << authStr;
LOG_TOPIC(DEBUG, arangodb::Logger::REQUESTS) <<
"\"authorization-header\",\"" << (void*)this << "\",\""
<< authStr << "\"";
try {
// note that these methods may throw in case of an error
AuthenticationMethod authMethod = AuthenticationMethod::NONE;
@@ -787,7 +789,7 @@ ResponseCode HttpCommTask::handleAuthHeader(HttpRequest* request) const {
} else if (TRI_CaseEqualString(authStr.c_str(), "bearer ", 7)) {
authMethod = AuthenticationMethod::JWT;
}
if (authMethod != AuthenticationMethod::NONE) {
request->setAuthenticationMethod(authMethod);
if (_auth->isActive()) {
@@ -797,7 +799,7 @@ ResponseCode HttpCommTask::handleAuthHeader(HttpRequest* request) const {
} else {
request->setAuthenticated(true);
}
if (request->authenticated()) {
events::Authenticated(request, authMethod);
return rest::ResponseCode::OK;
@@ -805,7 +807,7 @@ ResponseCode HttpCommTask::handleAuthHeader(HttpRequest* request) const {
events::CredentialsBad(request, authMethod);
return rest::ResponseCode::UNAUTHORIZED;
}
// intentionally falls through
} catch (arangodb::basics::Exception const& ex) {
// translate error
@@ -817,7 +819,7 @@ ResponseCode HttpCommTask::handleAuthHeader(HttpRequest* request) const {
return rest::ResponseCode::SERVER_ERROR;
}
}
events::UnknownAuthenticationMethod(request);
return rest::ResponseCode::UNAUTHORIZED;
}

View File

@@ -26,6 +26,10 @@
#include <velocypack/Exception.h>
#include "Basics/StringUtils.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterMethods.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "GeneralServer/GeneralCommTask.h"
#include "Logger/Logger.h"
#include "Rest/GeneralRequest.h"
@@ -67,7 +71,7 @@ RestHandler::RestHandler(GeneralRequest* request, GeneralResponse* response)
RestHandler::~RestHandler() {
RequestStatistics* stat = _statistics.exchange(nullptr);
if (stat != nullptr) {
stat->release();
}
@@ -100,15 +104,173 @@ void RestHandler::setStatistics(RequestStatistics* stat) {
}
}
bool RestHandler::forwardRequest() {
// TODO refactor into a more general/customizable method
//
// The below is mostly copied and only lightly modified from
// RestReplicationHandler::handleTrampolineCoordinator; however, that method
// needs some more specific checks regarding headers and param values, so we
// can't just reuse this method there. Maybe we just need to implement some
// virtual methods to handle param/header filtering?
// TODO verify that vst -> http -> vst conversion works correctly
// TODO verify that async requests work correctly
uint32_t shortId = forwardingTarget();
if (shortId == 0) {
// no need to actually forward
return false;
}
std::string serverId =
ClusterInfo::instance()->getCoordinatorByShortID(shortId);
if (serverId.empty()) {
// no mapping in agency, try to handle the request here
return false;
}
LOG_TOPIC(DEBUG, Logger::REQUESTS) <<
"forwarding request " << _request->messageId() << " to " << serverId;
bool useVst = false;
if (_request->transportType() == Endpoint::TransportType::VST) {
useVst = true;
}
std::string const& dbname = _request->databaseName();
std::unordered_map<std::string, std::string> const& oldHeaders =
_request->headers();
std::unordered_map<std::string, std::string>::const_iterator it =
oldHeaders.begin();
std::unordered_map<std::string, std::string> headers;
while (it != oldHeaders.end()) {
std::string const& key = (*it).first;
// ignore the following headers
if (key != StaticStrings::Authorization) {
headers.emplace(key, (*it).second);
}
++it;
}
auto auth = AuthenticationFeature::instance();
if (auth != nullptr && auth->isActive()) {
VPackBuilder builder;
{
VPackObjectBuilder payload{&builder};
payload->add("preferred_username", VPackValue(_request->user()));
}
VPackSlice slice = builder.slice();
headers.emplace(StaticStrings::Authorization,
"bearer " + auth->tokenCache()->generateJwt(slice));
}
auto& values = _request->values();
std::string params;
for (auto const& i : values) {
if (params.empty()) {
params.push_back('?');
} else {
params.push_back('&');
}
params.append(StringUtils::urlEncode(i.first));
params.push_back('=');
params.append(StringUtils::urlEncode(i.second));
}
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
generateError(rest::ResponseCode::SERVICE_UNAVAILABLE,
TRI_ERROR_SHUTTING_DOWN, "shutting down server");
return true;
}
std::unique_ptr<ClusterCommResult> res;
if (!useVst) {
HttpRequest* httpRequest = dynamic_cast<HttpRequest*>(_request.get());
if (httpRequest == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid request type");
}
// Send a synchronous request to that shard using ClusterComm:
res = cc->syncRequest("", TRI_NewTickServer(), "server:" + serverId,
_request->requestType(),
"/_db/" + StringUtils::urlEncode(dbname) +
_request->requestPath() + params,
httpRequest->body(), headers, 300.0);
} else {
// do we need to handle multiple payloads here? - TODO
// here we switch from vst to http
res = cc->syncRequest("", TRI_NewTickServer(), "server:" + serverId,
_request->requestType(),
"/_db/" + StringUtils::urlEncode(dbname) +
_request->requestPath() + params,
_request->payload().toJson(), headers, 300.0);
}
if (res->status == CL_COMM_TIMEOUT) {
// No reply, we give up:
generateError(rest::ResponseCode::BAD, TRI_ERROR_CLUSTER_TIMEOUT,
"timeout within cluster");
return true;
}
if (res->status == CL_COMM_BACKEND_UNAVAILABLE) {
// there is no result
generateError(rest::ResponseCode::BAD, TRI_ERROR_CLUSTER_CONNECTION_LOST,
"lost connection within cluster");
return true;
}
if (res->status == CL_COMM_ERROR) {
// This could be a broken connection or an Http error:
TRI_ASSERT(nullptr != res->result && res->result->isComplete());
// In this case a proper HTTP error was reported by the DBserver,
// we simply forward the result. Intentionally fall through here.
}
bool dummy;
resetResponse(
static_cast<rest::ResponseCode>(res->result->getHttpReturnCode()));
_response->setContentType(
res->result->getHeaderField(StaticStrings::ContentTypeHeader, dummy));
if (!useVst) {
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid response type");
}
httpResponse->body().swap(&(res->result->getBody()));
} else {
// need to switch back from http to vst
std::shared_ptr<VPackBuilder> builder = res->result->getBodyVelocyPack();
std::shared_ptr<VPackBuffer<uint8_t>> buf = builder->steal();
_response->setPayload(std::move(*buf),
true);
}
auto const& resultHeaders = res->result->getHeaderFields();
for (auto const& it : resultHeaders) {
_response->setHeader(it.first, it.second);
}
_response->setHeader(StaticStrings::RequestForwardedTo, serverId);
return true;
}
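The parameter loop in forwardRequest above rebuilds the original query string before the request is re-sent to the target coordinator. A self-contained sketch of that step (the helper names and the simplified urlEncode below are hypothetical stand-ins for the StringUtils versions):

```cpp
#include <cassert>
#include <cctype>
#include <cstdio>
#include <string>
#include <utility>
#include <vector>

// Hypothetical simplified stand-in for StringUtils::urlEncode.
static std::string urlEncode(std::string const& s) {
  std::string out;
  for (unsigned char c : s) {
    if (std::isalnum(c) != 0) {
      out.push_back(static_cast<char>(c));
    } else {
      char buf[4];
      std::snprintf(buf, sizeof(buf), "%%%02X", c);  // e.g. ' ' -> "%20"
      out.append(buf);
    }
  }
  return out;
}

// Rebuild "?key=value&..." from the request's key/value pairs,
// mirroring the loop in forwardRequest.
static std::string buildParams(
    std::vector<std::pair<std::string, std::string>> const& values) {
  std::string params;
  for (auto const& i : values) {
    params.push_back(params.empty() ? '?' : '&');
    params.append(urlEncode(i.first));
    params.push_back('=');
    params.append(urlEncode(i.second));
  }
  return params;
}
```

An empty value map yields an empty string, so the forwarded path stays untouched when the original request carried no parameters.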
void RestHandler::runHandlerStateMachine() {
TRI_ASSERT(_callback);
while (true) {
switch (_state) {
case HandlerState::PREPARE:
this->prepareEngine();
break;
case HandlerState::EXECUTE: {
int res = this->executeEngine();
if (res != TRI_ERROR_NO_ERROR) {
@@ -120,18 +282,18 @@ void RestHandler::runHandlerStateMachine() {
}
break;
}
case HandlerState::PAUSED:
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "Resuming rest handler execution";
TRI_ASSERT(_response != nullptr);
_callback(this);
_state = HandlerState::FINALIZE;
break;
case HandlerState::FINALIZE:
this->finalizeEngine();
break;
case HandlerState::DONE:
case HandlerState::FAILED:
return;
@@ -220,7 +382,7 @@ int RestHandler::finalizeEngine() {
_state = HandlerState::FAILED;
_callback(this);
}
return res;
}
@@ -292,6 +454,59 @@ int RestHandler::executeEngine() {
return TRI_ERROR_INTERNAL;
}
void RestHandler::generateError(rest::ResponseCode code, int errorNumber,
std::string const& message) {
resetResponse(code);
VPackBuffer<uint8_t> buffer;
VPackBuilder builder(buffer);
try {
builder.add(VPackValue(VPackValueType::Object));
builder.add(StaticStrings::Error, VPackValue(true));
builder.add(StaticStrings::ErrorMessage, VPackValue(message));
builder.add(StaticStrings::Code,
VPackValue(static_cast<int>(code)));
builder.add(StaticStrings::ErrorNum,
VPackValue(errorNumber));
builder.close();
VPackOptions options(VPackOptions::Defaults);
options.escapeUnicode = true;
try {
TRI_ASSERT(options.escapeUnicode);
if (_request != nullptr) {
_response->setContentType(_request->contentTypeResponse());
}
_response->setPayload(std::move(buffer), true, options);
} catch (...) {
// exception while generating error
}
} catch (...) {
// exception while generating error
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generates an error
////////////////////////////////////////////////////////////////////////////////
void RestHandler::generateError(rest::ResponseCode code, int errorCode) {
char const* message = TRI_errno_string(errorCode);
if (message != nullptr) {
generateError(code, errorCode, std::string(message));
} else {
generateError(code, errorCode, std::string("unknown error"));
}
}
// generates an error
void RestHandler::generateError(arangodb::Result const& r) {
ResponseCode code = GeneralResponse::responseCode(r.errorNumber());
generateError(code, r.errorNumber(), r.errorMessage());
}
// -----------------------------------------------------------------------------
// --SECTION-- protected methods
// -----------------------------------------------------------------------------

View File

@@ -32,13 +32,13 @@
namespace arangodb {
class GeneralRequest;
class RequestStatistics;
enum class RestStatus { DONE, WAITING, FAIL};
namespace rest {
class RestHandler : public std::enable_shared_from_this<RestHandler> {
friend class GeneralCommTask;
RestHandler(RestHandler const&) = delete;
RestHandler& operator=(RestHandler const&) = delete;
@@ -66,7 +66,7 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
RequestStatistics* stealStatistics() {
return _statistics.exchange(nullptr);
}
void setStatistics(RequestStatistics* stat);
/// Execute the rest handler state machine
@@ -74,14 +74,17 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
TRI_ASSERT(_state == HandlerState::PAUSED);
runHandlerStateMachine();
}
/// Execute the rest handler state machine
void runHandler(std::function<void(rest::RestHandler*)> cb) {
TRI_ASSERT(_state == HandlerState::PREPARE);
_callback = std::move(cb);
runHandlerStateMachine();
}
/// @brief forwards the request to the appropriate server
bool forwardRequest();
public:
/// @brief rest handler name
virtual char const* name() const = 0;
@@ -104,15 +107,33 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
virtual void handleError(basics::Exception const&) = 0;
protected:
/// @brief determines the possible forwarding target for this request
///
/// This method will be called to determine if the request should be
/// forwarded to another server, and if so, which server. If it should be
/// handled by this server, the method should return 0. Otherwise, this
/// method should return a valid (non-zero) short ID (TransactionID) for the
/// target server.
virtual uint32_t forwardingTarget() { return 0; }
void resetResponse(rest::ResponseCode);
// generates an error
void generateError(rest::ResponseCode, int, std::string const&);
// generates an error
void generateError(rest::ResponseCode, int);
// generates an error
void generateError(arangodb::Result const&);
private:
enum class HandlerState { PREPARE, EXECUTE, PAUSED, FINALIZE, DONE, FAILED };
void runHandlerStateMachine();
int prepareEngine();
int executeEngine();
int finalizeEngine();

View File

@@ -103,7 +103,7 @@ void RestBaseHandler::generateResult(
void RestBaseHandler::generateOk(rest::ResponseCode code,
VPackSlice const& payload) {
resetResponse(code);
try {
VPackBuffer<uint8_t> buffer;
VPackBuilder tmp(buffer);
@@ -112,7 +112,7 @@ void RestBaseHandler::generateOk(rest::ResponseCode code,
tmp.add(StaticStrings::Code, VPackValue(static_cast<int>(code)));
tmp.add("result", payload);
tmp.close();
VPackOptions options(VPackOptions::Defaults);
options.escapeUnicode = true;
writeResult(std::move(buffer), options);
@@ -124,16 +124,16 @@ void RestBaseHandler::generateOk(rest::ResponseCode code,
void RestBaseHandler::generateOk(rest::ResponseCode code,
VPackBuilder const& payload) {
resetResponse(code);
try {
VPackBuilder tmp;
tmp.add(VPackValue(VPackValueType::Object));
tmp.add(StaticStrings::Error, VPackValue(false));
tmp.add(StaticStrings::Code, VPackValue(static_cast<int>(code)));
tmp.close();
tmp = VPackCollection::merge(tmp.slice(), payload.slice(), false);
VPackOptions options(VPackOptions::Defaults);
options.escapeUnicode = true;
writeResult(tmp.slice(), options);
@@ -142,57 +142,6 @@ void RestBaseHandler::generateOk(rest::ResponseCode code,
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generates an error
////////////////////////////////////////////////////////////////////////////////
void RestBaseHandler::generateError(rest::ResponseCode code, int errorCode) {
char const* message = TRI_errno_string(errorCode);
if (message != nullptr) {
generateError(code, errorCode, std::string(message));
} else {
generateError(code, errorCode, std::string("unknown error"));
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generates an error
////////////////////////////////////////////////////////////////////////////////
void RestBaseHandler::generateError(rest::ResponseCode code, int errorCode,
std::string const& message) {
resetResponse(code);
VPackBuffer<uint8_t> buffer;
VPackBuilder builder(buffer);
try {
builder.add(VPackValue(VPackValueType::Object));
builder.add(StaticStrings::Error, VPackValue(true));
if (message.empty()) {
// prevent empty error messages
builder.add(StaticStrings::ErrorMessage, VPackValue(TRI_errno_string(errorCode)));
} else {
builder.add(StaticStrings::ErrorMessage, VPackValue(message));
}
builder.add(StaticStrings::Code, VPackValue(static_cast<int>(code)));
builder.add(StaticStrings::ErrorNum, VPackValue(errorCode));
builder.close();
VPackOptions options(VPackOptions::Defaults);
options.escapeUnicode = true;
writeResult(std::move(buffer), options);
} catch (...) {
// Building the error response failed
}
}
// generates an error
void RestBaseHandler::generateError(arangodb::Result const& r) {
ResponseCode code = GeneralResponse::responseCode(r.errorNumber());
generateError(code, r.errorNumber(), r.errorMessage());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generates a cancel message
////////////////////////////////////////////////////////////////////////////////

View File

@@ -58,22 +58,13 @@ class RestBaseHandler : public rest::RestHandler {
template <typename Payload>
void generateResult(rest::ResponseCode, Payload&&,
std::shared_ptr<transaction::Context> context);
/// convenience function akin to generateError,
/// renders payload in 'result' field
void generateOk(rest::ResponseCode, velocypack::Slice const&);
void generateOk(rest::ResponseCode, velocypack::Builder const&);
// generates an error
void generateError(rest::ResponseCode, int);
// generates an error
void generateError(rest::ResponseCode, int, std::string const&);
// generates an error
void generateError(arangodb::Result const&);
// generates a canceled message
void generateCanceled();

View File

@@ -33,6 +33,7 @@
#include "Utils/Cursor.h"
#include "Utils/CursorRepository.h"
#include "Transaction/Context.h"
#include "VocBase/ticks.h"
#include <velocypack/Iterator.h>
#include <velocypack/Value.h>
@@ -53,11 +54,11 @@ RestCursorHandler::RestCursorHandler(
_isValidForFinalize(false) {}
// returns the queue name
size_t RestCursorHandler::queue() const {
if (ServerState::instance()->isCoordinator()) {
return JobQueue::BACKGROUND_QUEUE;
}
return JobQueue::STANDARD_QUEUE;
}
RestStatus RestCursorHandler::execute() {
@@ -123,7 +124,7 @@ void RestCursorHandler::processQuery(VPackSlice const& slice) {
auto options = std::make_shared<VPackBuilder>(buildOptions(slice));
VPackValueLength l;
char const* queryString = querySlice.getString(l);
if (l == 0) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_EMPTY);
}
@@ -250,6 +251,26 @@ void RestCursorHandler::processQuery(VPackSlice const& slice) {
}
}
/// @brief returns the short id of the server which should handle this request
uint32_t RestCursorHandler::forwardingTarget() {
rest::RequestType const type = _request->requestType();
if (type != rest::RequestType::PUT && type != rest::RequestType::DELETE_REQ) {
return 0;
}
std::vector<std::string> const& suffixes = _request->suffixes();
if (suffixes.size() < 1) {
return 0;
}
uint64_t tick = arangodb::basics::StringUtils::uint64(suffixes[0]);
uint32_t sourceServer = TRI_ExtractServerIdFromTick(tick);
return (sourceServer == ServerState::instance()->getShortId())
? 0
: sourceServer;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief register the currently running query
////////////////////////////////////////////////////////////////////////////////
@@ -419,7 +440,7 @@ std::shared_ptr<VPackBuilder> RestCursorHandler::buildExtra(
void RestCursorHandler::createCursor() {
if (_request->payload().isEmptyObject()) {
generateError(rest::ResponseCode::BAD, 600);
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_CORRUPTED_JSON);
return;
}
@@ -440,7 +461,7 @@ void RestCursorHandler::createCursor() {
// error message generated in parseVelocyPackBody
return;
}
// tell RestCursorHandler::finalizeExecute that the request
// could be parsed successfully and that it may look at it
_isValidForFinalize = true;

View File

@@ -73,6 +73,8 @@ class RestCursorHandler : public RestVocbaseBaseHandler {
void processQuery(arangodb::velocypack::Slice const&);
virtual uint32_t forwardingTarget() override;
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief register the currently running query
@@ -168,13 +170,13 @@ class RestCursorHandler : public RestVocbaseBaseHandler {
//////////////////////////////////////////////////////////////////////////////
bool _queryKilled;
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the finalize operation is allowed to further process
/// the request data. this will not work if the original request cannot be
/// parsed successfully. this is used by RestCursorHandler::finalizeExecute
//////////////////////////////////////////////////////////////////////////////
bool _isValidForFinalize;
};
}

View File

@@ -24,12 +24,28 @@
#include "CursorRepository.h"
#include "Basics/MutexLocker.h"
#include "Logger/Logger.h"
#include "Utils/ExecContext.h"
#include "VocBase/ticks.h"
#include "VocBase/vocbase.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
namespace {
bool authorized(std::pair<arangodb::Cursor*, std::string> const& cursor) {
auto context = arangodb::ExecContext::CURRENT;
if (context == nullptr || !arangodb::ExecContext::isAuthEnabled()) {
return true;
}
if (context->isSuperuser()) {
return true;
}
return (cursor.second == context->user());
}
}
using namespace arangodb;
size_t const CursorRepository::MaxCollectCount = 32;
@@ -75,7 +91,7 @@ CursorRepository::~CursorRepository() {
MUTEX_LOCKER(mutexLocker, _lock);
for (auto it : _cursors) {
delete it.second;
delete it.second.first;
}
_cursors.clear();
@@ -92,10 +108,11 @@ Cursor* CursorRepository::addCursor(std::unique_ptr<Cursor> cursor) {
TRI_ASSERT(cursor->isUsed());
CursorId const id = cursor->id();
std::string user = ExecContext::CURRENT ? ExecContext::CURRENT->user() : "";
{
MUTEX_LOCKER(mutexLocker, _lock);
_cursors.emplace(id, cursor.get());
_cursors.emplace(id, std::make_pair(cursor.get(), user));
}
return cursor.release();
@@ -113,7 +130,7 @@ Cursor* CursorRepository::createFromQueryResult(
double ttl, bool count) {
TRI_ASSERT(result.result != nullptr);
CursorId const id = TRI_NewTickServer();
CursorId const id = TRI_NewServerSpecificTick(); // embedded server id
std::unique_ptr<Cursor> cursor;
cursor.reset(new VelocyPackCursor(
@@ -134,12 +151,12 @@ bool CursorRepository::remove(CursorId id, Cursor::CursorType type) {
MUTEX_LOCKER(mutexLocker, _lock);
auto it = _cursors.find(id);
if (it == _cursors.end()) {
if (it == _cursors.end() || !::authorized(it->second)) {
// not found
return false;
}
cursor = (*it).second;
cursor = (*it).second.first;
if (cursor->isDeleted()) {
// already deleted
@@ -181,12 +198,12 @@ Cursor* CursorRepository::find(CursorId id, Cursor::CursorType type, bool& busy)
MUTEX_LOCKER(mutexLocker, _lock);
auto it = _cursors.find(id);
if (it == _cursors.end()) {
if (it == _cursors.end() || !::authorized(it->second)) {
// not found
return nullptr;
}
cursor = (*it).second;
cursor = (*it).second.first;
if (cursor->isDeleted()) {
// already deleted
@@ -240,7 +257,7 @@ bool CursorRepository::containsUsedCursor() {
MUTEX_LOCKER(mutexLocker, _lock);
for (auto it : _cursors) {
if (it.second->isUsed()) {
if (it.second.first->isUsed()) {
return true;
}
}
@@ -262,7 +279,7 @@ bool CursorRepository::garbageCollect(bool force) {
MUTEX_LOCKER(mutexLocker, _lock);
for (auto it = _cursors.begin(); it != _cursors.end(); /* no hoisting */) {
auto cursor = (*it).second;
auto cursor = (*it).second.first;
if (cursor->isUsed()) {
// must not destroy used cursors

View File

@@ -62,7 +62,7 @@ class CursorRepository {
////////////////////////////////////////////////////////////////////////////////
Cursor* addCursor(std::unique_ptr<Cursor> cursor);
//////////////////////////////////////////////////////////////////////////////
/// @brief creates a cursor and stores it in the registry
/// the cursor will be returned with the usage flag set to true. it must be
@@ -123,7 +123,7 @@ class CursorRepository {
/// @brief list of current cursors
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<CursorId, Cursor*> _cursors;
std::unordered_map<CursorId, std::pair<Cursor*, std::string>> _cursors;
//////////////////////////////////////////////////////////////////////////////
/// @brief maximum number of cursors to garbage-collect in one go

View File

@@ -23,6 +23,7 @@
#include "ticks.h"
#include "Basics/HybridLogicalClock.h"
#include "Cluster/ServerState.h"
using namespace arangodb;
using namespace arangodb::basics;
@@ -55,13 +56,36 @@ void TRI_UpdateTickServer(TRI_voc_tick_t tick) {
auto expected = CurrentTick.load(std::memory_order_relaxed);
// only update global tick if less than the specified value...
while (expected < t &&
!CurrentTick.compare_exchange_weak(expected, t,
std::memory_order_release,
std::memory_order_relaxed)) {
expected = CurrentTick.load(std::memory_order_relaxed);
}
}
/// @brief returns the current tick counter
TRI_voc_tick_t TRI_CurrentTickServer() { return CurrentTick; }
/// @brief generates a new tick which also encodes this server's id
TRI_voc_tick_t TRI_NewServerSpecificTick() {
static constexpr uint64_t LowerMask{0x000000FFFFFFFFFF};
static constexpr uint64_t UpperMask{0xFFFFFF0000000000};
static constexpr size_t UpperShift{40};
uint64_t lower = TRI_NewTickServer() & LowerMask;
uint64_t upper =
(static_cast<uint64_t>(ServerState::instance()->getShortId())
<< UpperShift) &
UpperMask;
uint64_t tick = (upper | lower);
return static_cast<TRI_voc_tick_t>(tick);
}
/// @brief extracts the server id from a server-specific tick
uint32_t TRI_ExtractServerIdFromTick(TRI_voc_tick_t tick) {
static constexpr uint64_t Mask{0x0000000000FFFFFF};
static constexpr size_t Shift{40};
uint32_t shortId = static_cast<uint32_t>((tick >> Shift) & Mask);
return shortId;
}
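The bit split used by TRI_NewServerSpecificTick and TRI_ExtractServerIdFromTick above (lower 40 bits for the tick counter, upper 24 bits for the coordinator's short ID) round-trips as follows; the constant and function names in this sketch are my own:

```cpp
#include <cassert>
#include <cstdint>

// Layout mirrored from the masks above: 24 short-id bits | 40 tick bits.
constexpr uint64_t kLowerMask = 0x000000FFFFFFFFFFULL;  // 40 tick bits
constexpr uint64_t kIdMask = 0x0000000000FFFFFFULL;     // 24 short-id bits
constexpr unsigned kShift = 40;

static uint64_t makeServerSpecificTick(uint32_t shortId, uint64_t tick) {
  return ((static_cast<uint64_t>(shortId) & kIdMask) << kShift) |
         (tick & kLowerMask);
}

static uint32_t extractServerId(uint64_t tick) {
  return static_cast<uint32_t>((tick >> kShift) & kIdMask);
}
```

This embedding is what lets RestCursorHandler::forwardingTarget recover the owning coordinator from nothing but the cursor ID in the request URL.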

View File

@@ -44,4 +44,10 @@ void TRI_UpdateTickServer(TRI_voc_tick_t);
/// @brief returns the current tick counter
TRI_voc_tick_t TRI_CurrentTickServer();
/// @brief generates a new tick which also encodes this server's id
TRI_voc_tick_t TRI_NewServerSpecificTick();
/// @brief extracts the server id from a server-specific tick
uint32_t TRI_ExtractServerIdFromTick(TRI_voc_tick_t);
#endif

View File

@@ -0,0 +1,125 @@
/* jshint strict: false, sub: true */
/* global print */
'use strict';
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dan Larkin-York
/// @author Copyright 2018, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
const functionsDocumentation = {
'load_balancing': 'load balancing tests'
};
const optionsDocumentation = [
' - `skipLoadBalancing`: testing load_balancing will be skipped.'
];
const tu = require('@arangodb/test-utils');
// const BLUE = require('internal').COLORS.COLOR_BLUE;
const CYAN = require('internal').COLORS.COLOR_CYAN;
// const GREEN = require('internal').COLORS.COLOR_GREEN;
const RED = require('internal').COLORS.COLOR_RED;
const RESET = require('internal').COLORS.COLOR_RESET;
// const YELLOW = require('internal').COLORS.COLOR_YELLOW;
const download = require('internal').download;
const testPaths = {
'load_balancing': 'js/client/tests/load-balancing'
};
////////////////////////////////////////////////////////////////////////////////
/// @brief TEST: load_balancing
////////////////////////////////////////////////////////////////////////////////
function loadBalancingClient (options) {
if (options.skipLoadBalancing === true) {
print('skipping Load Balancing tests!');
return {
load_balancing: {
status: true,
skipped: true
}
};
}
print(CYAN + 'Load Balancing tests...' + RESET);
const excludeAuth = (fn) => { return (fn.indexOf('-auth') === -1); };
const excludeAsync = (fn) => { return (fn.indexOf('-async') === -1); };
let testCases = tu.scanTestPath(testPaths.load_balancing)
.filter(excludeAuth)
.filter(excludeAsync);
options.cluster = true;
if (options.coordinators < 2) {
options.coordinators = 2;
}
return tu.performTests(options, testCases, 'load_balancing', tu.runInArangosh, {
'server.authentication': 'false'
});
}
////////////////////////////////////////////////////////////////////////////////
/// @brief TEST: load_balancing_auth
////////////////////////////////////////////////////////////////////////////////
function loadBalancingAuthClient (options) {
if (options.skipLoadBalancing === true) {
print('skipping Load Balancing tests!');
return {
load_balancing_auth: {
status: true,
skipped: true
}
};
}
print(CYAN + 'Load Balancing with Authentication tests...' + RESET);
const excludeNoAuth = (fn) => { return (fn.indexOf('-noauth') === -1); };
const excludeAsync = (fn) => { return (fn.indexOf('-async') === -1); };
let testCases = tu.scanTestPath(testPaths.load_balancing)
.filter(excludeNoAuth)
.filter(excludeAsync);
options.cluster = true;
if (options.coordinators < 2) {
options.coordinators = 2;
}
options.username = 'root';
options.password = '';
return tu.performTests(options, testCases, 'load_balancing', tu.runInArangosh, {
'server.authentication': 'true',
'server.jwt-secret': 'haxxmann'
});
}
exports.setup = function (testFns, defaultFns, opts, fnDocs, optionsDoc) {
testFns['load_balancing'] = loadBalancingClient;
testFns['load_balancing_auth'] = loadBalancingAuthClient;
opts['skipLoadBalancing'] = false;
defaultFns.push('load_balancing');
for (var attrname in functionsDocumentation) { fnDocs[attrname] = functionsDocumentation[attrname]; }
for (var i = 0; i < optionsDocumentation.length; i++) { optionsDoc.push(optionsDocumentation[i]); }
};

View File

@ -0,0 +1,285 @@
/* jshint globalstrict:true, strict:true, maxlen: 5000 */
/* global assertTrue, assertFalse, assertEqual, require*/
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dan Larkin-York
/// @author Copyright 2018, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
'use strict';
const jsunity = require("jsunity");
const base64Encode = require('internal').base64Encode;
const db = require("internal").db;
const request = require("@arangodb/request");
const url = require('url');
const userModule = require("@arangodb/users");
const _ = require("lodash");
function getCoordinators() {
const isCoordinator = (d) => (_.toLower(d.role) === 'coordinator');
const toEndpoint = (d) => (d.endpoint);
const endpointToURL = (endpoint) => {
if (endpoint.substr(0, 6) === 'ssl://') {
return 'https://' + endpoint.substr(6);
}
var pos = endpoint.indexOf('://');
if (pos === -1) {
return 'http://' + endpoint;
}
return 'http' + endpoint.substr(pos);
};
const instanceInfo = JSON.parse(require('internal').env.INSTANCEINFO);
return instanceInfo.arangods.filter(isCoordinator)
.map(toEndpoint)
.map(endpointToURL);
}
const servers = getCoordinators();
function CursorSyncAuthSuite () {
'use strict';
const cns = ["animals", "fruits"];
const keys = [
["ant", "bird", "cat", "dog"],
["apple", "banana", "coconut", "date"]
];
let cs = [];
let coordinators = [];
const users = [
{ username: 'alice', password: 'pass1' },
{ username: 'bob', password: 'pass2' },
];
const baseCursorUrl = `/_api/cursor`;
function sendRequest(auth, method, endpoint, body, usePrimary) {
let res;
const i = usePrimary ? 0 : 1;
try {
const envelope = {
body,
headers: {
authorization:
`Basic ${base64Encode(auth.username + ':' + auth.password)}`
},
json: true,
method,
url: `${coordinators[i]}${endpoint}`
};
res = request(envelope);
} catch(err) {
console.error(`Exception processing ${method} ${endpoint}`, err.stack);
return {};
}
var resultBody = res.body;
if (typeof resultBody === "string") {
resultBody = JSON.parse(resultBody);
}
return resultBody;
}
return {
setUpAll: function() {
coordinators = getCoordinators();
if (coordinators.length < 2) {
throw new Error('Expecting at least two coordinators');
}
cs = [];
for (let i = 0; i < cns.length; i++) {
db._drop(cns[i]);
cs.push(db._create(cns[i]));
assertTrue(cs[i].name() === cns[i]);
for (const key of keys[i]) {
cs[i].save({ _key: key });
}
}
userModule.save(users[0].username, users[0].password);
userModule.save(users[1].username, users[1].password);
userModule.grantDatabase(users[0].username, '_system', 'ro');
userModule.grantDatabase(users[1].username, '_system', 'ro');
userModule.grantCollection(users[0].username, '_system', cns[0], 'ro');
userModule.grantCollection(users[1].username, '_system', cns[0], 'ro');
userModule.grantCollection(users[0].username, '_system', cns[1], 'ro');
userModule.grantCollection(users[1].username, '_system', cns[1], 'none');
},
tearDownAll: function() {
db._drop(cns[0]);
db._drop(cns[1]);
},
testCursorForwardingSameUserBasic: function() {
let url = baseCursorUrl;
const query = {
query: `FOR doc IN @@coll LIMIT 4 RETURN doc`,
count: true,
batchSize: 2,
bindVars: {
"@coll": cns[0]
}
};
let result = sendRequest(users[0], 'POST', url, query, true);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 201);
assertTrue(result.hasMore);
assertEqual(result.count, 4);
assertEqual(result.result.length, 2);
const cursorId = result.id;
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest(users[0], 'PUT', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 200);
assertFalse(result.hasMore);
assertEqual(result.count, 4);
assertEqual(result.result.length, 2);
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest(users[0], 'PUT', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertTrue(result.error);
assertEqual(result.code, 404);
},
testCursorForwardingSameUserDeletion: function() {
let url = baseCursorUrl;
const query = {
query: `FOR doc IN @@coll LIMIT 4 RETURN doc`,
count: true,
batchSize: 2,
bindVars: {
"@coll": cns[0]
}
};
let result = sendRequest(users[0], 'POST', url, query, true);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 201);
assertTrue(result.hasMore);
assertEqual(result.count, 4);
assertEqual(result.result.length, 2);
const cursorId = result.id;
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest(users[0], 'DELETE', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 202);
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest(users[0], 'PUT', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertTrue(result.error);
assertEqual(result.code, 404);
},
testCursorForwardingDifferentUser: function() {
let url = baseCursorUrl;
const query = {
query: `FOR doc IN @@coll LIMIT 4 RETURN doc`,
count: true,
batchSize: 2,
bindVars: {
"@coll": cns[0]
}
};
let result = sendRequest(users[0], 'POST', url, query, true);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 201);
assertTrue(result.hasMore);
assertEqual(result.count, 4);
assertEqual(result.result.length, 2);
const cursorId = result.id;
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest(users[1], 'PUT', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertTrue(result.error);
assertEqual(result.code, 404);
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest(users[0], 'DELETE', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 202);
},
testCursorForwardingDifferentUserDelete: function() {
let url = baseCursorUrl;
const query = {
query: `FOR doc IN @@coll LIMIT 4 RETURN doc`,
count: true,
batchSize: 2,
bindVars: {
"@coll": cns[0]
}
};
let result = sendRequest(users[0], 'POST', url, query, true);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 201);
assertTrue(result.hasMore);
assertEqual(result.count, 4);
assertEqual(result.result.length, 2);
const cursorId = result.id;
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest(users[1], 'DELETE', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertTrue(result.error);
assertEqual(result.code, 404);
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest(users[0], 'DELETE', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 202);
},
};
}
jsunity.run(CursorSyncAuthSuite);
return jsunity.done();

View File

@ -0,0 +1,194 @@
/* jshint globalstrict:true, strict:true, maxlen: 5000 */
/* global assertTrue, assertFalse, assertEqual, require*/
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dan Larkin-York
/// @author Copyright 2018, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
'use strict';
const jsunity = require("jsunity");
const db = require("internal").db;
const request = require("@arangodb/request");
const url = require('url');
const _ = require("lodash");
function getCoordinators() {
const isCoordinator = (d) => (_.toLower(d.role) === 'coordinator');
const toEndpoint = (d) => (d.endpoint);
const endpointToURL = (endpoint) => {
if (endpoint.substr(0, 6) === 'ssl://') {
return 'https://' + endpoint.substr(6);
}
var pos = endpoint.indexOf('://');
if (pos === -1) {
return 'http://' + endpoint;
}
return 'http' + endpoint.substr(pos);
};
const instanceInfo = JSON.parse(require('internal').env.INSTANCEINFO);
return instanceInfo.arangods.filter(isCoordinator)
.map(toEndpoint)
.map(endpointToURL);
}
const servers = getCoordinators();
function CursorSyncSuite () {
'use strict';
const cns = ["animals", "fruits"];
const keys = [
["ant", "bird", "cat", "dog"],
["apple", "banana", "coconut", "date"]
];
let cs = [];
let coordinators = [];
const baseCursorUrl = `/_api/cursor`;
function sendRequest(method, endpoint, body, usePrimary) {
let res;
const i = usePrimary ? 0 : 1;
try {
const envelope = {
body,
json: true,
method,
url: `${coordinators[i]}${endpoint}`
};
res = request(envelope);
} catch(err) {
console.error(`Exception processing ${method} ${endpoint}`, err.stack);
return {};
}
var resultBody = res.body;
if (typeof resultBody === "string") {
resultBody = JSON.parse(resultBody);
}
return resultBody;
}
return {
setUpAll: function() {
coordinators = getCoordinators();
if (coordinators.length < 2) {
throw new Error('Expecting at least two coordinators');
}
cs = [];
for (let i = 0; i < cns.length; i++) {
db._drop(cns[i]);
cs.push(db._create(cns[i]));
assertTrue(cs[i].name() === cns[i]);
for (const key of keys[i]) {
cs[i].save({ _key: key });
}
}
},
tearDownAll: function() {
db._drop(cns[0]);
db._drop(cns[1]);
},
testCursorForwardingBasic: function() {
let url = baseCursorUrl;
const query = {
query: `FOR doc IN @@coll LIMIT 4 RETURN doc`,
count: true,
batchSize: 2,
bindVars: {
"@coll": cns[0]
}
};
let result = sendRequest('POST', url, query, true);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 201);
assertTrue(result.hasMore);
assertEqual(result.count, 4);
assertEqual(result.result.length, 2);
const cursorId = result.id;
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest('PUT', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 200);
assertFalse(result.hasMore);
assertEqual(result.count, 4);
assertEqual(result.result.length, 2);
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest('PUT', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertTrue(result.error);
assertEqual(result.code, 404);
},
testCursorForwardingDeletion: function() {
let url = baseCursorUrl;
const query = {
query: `FOR doc IN @@coll LIMIT 4 RETURN doc`,
count: true,
batchSize: 2,
bindVars: {
"@coll": cns[0]
}
};
let result = sendRequest('POST', url, query, true);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 201);
assertTrue(result.hasMore);
assertEqual(result.count, 4);
assertEqual(result.result.length, 2);
const cursorId = result.id;
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest('DELETE', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertFalse(result.error);
assertEqual(result.code, 202);
url = `${baseCursorUrl}/${cursorId}`;
result = sendRequest('PUT', url, {}, false);
assertFalse(result === undefined || Object.keys(result).length === 0);
assertTrue(result.error);
assertEqual(result.code, 404);
},
};
}
jsunity.run(CursorSyncSuite);
return jsunity.done();

View File

@ -111,6 +111,9 @@ std::string const StaticStrings::MultiPartContentType("multipart/form-data");
std::string const StaticStrings::NoSniff("nosniff");
std::string const StaticStrings::Origin("origin");
std::string const StaticStrings::Queue("x-arango-queue");
std::string const StaticStrings::RequestForwardedTo(
"x-arango-request-forwarded-to");
std::string const StaticStrings::ResponseCode("x-arango-response-code");
std::string const StaticStrings::Server("server");
std::string const StaticStrings::StartThread("x-arango-start-thread");
std::string const StaticStrings::WwwAuthenticate("www-authenticate");

View File

@ -62,8 +62,8 @@ class StaticStrings {
static std::string const SilentString;
static std::string const WaitForSyncString;
static std::string const IsSynchronousReplicationString;
// database and collection names
static std::string const SystemDatabase;
// HTTP headers
@ -106,6 +106,8 @@ class StaticStrings {
static std::string const NoSniff;
static std::string const Origin;
static std::string const Queue;
static std::string const RequestForwardedTo;
static std::string const ResponseCode;
static std::string const Server;
static std::string const StartThread;
static std::string const WwwAuthenticate;

View File

@ -110,7 +110,7 @@ class GeneralRequest {
/// to any specific resource
bool authenticated() const { return _authenticated; }
void setAuthenticated(bool a) { _authenticated = a; }
// @brief User sending this request
std::string const& user() const { return _user; }
void setUser(std::string const& user) { _user = user; }
@ -121,7 +121,7 @@ class GeneralRequest {
/// @brief set request context and whether this requests is allowed
/// to delete it
void setRequestContext(RequestContext*, bool isOwner);
RequestType requestType() const { return _type; }
void setRequestType(RequestType type) { _type = type; }
@ -150,7 +150,7 @@ class GeneralRequest {
// Returns the request path suffixes in non-URL-decoded form
std::vector<std::string> const& suffixes() const { return _suffixes; }
// Returns the request path suffixes in URL-decoded form. Note: this will
// re-compute the suffix list on every call!
std::vector<std::string> decodedSuffixes() const;
@ -178,8 +178,8 @@ class GeneralRequest {
bool& found) const = 0;
template <typename T>
T parsedValue(std::string const& key, T valueNotFound);
virtual std::unordered_map<std::string, std::string> values() const = 0;
virtual std::unordered_map<std::string, std::string> const& values() const = 0;
virtual std::unordered_map<std::string, std::vector<std::string>>
arrayValues() const = 0;

View File

@ -85,7 +85,7 @@ class HttpRequest final : public GeneralRequest {
std::string const& value(std::string const& key) const override;
std::string const& value(std::string const& key, bool& found) const override;
std::unordered_map<std::string, std::string> values() const override {
std::unordered_map<std::string, std::string> const& values() const override {
return _values;
}

View File

@ -81,7 +81,7 @@ class VstRequest final : public GeneralRequest {
std::string const& header(std::string const& key, bool& found) const override;
// values are query parameters
std::unordered_map<std::string, std::string> values() const override {
std::unordered_map<std::string, std::string> const& values() const override {
return _values;
}
std::unordered_map<std::string, std::vector<std::string>> arrayValues()