//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2019 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Tobias Gödderz //////////////////////////////////////////////////////////////////////////////// #include "RemoteExecutor.h" #include "Aql/ClusterNodes.h" #include "Aql/ExecutionEngine.h" #include "Aql/ExecutorInfos.h" #include "Aql/InputAqlItemRow.h" #include "Aql/Query.h" #include "Basics/MutexLocker.h" #include "Basics/RecursiveLocker.h" #include "Basics/StringBuffer.h" #include "Basics/VelocyPackHelper.h" #include "Cluster/ServerState.h" #include "Logger/LogMacros.h" #include "Network/ConnectionPool.h" #include "Network/NetworkFeature.h" #include "Network/Utils.h" #include "Rest/CommonDefines.h" #include "VocBase/vocbase.h" #include #include #include #include using namespace arangodb; using namespace arangodb::aql; using arangodb::basics::VelocyPackHelper; namespace { /// @brief timeout constexpr std::chrono::seconds kDefaultTimeOutSecs(3600); } // namespace ExecutionBlockImpl::ExecutionBlockImpl( ExecutionEngine* engine, RemoteNode const* node, ExecutorInfos&& infos, std::string const& server, std::string const& ownName, std::string const& queryId) : ExecutionBlock(engine, node), _infos(std::move(infos)), _query(*engine->getQuery()), _server(server), _ownName(ownName), _queryId(queryId), _isResponsibleForInitializeCursor(node->isResponsibleForInitializeCursor()), _lastError(TRI_ERROR_NO_ERROR), _lastTicket(0), _hasTriggeredShutdown(false) { TRI_ASSERT(!queryId.empty()); TRI_ASSERT((arangodb::ServerState::instance()->isCoordinator() && ownName.empty()) || (!arangodb::ServerState::instance()->isCoordinator() && !ownName.empty())); } std::pair ExecutionBlockImpl::getSome(size_t atMost) { traceGetSomeBegin(atMost); auto result = getSomeWithoutTrace(atMost); return traceGetSomeEnd(result.first, std::move(result.second)); } std::pair ExecutionBlockImpl::getSomeWithoutTrace(size_t atMost) { // silence tests -- we need to introduce new failure tests for fetchers TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome1") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome2") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome3") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } if (getQuery().killed()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); } // For every call we simply forward via HTTP if (_lastError.fail()) { TRI_ASSERT(_lastResponse == nullptr); Result res = _lastError; _lastError.reset(); // we were called with an error need to throw it. THROW_ARANGO_EXCEPTION(res); } if (_lastResponse != nullptr) { TRI_ASSERT(_lastError.ok()); // We do not have an error but a result, all is good // We have an open result still. auto response = std::move(_lastResponse); // Result is the response which will be a serialized AqlItemBlock // both must be reset before return or throw TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); VPackSlice responseBody = response->slice(); ExecutionState state = ExecutionState::HASMORE; if (VelocyPackHelper::getBooleanValue(responseBody, "done", true)) { state = ExecutionState::DONE; } if (responseBody.hasKey("data")) { SharedAqlItemBlockPtr r = _engine->itemBlockManager().requestAndInitBlock(responseBody); return {state, std::move(r)}; } return {ExecutionState::DONE, nullptr}; } // We need to send a request here VPackBuffer buffer; { VPackBuilder builder(buffer); builder.openObject(); builder.add("atMost", VPackValue(atMost)); builder.close(); traceGetSomeRequest(builder.slice(), atMost); } auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/getSome/", std::move(buffer)); if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } return {ExecutionState::WAITING, nullptr}; } std::pair ExecutionBlockImpl::skipSome(size_t atMost) { traceSkipSomeBegin(atMost); auto result = skipSomeWithoutTrace(atMost); return traceSkipSomeEnd(result.first, result.second); } std::pair ExecutionBlockImpl::skipSomeWithoutTrace(size_t atMost) { if (_lastError.fail()) { TRI_ASSERT(_lastResponse == nullptr); Result res = _lastError; _lastError.reset(); // we were called with an error need to throw it. THROW_ARANGO_EXCEPTION(res); } if (getQuery().killed()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); } if (_lastResponse != nullptr) { TRI_ASSERT(_lastError.ok()); // We have an open result still. // Result is the response which will be a serialized AqlItemBlock auto response = std::move(_lastResponse); // both must be reset before return or throw TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); VPackSlice slice = response->slice(); if (!slice.hasKey(StaticStrings::Error) || slice.get(StaticStrings::Error).getBoolean()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION); } size_t skipped = 0; VPackSlice s = slice.get("skipped"); if (s.isNumber()) { auto value = s.getNumericValue(); if (value < 0) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "skipped cannot be negative"); } skipped = s.getNumericValue(); } // TODO Check if we can get better with HASMORE/DONE if (skipped == 0) { return {ExecutionState::DONE, skipped}; } return {ExecutionState::HASMORE, skipped}; } // For every call we simply forward via HTTP VPackBuffer buffer; { VPackBuilder builder(buffer); builder.openObject(/*unindexed*/ true); builder.add("atMost", VPackValue(atMost)); builder.close(); traceSkipSomeRequest(builder.slice(), atMost); } auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/skipSome/", std::move(buffer)); if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } return {ExecutionState::WAITING, 0}; } std::pair ExecutionBlockImpl::initializeCursor( InputAqlItemRow const& input) { // For every call we simply forward via HTTP if (!_isResponsibleForInitializeCursor) { // do nothing... return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } if (!input.isInitialized()) { // we simply ignore the initialCursor request, as the remote side // will initialize the cursor lazily return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } if (getQuery().killed()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); } if (_lastResponse != nullptr || _lastError.fail()) { // We have an open result still. auto response = std::move(_lastResponse); // Result is the response which is an object containing the ErrorCode VPackSlice slice = response->slice(); if (slice.hasKey("code")) { return {ExecutionState::DONE, slice.get("code").getNumericValue()}; } return {ExecutionState::DONE, TRI_ERROR_INTERNAL}; } VPackOptions options(VPackOptions::Defaults); options.buildUnindexedArrays = true; options.buildUnindexedObjects = true; VPackBuffer buffer; VPackBuilder builder(buffer, &options); builder.openObject(/*unindexed*/ true); // Backwards Compatibility 3.3 // NOTE: Removing this breaks tests in current devel - is this really for // bc only? builder.add("exhausted", VPackValue(false)); // Used in 3.4.0 onwards builder.add("done", VPackValue(false)); builder.add("error", VPackValue(false)); // NOTE API change. Before all items have been send. // Now only the one output row is send. builder.add("pos", VPackValue(0)); builder.add(VPackValue("items")); builder.openObject(/*unindexed*/ true); input.toVelocyPack(_engine->getQuery()->trx(), builder); builder.close(); builder.close(); auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/initializeCursor/", std::move(buffer)); if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; } /// @brief shutdown, will be called exactly once for the whole query std::pair ExecutionBlockImpl::shutdown(int errorCode) { if (!_isResponsibleForInitializeCursor) { // do nothing... return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } if (!_hasTriggeredShutdown) { std::lock_guard guard(_communicationMutex); std::ignore = generateNewTicket(); _hasTriggeredShutdown = true; } if (_lastError.fail()) { TRI_ASSERT(_lastResponse == nullptr); Result res = _lastError; _lastError.reset(); if (res.is(TRI_ERROR_QUERY_NOT_FOUND)) { // Ignore query not found errors, they should do no harm. // However, as only the snippet with _isResponsibleForInitializeCursor // should now call shutdown, this should not usually happen, so emit a // warning. LOG_TOPIC("8d035", WARN, Logger::AQL) << "During AQL query shutdown: " << "Query ID " << _queryId << " not found on " << _server; return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; } // we were called with an error and need to throw it. THROW_ARANGO_EXCEPTION(res); } if (_lastResponse != nullptr) { TRI_ASSERT(_lastError.ok()); auto response = std::move(_lastResponse); // both must be reset before return or throw TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); VPackSlice slice = response->slice(); if (slice.isObject()) { if (slice.hasKey("stats")) { ExecutionStats newStats(slice.get("stats")); _engine->_stats.add(newStats); } // read "warnings" attribute if present and add it to our query VPackSlice warnings = slice.get("warnings"); if (warnings.isArray()) { auto query = _engine->getQuery(); for (VPackSlice it : VPackArrayIterator(warnings)) { if (it.isObject()) { VPackSlice code = it.get("code"); VPackSlice message = it.get("message"); if (code.isNumber() && message.isString()) { query->registerWarning(code.getNumericValue(), message.copyString().c_str()); } } } } if (slice.hasKey("code")) { return {ExecutionState::DONE, slice.get("code").getNumericValue()}; } } return {ExecutionState::DONE, TRI_ERROR_INTERNAL}; } // Already sent a shutdown request, but haven't got an answer yet. if (_didSendShutdownRequest) { return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; } _didSendShutdownRequest = true; // For every call we simply forward via HTTP VPackBuffer buffer; VPackBuilder builder(buffer); builder.openObject(/*unindexed*/ true); builder.add("code", VPackValue(errorCode)); builder.close(); traceShutdownRequest(builder.slice(), errorCode); auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/shutdown/", std::move(buffer)); if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; } namespace { Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err, fuerte::Response* response) { TRI_ASSERT(err != fuerte::Error::NoError || response->statusCode() >= 400); std::string msg; if (spec.shardId.empty()) { msg.append("Error message received from cluster node '") .append(spec.serverId) .append("': "); } else { msg.append("Error message received from shard '") .append(spec.shardId) .append("' on cluster node '") .append(spec.serverId) .append("': "); } int res = TRI_ERROR_INTERNAL; if (err != fuerte::Error::NoError) { res = network::fuerteToArangoErrorCode(err); msg.append(TRI_errno_string(res)); } else { VPackSlice slice = response->slice(); if (slice.isObject()) { VPackSlice err = slice.get(StaticStrings::Error); if (err.isBool() && err.getBool()) { res = VelocyPackHelper::readNumericValue(slice, StaticStrings::ErrorNum, res); VPackStringRef ref = VelocyPackHelper::getStringRef(slice, StaticStrings::ErrorMessage, "(no valid error in response)"); msg.append(ref.data(), ref.size()); } } } return Result(res, std::move(msg)); } } // namespace Result ExecutionBlockImpl::sendAsyncRequest(fuerte::RestVerb type, std::string const& urlPart, VPackBuffer body) { NetworkFeature const& nf = _engine->getQuery()->vocbase().server().getFeature(); network::ConnectionPool* pool = nf.pool(); if (!pool) { // nullptr only happens on controlled shutdown return {TRI_ERROR_SHUTTING_DOWN}; } std::string url = std::string("/_db/") + arangodb::basics::StringUtils::urlEncode(_engine->getQuery()->vocbase().name()) + urlPart + _queryId; arangodb::network::EndpointSpec spec; int res = network::resolveDestination(nf, _server, spec); if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?! return Result(res); } TRI_ASSERT(!spec.endpoint.empty()); auto req = fuerte::createRequest(type, url, {}, std::move(body)); // Later, we probably want to set these sensibly: req->timeout(kDefaultTimeOutSecs); if (!_ownName.empty()) { req->header.addMeta("Shard-Id", _ownName); } network::ConnectionPool::Ref ref = pool->leaseConnection(spec.endpoint); std::lock_guard guard(_communicationMutex); auto ticket = generateNewTicket(); std::shared_ptr conn = ref.connection(); conn->sendRequest(std::move(req), [this, ticket, spec, sharedState = _query.sharedState(), ref(std::move(ref))](fuerte::Error err, std::unique_ptr, std::unique_ptr res) { // `this` is only valid as long as sharedState is valid. // So we must execute this under sharedState's mutex. sharedState->execute([&] { std::lock_guard guard(_communicationMutex); if (_lastTicket == ticket) { if (err != fuerte::Error::NoError || res->statusCode() >= 400) { _lastError = handleErrorResponse(spec, err, res.get()); } else { _lastResponse = std::move(res); } } }); }); ++_engine->_stats.requests; return {TRI_ERROR_NO_ERROR}; } void ExecutionBlockImpl::traceGetSomeRequest(VPackSlice const slice, size_t const atMost) { using namespace std::string_literals; traceRequest("getSome", slice, "atMost="s + std::to_string(atMost)); } void ExecutionBlockImpl::traceSkipSomeRequest(VPackSlice const slice, size_t const atMost) { using namespace std::string_literals; traceRequest("skipSome", slice, "atMost="s + std::to_string(atMost)); } void ExecutionBlockImpl::traceShutdownRequest(VPackSlice const slice, int const errorCode) { using namespace std::string_literals; traceRequest("shutdown", slice, "errorCode="s + std::to_string(errorCode)); } void ExecutionBlockImpl::traceRequest(char const* const rpc, VPackSlice const slice, std::string const& args) { if (_profile >= PROFILE_LEVEL_TRACE_1) { auto const queryId = this->_engine->getQuery()->id(); auto const remoteQueryId = _queryId; LOG_TOPIC("92c71", INFO, Logger::QUERIES) << "[query#" << queryId << "] remote request sent: " << rpc << (args.empty() ? "" : " ") << args << " registryId=" << remoteQueryId; } } unsigned ExecutionBlockImpl::generateNewTicket() { // Assumes that _communicationMutex is locked in the caller ++_lastTicket; _lastError.reset(TRI_ERROR_NO_ERROR); _lastResponse.reset(); return _lastTicket; }