//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "RestWalAccessHandler.h" #include "Basics/ScopeGuard.h" #include "Basics/StaticStrings.h" #include "Basics/VPackStringBufferAdapter.h" #include "Basics/VelocyPackHelper.h" #include "Replication/ReplicationFeature.h" #include "Replication/Syncer.h" #include "Replication/common-defines.h" #include "Replication/utilities.h" #include "Rest/HttpResponse.h" #include "Rest/Version.h" #include "RestServer/DatabaseFeature.h" #include "RestServer/ServerIdFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "StorageEngine/WalAccess.h" #include "Transaction/Helpers.h" #include "Utils/CollectionNameResolver.h" #include "VocBase/LogicalCollection.h" #include #include #include using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::rest; struct MyTypeHandler final : public VPackCustomTypeHandler { explicit MyTypeHandler(TRI_vocbase_t& vocbase) : resolver(vocbase) {} ~MyTypeHandler() = default; void dump(VPackSlice const& value, VPackDumper* dumper, VPackSlice const& base) override final { dumper->appendString(toString(value, nullptr, base)); } std::string toString(VPackSlice const& value, VPackOptions const* options, VPackSlice const& base) override final { return transaction::helpers::extractIdString(&resolver, value, base); } CollectionNameResolver resolver; }; RestWalAccessHandler::RestWalAccessHandler(GeneralRequest* request, GeneralResponse* response) : RestVocbaseBaseHandler(request, response) {} bool RestWalAccessHandler::parseFilter(WalAccess::Filter& filter) { // determine start and end tick filter.tickStart = _request->parsedValue("from", filter.tickStart); filter.tickLastScanned = _request->parsedValue("lastScanned", filter.tickLastScanned); // determine end tick for dump filter.tickEnd = _request->parsedValue("to", filter.tickEnd); if (filter.tickStart > filter.tickEnd || filter.tickEnd == 0) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid from/to values"); return false; } bool global = _request->parsedValue("global", false); if (global) { if (!_vocbase.isSystem()) { generateError( rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN, "global tailing is only possible from within _system database"); return false; } } else { // filter for database filter.vocbase = _vocbase.id(); // extract collection bool found = false; std::string const& value2 = _request->value("collection", found); if (found) { auto c = _vocbase.lookupCollection(value2); if (c == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); return false; } // filter for collection filter.collection = c->id(); } } filter.includeSystem = _request->parsedValue("includeSystem", filter.includeSystem); filter.includeFoxxQueues = _request->parsedValue("includeFoxxQueues", false); // grab list of transactions from the body value if (_request->requestType() == arangodb::rest::RequestType::PUT) { filter.firstRegularTick = _request->parsedValue("firstRegularTick", 0); // copy default options VPackOptions options = VPackOptions::Defaults; options.checkAttributeUniqueness = true; VPackSlice slice; try { slice = _request->payload(&options); } catch (...) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid body value. expecting array"); return false; } if (!slice.isArray()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid body value. expecting array"); return false; } for (auto const& id : VPackArrayIterator(slice)) { if (!id.isString()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid body value. expecting array of ids"); return false; } filter.transactionIds.emplace(StringUtils::uint64(id.copyString())); } } return true; } RestStatus RestWalAccessHandler::execute() { if (ServerState::instance()->isCoordinator()) { generateError(rest::ResponseCode::NOT_IMPLEMENTED, TRI_ERROR_CLUSTER_UNSUPPORTED, "'/_api/wal' is not yet supported in a cluster"); return RestStatus::DONE; } if (ExecContext::CURRENT == nullptr || !ExecContext::CURRENT->isAdminUser()) { generateError(ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN); return RestStatus::DONE; } std::vector suffixes = _request->decodedSuffixes(); if (suffixes.empty()) { generateError(ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expected GET _api/wal/[tail|range|lastTick]>"); return RestStatus::DONE; } StorageEngine* engine = EngineSelectorFeature::ENGINE; TRI_ASSERT(engine != nullptr); // Engine not loaded. Startup broken WalAccess const* wal = engine->walAccess(); TRI_ASSERT(wal != nullptr); if (suffixes[0] == "range" && _request->requestType() == RequestType::GET) { handleCommandTickRange(wal); } else if (suffixes[0] == "lastTick" && _request->requestType() == RequestType::GET) { handleCommandLastTick(wal); } else if (suffixes[0] == "tail" && (_request->requestType() == RequestType::GET || _request->requestType() == RequestType::PUT)) { handleCommandTail(wal); } else if (suffixes[0] == "open-transactions" && _request->requestType() == RequestType::GET) { handleCommandDetermineOpenTransactions(wal); } else { generateError( ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expected GET _api/wal/[tail|range|lastTick|open-transactions]>"); } return RestStatus::DONE; } void RestWalAccessHandler::handleCommandTickRange(WalAccess const* wal) { std::pair minMax; Result r = wal->tickRange(minMax); if (r.ok()) { /// {"tickMin":"123", "tickMax":"456", "version":"3.2", "serverId":"abc"} VPackBuilder result; result.openObject(); result.add("time", VPackValue(utilities::timeString())); // "state" part result.add("tickMin", VPackValue(std::to_string(minMax.first))); result.add("tickMax", VPackValue(std::to_string(minMax.second))); { // "server" part VPackObjectBuilder server(&result, "server", true); server->add("version", VPackValue(ARANGODB_VERSION)); server->add("serverId", VPackValue(std::to_string(ServerIdFeature::getId()))); } result.close(); generateResult(rest::ResponseCode::OK, result.slice()); } else { generateError(r); } } void RestWalAccessHandler::handleCommandLastTick(WalAccess const* wal) { VPackBuilder result; result.openObject(); result.add("time", VPackValue(utilities::timeString())); // "state" part result.add("tick", VPackValue(std::to_string(wal->lastTick()))); { // "server" part VPackObjectBuilder server(&result, "server", true); server->add("version", VPackValue(ARANGODB_VERSION)); server->add("serverId", VPackValue(std::to_string(ServerIdFeature::getId()))); } result.close(); generateResult(rest::ResponseCode::OK, result.slice()); } void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { // track the number of parallel invocations of the tailing API auto* rf = application_features::ApplicationServer::getFeature("Replication"); // this may throw when too many threads are going into tailing rf->trackTailingStart(); auto guard = scopeGuard([rf]() { rf->trackTailingEnd(); }); bool const useVst = (_request->transportType() == Endpoint::TransportType::VST); WalAccess::Filter filter; if (!parseFilter(filter)) { return; } // check for serverId TRI_server_id_t const clientId = StringUtils::uint64(_request->value("serverId")); SyncerId const syncerId = SyncerId::fromRequest(*_request); std::string const clientInfo = _request->value("clientInfo"); // check if a barrier id was specified in request TRI_voc_tid_t barrierId = _request->parsedValue("barrier", static_cast(0)); grantTemporaryRights(); bool found = false; size_t chunkSize = 1024 * 1024; std::string const& value5 = _request->value("chunkSize", found); if (found) { chunkSize = static_cast(StringUtils::uint64(value5)); chunkSize = std::min((size_t)128 * 1024 * 1024, chunkSize); } WalAccessResult result; std::map> handlers; VPackOptions opts = VPackOptions::Defaults; auto prepOpts = [&handlers, &opts](TRI_vocbase_t& vocbase) -> void { auto const& it = handlers.find(vocbase.id()); if (it == handlers.end()) { auto res = handlers.emplace(vocbase.id(), std::make_unique(vocbase)); opts.customTypeHandler = res.first->second.get(); } else { opts.customTypeHandler = it->second.get(); } }; size_t length = 0; if (useVst) { result = wal->tail(filter, chunkSize, barrierId, [&](TRI_vocbase_t* vocbase, VPackSlice const& marker) { length++; if (vocbase != nullptr) { // database drop has no vocbase prepOpts(*vocbase); } _response->addPayload(marker, &opts, true); }); } else { HttpResponse* httpResponse = dynamic_cast(_response.get()); TRI_ASSERT(httpResponse); if (httpResponse == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type"); } basics::StringBuffer& buffer = httpResponse->body(); basics::VPackStringBufferAdapter adapter(buffer.stringBuffer()); // note: we need the CustomTypeHandler here VPackDumper dumper(&adapter, &opts); result = wal->tail(filter, chunkSize, barrierId, [&](TRI_vocbase_t* vocbase, VPackSlice const& marker) { length++; if (vocbase != nullptr) { // database drop has no vocbase prepOpts(*vocbase); } dumper.dump(marker); buffer.appendChar('\n'); // LOG_TOPIC("cda47", INFO, Logger::REPLICATION) << // marker.toJson(&opts); }); } if (result.fail()) { generateError(std::move(result).result()); return; } // transfer ownership of the buffer contents _response->setContentType(rest::ContentType::DUMP); TRI_ASSERT(result.latestTick() >= result.lastIncludedTick()); TRI_ASSERT(result.latestTick() >= result.lastScannedTick()); // set headers bool const checkMore = result.lastIncludedTick() > 0 && result.lastIncludedTick() < result.latestTick(); _response->setHeaderNC(StaticStrings::ReplicationHeaderCheckMore, checkMore ? "true" : "false"); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastIncluded, StringUtils::itoa(result.lastIncludedTick())); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastScanned, StringUtils::itoa(result.lastScannedTick())); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick, StringUtils::itoa(result.latestTick())); _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, result.fromTickIncluded() ? "true" : "false"); if (length > 0) { _response->setResponseCode(rest::ResponseCode::OK); LOG_TOPIC("078ad", DEBUG, Logger::REPLICATION) << "WAL tailing after " << filter.tickStart << ", lastIncludedTick " << result.lastIncludedTick() << ", fromTickIncluded " << result.fromTickIncluded(); } else { LOG_TOPIC("29624", DEBUG, Logger::REPLICATION) << "No more data in WAL after " << filter.tickStart; _response->setResponseCode(rest::ResponseCode::NO_CONTENT); } DatabaseFeature::DATABASE->enumerateDatabases([&](TRI_vocbase_t& vocbase) -> void { vocbase.replicationClients().track(syncerId, clientId, clientInfo, filter.tickStart, replutils::BatchInfo::DefaultTimeout); }); } void RestWalAccessHandler::handleCommandDetermineOpenTransactions(WalAccess const* wal) { // determine start and end tick std::pair minMax; Result res = wal->tickRange(minMax); if (res.fail()) { generateError(res); return; } bool found = false; std::string const& value1 = _request->value("from", found); if (found) { minMax.first = static_cast(StringUtils::uint64(value1)); } // determine end tick for dump std::string const& value2 = _request->value("to", found); if (found) { minMax.second = static_cast(StringUtils::uint64(value2)); } if (minMax.first > minMax.second || minMax.second == 0) { generateError(ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid from/to values"); return; } // check whether a database was specified WalAccess::Filter filter; filter.tickStart = minMax.first; filter.tickEnd = minMax.second; if (!parseFilter(filter)) { return; } VPackBuffer buffer; VPackBuilder builder(buffer); builder.openArray(); WalAccessResult r = wal->openTransactions(filter, [&](TRI_voc_tick_t tick, TRI_voc_tid_t tid) { builder.add(VPackValue(std::to_string(tid))); }); builder.close(); _response->setContentType(rest::ContentType::DUMP); if (res.fail()) { generateError(res); } else { auto cc = r.lastIncludedTick() != 0 ? ResponseCode::OK : ResponseCode::NO_CONTENT; generateResult(cc, std::move(buffer)); _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, r.fromTickIncluded() ? "true" : "false"); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastIncluded, StringUtils::itoa(r.lastIncludedTick())); } } /// @brief Grant temporary restore rights void RestWalAccessHandler::grantTemporaryRights() { if (ExecContext::CURRENT != nullptr) { if (ExecContext::CURRENT->databaseAuthLevel() == auth::Level::RW) { // If you have administrative access on this database, // we grant you everything for restore. ExecContext::CURRENT = nullptr; } } }