1
0
Fork 0
arangodb/arangod/GeneralServer/RestHandler.cpp

545 lines
18 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
////////////////////////////////////////////////////////////////////////////////
#include "RestHandler.h"
#include <velocypack/Exception.h>
#include "Basics/RecursiveLocker.h"
#include "Basics/StringUtils.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/ServerState.h"
#include "Futures/Utilities.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "Logger/LogMacros.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "Rest/GeneralRequest.h"
#include "Rest/HttpResponse.h"
#include "Statistics/RequestStatistics.h"
#include "Utils/ExecContext.h"
#include "VocBase/ticks.h"
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::rest;
thread_local RestHandler const* RestHandler::CURRENT_HANDLER = nullptr;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
RestHandler::RestHandler(application_features::ApplicationServer& server,
GeneralRequest* request, GeneralResponse* response)
:
_request(request),
_response(response),
_server(server),
_statistics(nullptr),
_handlerId(0),
_state(HandlerState::PREPARE),
_canceled(false) {}
RestHandler::~RestHandler() {
if (_statistics != nullptr) {
_statistics->release();
}
}
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
void RestHandler::assignHandlerId() {
_handlerId = TRI_NewServerSpecificTick();
}
uint64_t RestHandler::messageId() const {
uint64_t messageId = 0UL;
auto req = _request.get();
auto res = _response.get();
if (req) {
messageId = req->messageId();
} else if (res) {
messageId = res->messageId();
} else {
LOG_TOPIC("4651e", WARN, Logger::COMMUNICATION)
<< "could not find corresponding request/response";
}
return messageId;
}
RequestStatistics* RestHandler::stealStatistics() {
RequestStatistics* ptr = _statistics;
_statistics = nullptr;
return ptr;
}
void RestHandler::setStatistics(RequestStatistics* stat) {
RequestStatistics* old = _statistics;
_statistics = stat;
if (old != nullptr) {
old->release();
}
}
futures::Future<Result> RestHandler::forwardRequest(bool& forwarded) {
forwarded = false;
if (!ServerState::instance()->isCoordinator()) {
return futures::makeFuture(Result());
}
std::string serverId = forwardingTarget();
if (serverId.empty()) {
// no need to actually forward
return futures::makeFuture(Result());
}
NetworkFeature const& nf = server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
if (pool == nullptr) {
// nullptr happens only during controlled shutdown
generateError(rest::ResponseCode::SERVICE_UNAVAILABLE,
TRI_ERROR_SHUTTING_DOWN, "shutting down server");
return futures::makeFuture(Result(TRI_ERROR_SHUTTING_DOWN));
}
LOG_TOPIC("38d99", DEBUG, Logger::REQUESTS)
<< "forwarding request " << _request->messageId() << " to " << serverId;
forwarded = true;
bool useVst = false;
if (_request->transportType() == Endpoint::TransportType::VST) {
useVst = true;
}
std::string const& dbname = _request->databaseName();
std::map<std::string, std::string> headers{_request->headers().begin(),
_request->headers().end()};
if (headers.find(StaticStrings::Authorization) == headers.end()) {
// No authorization header is set, this is in particular the case if this
// request is coming in with VelocyStream, where the authentication happens
// once at the beginning of the connection and not with every request.
// In this case, we have to produce a proper JWT token as authorization:
auto auth = AuthenticationFeature::instance();
if (auth != nullptr && auth->isActive()) {
// when in superuser mode, username is empty
// in this case ClusterComm will add the default superuser token
std::string const& username = _request->user();
if (!username.empty()) {
VPackBuilder builder;
{
VPackObjectBuilder payload{&builder};
payload->add("preferred_username", VPackValue(username));
}
VPackSlice slice = builder.slice();
headers.emplace(StaticStrings::Authorization,
"bearer " + auth->tokenCache().generateJwt(slice));
}
}
}
network::RequestOptions options;
options.database = dbname;
options.timeout = network::Timeout(300);
options.contentType = rest::contentTypeToString(_request->contentType());
options.acceptType = rest::contentTypeToString(_request->contentTypeResponse());
for (auto const& i : _request->values()) {
options.param(i.first, i.second);
}
auto requestType =
fuerte::from_string(GeneralRequest::translateMethod(_request->requestType()));
VPackStringRef resPayload = _request->rawPayload();
VPackBuffer<uint8_t> payload(resPayload.size());
payload.append(resPayload.data(), resPayload.size());
auto future = network::sendRequest(pool, "server:" + serverId, requestType,
_request->requestPath(),
std::move(payload), options, std::move(headers));
auto cb = [this, serverId, useVst,
self = shared_from_this()](network::Response&& response) -> Result {
int res = network::fuerteToArangoErrorCode(response);
if (res != TRI_ERROR_NO_ERROR) {
generateError(res);
return Result(res);
}
resetResponse(static_cast<rest::ResponseCode>(response.response->statusCode()));
_response->setContentType(fuerte::v1::to_string(response.response->contentType()));
if (!useVst) {
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (_response == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid response type");
}
httpResponse->body() = response.response->payloadAsString();
} else {
_response->setPayload(std::move(*response.response->stealPayload()), true);
}
auto const& resultHeaders = response.response->messageHeader().meta();
for (auto const& it : resultHeaders) {
_response->setHeader(it.first, it.second);
}
_response->setHeaderNC(StaticStrings::RequestForwardedTo, serverId);
return Result();
};
return std::move(future).thenValue(cb);
}
void RestHandler::handleExceptionPtr(std::exception_ptr eptr) noexcept {
try {
if (eptr) {
std::rethrow_exception(eptr);
}
} catch (Exception const& ex) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("11929", WARN, arangodb::Logger::FIXME)
<< "caught exception in " << name() << ": " << ex.what();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
handleError(ex);
} catch (arangodb::velocypack::Exception const& ex) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("fdcbc", WARN, arangodb::Logger::FIXME)
<< "caught velocypack exception in " << name() << ": "
<< ex.what();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
bool const isParseError =
(ex.errorCode() == arangodb::velocypack::Exception::ParseError ||
ex.errorCode() == arangodb::velocypack::Exception::UnexpectedControlCharacter);
Exception err(isParseError ? TRI_ERROR_HTTP_CORRUPTED_JSON : TRI_ERROR_INTERNAL,
std::string("VPack error: ") + ex.what(), __FILE__, __LINE__);
handleError(err);
} catch (std::bad_alloc const& ex) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("5c9f6", WARN, arangodb::Logger::FIXME)
<< "caught memory exception in " << name() << ": "
<< ex.what();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_OUT_OF_MEMORY, ex.what(), __FILE__, __LINE__);
handleError(err);
} catch (std::exception const& ex) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("252ea", WARN, arangodb::Logger::FIXME)
<< "caught exception in " << name() << ": " << ex.what();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_INTERNAL, ex.what(), __FILE__, __LINE__);
handleError(err);
} catch (...) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("f729d", WARN, arangodb::Logger::FIXME) << "caught unknown exception in " << name();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_INTERNAL, __FILE__, __LINE__);
handleError(err);
}
}
void RestHandler::runHandlerStateMachine() {
TRI_ASSERT(_callback);
RECURSIVE_MUTEX_LOCKER(_executionMutex, _executionMutexOwner);
while (true) {
switch (_state) {
case HandlerState::PREPARE:
prepareEngine();
break;
case HandlerState::EXECUTE: {
executeEngine(/*isContinue*/false);
if (_state == HandlerState::PAUSED) {
shutdownExecute(false);
LOG_TOPIC("23a33", DEBUG, Logger::COMMUNICATION)
<< "Pausing rest handler execution " << this;
return; // stop state machine
}
break;
}
case HandlerState::CONTINUED: {
executeEngine(/*isContinue*/true);
if (_state == HandlerState::PAUSED) {
shutdownExecute(/*isFinalized*/false);
LOG_TOPIC("23727", DEBUG, Logger::COMMUNICATION)
<< "Pausing rest handler execution " << this;
return; // stop state machine
}
break;
}
case HandlerState::PAUSED:
LOG_TOPIC("ae26f", DEBUG, Logger::COMMUNICATION)
<< "Resuming rest handler execution " << this;
_state = HandlerState::CONTINUED;
break;
case HandlerState::FINALIZE:
RequestStatistics::SET_REQUEST_END(_statistics);
RestHandler::CURRENT_HANDLER = this;
// shutdownExecute is noexcept
shutdownExecute(true); // may not be moved down
RestHandler::CURRENT_HANDLER = nullptr;
_state = HandlerState::DONE;
// compress response if required
compressResponse();
// Callback may stealStatistics!
_callback(this);
break;
case HandlerState::FAILED:
RequestStatistics::SET_REQUEST_END(_statistics);
// Callback may stealStatistics!
_callback(this);
// No need to finalize here!
return;
case HandlerState::DONE:
return;
}
}
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
void RestHandler::prepareEngine() {
// set end immediately so we do not get netative statistics
RequestStatistics::SET_REQUEST_START_END(_statistics);
if (_canceled) {
_state = HandlerState::FAILED;
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_REQUEST_CANCELED,
"request has been canceled by user", __FILE__, __LINE__);
handleError(err);
return;
}
try {
prepareExecute(false);
_state = HandlerState::EXECUTE;
return;
} catch (Exception const& ex) {
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
handleError(ex);
} catch (std::exception const& ex) {
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_INTERNAL, ex.what(), __FILE__, __LINE__);
handleError(err);
} catch (...) {
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_INTERNAL, __FILE__, __LINE__);
handleError(err);
}
_state = HandlerState::FAILED;
}
/// Execute the rest handler state machine. Retry the wakeup,
/// returns true if _state == PAUSED, false otherwise
bool RestHandler::wakeupHandler() {
RECURSIVE_MUTEX_LOCKER(_executionMutex, _executionMutexOwner);
if (_state == HandlerState::PAUSED) {
runHandlerStateMachine(); // may change _state
return _state == HandlerState::PAUSED;
}
return false;
}
void RestHandler::executeEngine(bool isContinue) {
ExecContext* exec = static_cast<ExecContext*>(_request->requestContext());
ExecContextScope scope(exec);
RestHandler::CURRENT_HANDLER = this;
try {
RestStatus result = RestStatus::DONE;
if (isContinue) {
// only need to run prepareExecute() again when we are continuing
// otherwise prepareExecute() was already run in the PREPARE phase
prepareExecute(true);
result = continueExecute();
} else {
result = execute();
}
RestHandler::CURRENT_HANDLER = nullptr;
if (result == RestStatus::WAITING) {
_state = HandlerState::PAUSED; // wait for someone to continue the state
// machine
return;
}
if (_response == nullptr) {
Exception err(TRI_ERROR_INTERNAL, "no response received from handler",
__FILE__, __LINE__);
handleError(err);
}
_state = HandlerState::FINALIZE;
return;
} catch (Exception const& ex) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("11928", WARN, arangodb::Logger::FIXME)
<< "caught exception in " << name() << ": " << ex.what();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
handleError(ex);
} catch (arangodb::velocypack::Exception const& ex) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("fdcbb", WARN, arangodb::Logger::FIXME)
<< "caught velocypack exception in " << name() << ": "
<< ex.what();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
bool const isParseError =
(ex.errorCode() == arangodb::velocypack::Exception::ParseError ||
ex.errorCode() == arangodb::velocypack::Exception::UnexpectedControlCharacter);
Exception err(isParseError ? TRI_ERROR_HTTP_CORRUPTED_JSON : TRI_ERROR_INTERNAL,
std::string("VPack error: ") + ex.what(), __FILE__, __LINE__);
handleError(err);
} catch (std::bad_alloc const& ex) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("5c9f5", WARN, arangodb::Logger::FIXME)
<< "caught memory exception in " << name() << ": "
<< ex.what();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_OUT_OF_MEMORY, ex.what(), __FILE__, __LINE__);
handleError(err);
} catch (std::exception const& ex) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("252e9", WARN, arangodb::Logger::FIXME)
<< "caught exception in " << name() << ": " << ex.what();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_INTERNAL, ex.what(), __FILE__, __LINE__);
handleError(err);
} catch (...) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC("f729c", WARN, arangodb::Logger::FIXME) << "caught unknown exception in " << name();
#endif
RequestStatistics::SET_EXECUTE_ERROR(_statistics);
Exception err(TRI_ERROR_INTERNAL, __FILE__, __LINE__);
handleError(err);
}
RestHandler::CURRENT_HANDLER = nullptr;
_state = HandlerState::FAILED;
}
void RestHandler::generateError(rest::ResponseCode code, int errorNumber,
std::string const& message) {
resetResponse(code);
if (_request->requestType() != rest::RequestType::HEAD) {
VPackBuffer<uint8_t> buffer;
VPackBuilder builder(buffer);
try {
builder.add(VPackValue(VPackValueType::Object));
builder.add(StaticStrings::Code, VPackValue(static_cast<int>(code)));
builder.add(StaticStrings::Error, VPackValue(true));
builder.add(StaticStrings::ErrorMessage, VPackValue(message));
builder.add(StaticStrings::ErrorNum, VPackValue(errorNumber));
builder.close();
VPackOptions options(VPackOptions::Defaults);
options.escapeUnicode = true;
TRI_ASSERT(options.escapeUnicode);
if (_request != nullptr) {
_response->setContentType(_request->contentTypeResponse());
}
_response->setPayload(std::move(buffer), true, options,
/*resolveExternals*/false);
} catch (...) {
// exception while generating error
}
}
}
void RestHandler::compressResponse() {
if (_response->isCompressionAllowed()) {
switch (_request->acceptEncoding()) {
case rest::EncodingType::DEFLATE:
_response->deflate();
_response->setHeaderNC(StaticStrings::ContentEncoding, StaticStrings::EncodingDeflate);
break;
default:
break;
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generates an error
////////////////////////////////////////////////////////////////////////////////
void RestHandler::generateError(rest::ResponseCode code, int errorCode) {
char const* message = TRI_errno_string(errorCode);
if (message != nullptr) {
generateError(code, errorCode, std::string(message));
} else {
generateError(code, errorCode, std::string("unknown error"));
}
}
// generates an error
void RestHandler::generateError(arangodb::Result const& r) {
ResponseCode code = GeneralResponse::responseCode(r.errorNumber());
generateError(code, r.errorNumber(), r.errorMessage());
}
// -----------------------------------------------------------------------------
// --SECTION-- protected methods
// -----------------------------------------------------------------------------
void RestHandler::resetResponse(rest::ResponseCode code) {
TRI_ASSERT(_response != nullptr);
_response->reset(code);
}