//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann //////////////////////////////////////////////////////////////////////////////// #include "MMFilesRestReplicationHandler.h" #include "Auth/Common.h" #include "Basics/VelocyPackHelper.h" #include "Logger/Logger.h" #include "MMFiles/MMFilesCollectionKeys.h" #include "MMFiles/MMFilesEngine.h" #include "MMFiles/MMFilesLogfileManager.h" #include "MMFiles/mmfiles-replication-dump.h" #include "Replication/InitialSyncer.h" #include "RestServer/DatabaseFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "Transaction/StandaloneContext.h" #include "Utils/CollectionGuard.h" #include "Utils/CollectionKeysRepository.h" #include "Utils/CollectionNameResolver.h" #include "Utils/ExecContext.h" #include "Utils/OperationOptions.h" #include "VocBase/LogicalCollection.h" #include "VocBase/ticks.h" #include #include #include #include using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::rest; MMFilesRestReplicationHandler::MMFilesRestReplicationHandler(GeneralRequest* request, GeneralResponse* response) : RestReplicationHandler(request, response) {} MMFilesRestReplicationHandler::~MMFilesRestReplicationHandler() {} /// @brief insert the applier action into an action list void MMFilesRestReplicationHandler::insertClient(TRI_voc_tick_t lastServedTick) { bool found; std::string const& value = _request->value("serverId", found); if (found && !value.empty() && value != "none") { TRI_server_id_t serverId = static_cast(StringUtils::uint64(value)); if (serverId > 0) { _vocbase->updateReplicationClient(serverId, lastServedTick, InitialSyncer::defaultBatchTimeout); } } } void MMFilesRestReplicationHandler::handleCommandBatch() { // extract the request type auto const type = _request->requestType(); auto const& suffixes = _request->suffixes(); size_t const len = suffixes.size(); TRI_ASSERT(len >= 1); if (type == rest::RequestType::POST) { // create a new blocker std::shared_ptr input = _request->toVelocyPackBuilderPtr(); if (input == nullptr || !input->slice().isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON"); return; } // extract ttl double expires = VelocyPackHelper::getNumericValue(input->slice(), "ttl", 30.0); TRI_voc_tick_t id; MMFilesEngine* engine = static_cast(EngineSelectorFeature::ENGINE); int res = engine->insertCompactionBlocker(_vocbase, expires, id); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } VPackBuilder b; b.add(VPackValue(VPackValueType::Object)); b.add("id", VPackValue(std::to_string(id))); b.close(); generateResult(rest::ResponseCode::OK, b.slice()); return; } if (type == rest::RequestType::PUT && len >= 2) { // extend an existing blocker TRI_voc_tick_t id = static_cast(StringUtils::uint64(suffixes[1])); auto input = _request->toVelocyPackBuilderPtr(); if (input == nullptr || !input->slice().isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON"); return; } // extract ttl double expires = VelocyPackHelper::getNumericValue(input->slice(), "ttl", 30.0); // now extend the blocker MMFilesEngine* engine = static_cast(EngineSelectorFeature::ENGINE); int res = engine->extendCompactionBlocker(_vocbase, id, expires); if (res == TRI_ERROR_NO_ERROR) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { generateError(GeneralResponse::responseCode(res), res); } return; } if (type == rest::RequestType::DELETE_REQ && len >= 2) { // delete an existing blocker TRI_voc_tick_t id = static_cast(StringUtils::uint64(suffixes[1])); MMFilesEngine* engine = static_cast(EngineSelectorFeature::ENGINE); int res = engine->removeCompactionBlocker(_vocbase, id); if (res == TRI_ERROR_NO_ERROR) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { generateError(GeneralResponse::responseCode(res), res); } return; } // we get here if anything above is invalid generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); } /// @brief add or remove a WAL logfile barrier void MMFilesRestReplicationHandler::handleCommandBarrier() { // extract the request type auto const type = _request->requestType(); std::vector const& suffixes = _request->suffixes(); size_t const len = suffixes.size(); TRI_ASSERT(len >= 1); if (type == rest::RequestType::POST) { // create a new barrier std::shared_ptr input = _request->toVelocyPackBuilderPtr(); if (input == nullptr || !input->slice().isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON"); return; } // extract ttl double ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", 30.0); TRI_voc_tick_t minTick = 0; VPackSlice const v = input->slice().get("tick"); if (v.isString()) { minTick = StringUtils::uint64(v.copyString()); } else if (v.isNumber()) { minTick = v.getNumber(); } if (minTick == 0) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid tick value"); return; } TRI_voc_tick_t id = MMFilesLogfileManager::instance()->addLogfileBarrier(_vocbase->id(), minTick, ttl); VPackBuilder b; b.add(VPackValue(VPackValueType::Object)); std::string const idString(std::to_string(id)); b.add("id", VPackValue(idString)); b.close(); generateResult(rest::ResponseCode::OK, b.slice()); return; } if (type == rest::RequestType::PUT && len >= 2) { // extend an existing barrier TRI_voc_tick_t id = StringUtils::uint64(suffixes[1]); std::shared_ptr input = _request->toVelocyPackBuilderPtr(); if (input == nullptr || !input->slice().isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON"); return; } // extract ttl double ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", 30.0); TRI_voc_tick_t minTick = 0; VPackSlice const v = input->slice().get("tick"); if (v.isString()) { minTick = StringUtils::uint64(v.copyString()); } else if (v.isNumber()) { minTick = v.getNumber(); } if (MMFilesLogfileManager::instance()->extendLogfileBarrier(id, ttl, minTick)) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { int res = TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; generateError(GeneralResponse::responseCode(res), res); } return; } if (type == rest::RequestType::DELETE_REQ && len >= 2) { // delete an existing barrier TRI_voc_tick_t id = StringUtils::uint64(suffixes[1]); if (MMFilesLogfileManager::instance()->removeLogfileBarrier(id)) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { int res = TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; generateError(GeneralResponse::responseCode(res), res); } return; } if (type == rest::RequestType::GET) { // fetch all barriers auto ids = MMFilesLogfileManager::instance()->getLogfileBarriers(); VPackBuilder b; b.add(VPackValue(VPackValueType::Array)); for (auto& it : ids) { b.add(VPackValue(std::to_string(it))); } b.close(); generateResult(rest::ResponseCode::OK, b.slice()); return; } // we get here if anything above is invalid generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); } void MMFilesRestReplicationHandler::handleCommandLoggerFollow() { bool useVst = false; if (_request->transportType() == Endpoint::TransportType::VST) { useVst = true; } // determine start and end tick MMFilesLogfileManagerState const state = MMFilesLogfileManager::instance()->state(); TRI_voc_tick_t tickStart = 0; TRI_voc_tick_t tickEnd = UINT64_MAX; TRI_voc_tick_t firstRegularTick = 0; bool found; std::string const& value1 = _request->value("from", found); if (found) { tickStart = static_cast(StringUtils::uint64(value1)); } // determine end tick for dump std::string const& value2 = _request->value("to", found); if (found) { tickEnd = static_cast(StringUtils::uint64(value2)); } if (found && (tickStart > tickEnd || tickEnd == 0)) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid from/to values"); return; } // check if a barrier id was specified in request TRI_voc_tid_t barrierId = 0; std::string const& value3 = _request->value("barrier", found); if (found) { barrierId = static_cast(StringUtils::uint64(value3)); } bool includeSystem = true; std::string const& value4 = _request->value("includeSystem", found); if (found) { includeSystem = StringUtils::boolean(value4); } // grab list of transactions from the body value std::unordered_set transactionIds; if (_request->requestType() == arangodb::rest::RequestType::PUT) { std::string const& value5 = _request->value("firstRegularTick", found); if (found) { firstRegularTick = static_cast(StringUtils::uint64(value5)); } // copy default options VPackOptions options = VPackOptions::Defaults; options.checkAttributeUniqueness = true; VPackSlice slice; try { slice = _request->payload(&options); } catch (...) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid body value. expecting array"); return; } if (!slice.isArray()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid body value. expecting array"); return; } for (auto const& id : VPackArrayIterator(slice)) { if (!id.isString()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid body value. expecting array of ids"); return; } transactionIds.emplace(StringUtils::uint64(id.copyString())); } } grantTemporaryRights(); // extract collection TRI_voc_cid_t cid = 0; std::string const& value6 = _request->value("collection", found); if (found) { arangodb::LogicalCollection* c = _vocbase->lookupCollection(value6); if (c == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); return; } cid = c->cid(); } if (barrierId > 0) { // extend the WAL logfile barrier MMFilesLogfileManager::instance()->extendLogfileBarrier(barrierId, 180, tickStart); } auto ctx = transaction::StandaloneContext::Create(_vocbase); // initialize the dump container MMFilesReplicationDumpContext dump(ctx, static_cast(determineChunkSize()), includeSystem, cid, useVst); // and dump int res = MMFilesDumpLogReplication(&dump, transactionIds, firstRegularTick, tickStart, tickEnd, false); if (res != TRI_ERROR_NO_ERROR) { generateError(GeneralResponse::responseCode(res), res); return; } bool const checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state.lastCommittedTick); // generate the result size_t length = 0; if (useVst) { length = dump._slices.size(); } else { length = TRI_LengthStringBuffer(dump._buffer); } if (length == 0) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { resetResponse(rest::ResponseCode::OK); } // transfer ownership of the buffer contents _response->setContentType(rest::ContentType::DUMP); // set headers _response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE, checkMore ? "true" : "false"); _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTINCLUDED, StringUtils::itoa(dump._lastFoundTick)); _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, StringUtils::itoa(state.lastCommittedTick)); _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTSCANNED, StringUtils::itoa(dump._lastScannedTick)); _response->setHeaderNC(TRI_REPLICATION_HEADER_ACTIVE, "true"); _response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, dump._fromTickIncluded ? "true" : "false"); if (length > 0) { if (useVst) { for (auto message : dump._slices) { _response->addPayload(std::move(message), &dump._vpackOptions, true); } } else { HttpResponse* httpResponse = dynamic_cast(_response.get()); if (httpResponse == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type"); } /*std::string ll(TRI_BeginStringBuffer(dump._buffer), TRI_LengthStringBuffer(dump._buffer)); for (std::string const& str : basics::StringUtils::split(ll, '\n')) { if (!str.empty()) LOG_TOPIC(WARN, Logger::FIXME) << str; }*/ // transfer ownership of the buffer contents httpResponse->body().set(dump._buffer); // to avoid double freeing TRI_StealStringBuffer(dump._buffer); } } // insert the start tick (minus 1 to be on the safe side) as the // minimum tick we need to keep on the master. we cannot be sure // the master's response makes it to the slave safely, so we must // not insert the maximum of the WAL entries we sent. if we did, // and the response does not make it to the slave, the master will // note a higher tick than the slave will have received, which may // lead to the master eventually deleting a WAL section that the // slave will still request later insertClient(tickStart == 0 ? tickStart : tickStart - 1); } /// @brief run the command that determines which transactions were open at /// a given tick value /// this is an internal method use by ArangoDB's replication that should not /// be called by client drivers directly void MMFilesRestReplicationHandler::handleCommandDetermineOpenTransactions() { // determine start and end tick MMFilesLogfileManagerState const state = MMFilesLogfileManager::instance()->state(); TRI_voc_tick_t tickStart = 0; TRI_voc_tick_t tickEnd = state.lastCommittedTick; bool found; std::string const& value1 = _request->value("from", found); if (found) { tickStart = static_cast(StringUtils::uint64(value1)); } // determine end tick for dump std::string const& value2 = _request->value("to", found); if (found) { tickEnd = static_cast(StringUtils::uint64(value2)); } if (found && (tickStart > tickEnd || tickEnd == 0)) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid from/to values"); return; } auto ctx = transaction::StandaloneContext::Create(_vocbase); // initialize the dump container MMFilesReplicationDumpContext dump(ctx, static_cast(determineChunkSize()), false, 0); // and dump int res = MMFilesDetermineOpenTransactionsReplication(&dump, tickStart, tickEnd); if (res != TRI_ERROR_NO_ERROR) { std::string const err = "failed to determine open transactions"; LOG_TOPIC(ERR, Logger::REPLICATION) << err; generateError(rest::ResponseCode::BAD, res, err); return; } // generate the result size_t const length = TRI_LengthStringBuffer(dump._buffer); if (length == 0) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { resetResponse(rest::ResponseCode::OK); } HttpResponse* httpResponse = dynamic_cast(_response.get()); if (_response == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type"); } _response->setContentType(rest::ContentType::DUMP); _response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, dump._fromTickIncluded ? "true" : "false"); _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, StringUtils::itoa(dump._lastFoundTick)); if (length > 0) { // transfer ownership of the buffer contents httpResponse->body().set(dump._buffer); // to avoid double freeing TRI_StealStringBuffer(dump._buffer); } } void MMFilesRestReplicationHandler::handleCommandInventory() { TRI_voc_tick_t tick = TRI_CurrentTickServer(); bool found; // include system collections? bool includeSystem = true; { std::string const& value = _request->value("includeSystem", found); if (found) { includeSystem = StringUtils::boolean(value); } } // produce inventory for all databases? bool global = false; { std::string const& value = _request->value("global", found); if (found) { global = StringUtils::boolean(value); } } if (global && _request->databaseName() != StaticStrings::SystemDatabase) { generateError( rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN, "global inventory can only be created from within _system database"); return; } auto nameFilter = [includeSystem](LogicalCollection const* collection) { std::string const cname = collection->name(); if (!includeSystem && !cname.empty() && cname[0] == '_') { // exclude all system collections return false; } if (TRI_ExcludeCollectionReplication(cname, includeSystem)) { // collection is excluded from replication return false; } // all other cases should be included return true; }; // collections and indexes VPackBuilder inventoryBuilder; if (global) { DatabaseFeature::DATABASE->inventory(inventoryBuilder, tick, nameFilter); } else { _vocbase->inventory(inventoryBuilder, tick, nameFilter); } VPackSlice const inventory = inventoryBuilder.slice(); VPackBuilder builder; builder.openObject(); if (global) { TRI_ASSERT(inventory.isObject()); builder.add("databases", inventory); } else { // add collections data TRI_ASSERT(inventory.isArray()); builder.add("collections", inventory); } // "state" builder.add("state", VPackValue(VPackValueType::Object)); MMFilesLogfileManagerState const s = MMFilesLogfileManager::instance()->state(); builder.add("running", VPackValue(true)); builder.add("lastLogTick", VPackValue(std::to_string(s.lastCommittedTick))); builder.add("lastUncommittedLogTick", VPackValue(std::to_string(s.lastAssignedTick))); builder.add("totalEvents", VPackValue(s.numEvents + s.numEventsSync)); builder.add("time", VPackValue(s.timeString)); builder.close(); // state std::string const tickString(std::to_string(tick)); builder.add("tick", VPackValue(tickString)); builder.close(); // top level generateResult(rest::ResponseCode::OK, builder.slice()); } /// @brief produce list of keys for a specific collection void MMFilesRestReplicationHandler::handleCommandCreateKeys() { std::string const& collection = _request->value("collection"); if (collection.empty()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid collection parameter"); return; } TRI_voc_tick_t tickEnd = UINT64_MAX; // determine end tick for keys bool found; std::string const& value = _request->value("to", found); if (found) { tickEnd = static_cast(StringUtils::uint64(value)); } arangodb::LogicalCollection* c = _vocbase->lookupCollection(collection); if (c == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); return; } auto guard = std::make_unique(_vocbase, c->cid(), false); arangodb::LogicalCollection* col = guard->collection(); TRI_ASSERT(col != nullptr); // turn off the compaction for the collection MMFilesEngine* engine = static_cast(EngineSelectorFeature::ENGINE); TRI_voc_tick_t id; int res = engine->insertCompactionBlocker(_vocbase, 1200.0, id); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } // initialize a container with the keys auto keys = std::make_unique(_vocbase, std::move(guard), id, 900.0); std::string const idString(std::to_string(keys->id())); keys->create(tickEnd); size_t const count = keys->count(); auto keysRepository = _vocbase->collectionKeys(); keysRepository->store(keys.get()); keys.release(); VPackBuilder result; result.add(VPackValue(VPackValueType::Object)); result.add("id", VPackValue(idString)); result.add("count", VPackValue(count)); result.close(); generateResult(rest::ResponseCode::OK, result.slice()); } /// @brief returns all key ranges void MMFilesRestReplicationHandler::handleCommandGetKeys() { std::vector const& suffixes = _request->suffixes(); if (suffixes.size() != 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting GET /_api/replication/keys/"); return; } static uint64_t const DefaultChunkSize = 5000; uint64_t chunkSize = DefaultChunkSize; // determine chunk size bool found; std::string const& value = _request->value("chunkSize", found); if (found) { chunkSize = StringUtils::uint64(value); if (chunkSize < 100) { chunkSize = DefaultChunkSize; } else if (chunkSize > 20000) { chunkSize = 20000; } } std::string const& id = suffixes[1]; auto keysRepository = _vocbase->collectionKeys(); TRI_ASSERT(keysRepository != nullptr); auto collectionKeysId = static_cast(arangodb::basics::StringUtils::uint64(id)); auto collectionKeys = keysRepository->find(collectionKeysId); if (collectionKeys == nullptr) { generateError(GeneralResponse::responseCode(TRI_ERROR_CURSOR_NOT_FOUND), TRI_ERROR_CURSOR_NOT_FOUND); return; } try { VPackBuilder b; b.add(VPackValue(VPackValueType::Array)); TRI_voc_tick_t max = static_cast(collectionKeys->count()); for (TRI_voc_tick_t from = 0; from < max; from += chunkSize) { TRI_voc_tick_t to = from + chunkSize; if (to > max) { to = max; } auto result = collectionKeys->hashChunk(static_cast(from), static_cast(to)); // Add a chunk b.add(VPackValue(VPackValueType::Object)); b.add("low", VPackValue(std::get<0>(result))); b.add("high", VPackValue(std::get<1>(result))); b.add("hash", VPackValue(std::to_string(std::get<2>(result)))); b.close(); } b.close(); collectionKeys->release(); generateResult(rest::ResponseCode::OK, b.slice()); } catch (...) { collectionKeys->release(); throw; } } /// @brief returns date for a key range void MMFilesRestReplicationHandler::handleCommandFetchKeys() { std::vector const& suffixes = _request->suffixes(); if (suffixes.size() != 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting PUT /_api/replication/keys/"); return; } static uint64_t const DefaultChunkSize = 5000; uint64_t chunkSize = DefaultChunkSize; // determine chunk size bool found; std::string const& value1 = _request->value("chunkSize", found); if (found) { chunkSize = StringUtils::uint64(value1); if (chunkSize < 100) { chunkSize = DefaultChunkSize; } else if (chunkSize > 20000) { chunkSize = 20000; } } std::string const& value2 = _request->value("chunk", found); size_t chunk = 0; if (found) { chunk = static_cast(StringUtils::uint64(value2)); } std::string const& value3 = _request->value("type", found); bool keys = true; if (value3 == "keys") { keys = true; } else if (value3 == "docs") { keys = false; } else { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid 'type' value"); return; } size_t offsetInChunk = 0; size_t maxChunkSize = SIZE_MAX; std::string const& value4 = _request->value("offset", found); if (found) { offsetInChunk = static_cast(StringUtils::uint64(value4)); // "offset" was introduced with ArangoDB 3.3. if the client sends it, // it means we can adapt the result size dynamically and the client // may refetch data for the same chunk maxChunkSize = 8 * 1024 * 1024; // if a client does not send an "offset" parameter at all, we are // not sure if it supports this protocol (3.2 and before) or not } std::string const& id = suffixes[1]; auto keysRepository = _vocbase->collectionKeys(); TRI_ASSERT(keysRepository != nullptr); auto collectionKeysId = static_cast(arangodb::basics::StringUtils::uint64(id)); auto collectionKeys = keysRepository->find(collectionKeysId); if (collectionKeys == nullptr) { generateError(GeneralResponse::responseCode(TRI_ERROR_CURSOR_NOT_FOUND), TRI_ERROR_CURSOR_NOT_FOUND); return; } try { auto ctx = transaction::StandaloneContext::Create(_vocbase); VPackBuilder resultBuilder(ctx->getVPackOptions()); resultBuilder.openArray(); if (keys) { collectionKeys->dumpKeys(resultBuilder, chunk, static_cast(chunkSize)); } else { bool success; std::shared_ptr parsedIds = parseVelocyPackBody(success); if (!success) { // error already created collectionKeys->release(); return; } collectionKeys->dumpDocs(resultBuilder, chunk, static_cast(chunkSize), offsetInChunk, maxChunkSize, parsedIds->slice()); } resultBuilder.close(); collectionKeys->release(); generateResult(rest::ResponseCode::OK, resultBuilder.slice(), ctx); return; } catch (...) { collectionKeys->release(); throw; } } void MMFilesRestReplicationHandler::handleCommandRemoveKeys() { std::vector const& suffixes = _request->suffixes(); if (suffixes.size() != 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting DELETE /_api/replication/keys/"); return; } std::string const& id = suffixes[1]; auto keys = _vocbase->collectionKeys(); TRI_ASSERT(keys != nullptr); auto collectionKeysId = static_cast(arangodb::basics::StringUtils::uint64(id)); bool found = keys->remove(collectionKeysId); if (!found) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND); return; } VPackBuilder resultBuilder; resultBuilder.openObject(); resultBuilder.add("id", VPackValue(id)); // id as a string resultBuilder.add(StaticStrings::Error, VPackValue(false)); resultBuilder.add(StaticStrings::Code, VPackValue(static_cast(rest::ResponseCode::ACCEPTED))); resultBuilder.close(); generateResult(rest::ResponseCode::ACCEPTED, resultBuilder.slice()); } void MMFilesRestReplicationHandler::handleCommandDump() { std::string const& collection = _request->value("collection"); if (collection.empty()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid collection parameter"); return; } // determine start tick for dump TRI_voc_tick_t tickStart = 0; TRI_voc_tick_t tickEnd = static_cast(UINT64_MAX); bool flush = true; // flush WAL before dumping? bool withTicks = true; uint64_t flushWait = 0; // determine flush WAL value bool found; std::string const& value1 = _request->value("flush", found); if (found) { flush = StringUtils::boolean(value1); } // determine flush WAL wait time value std::string const& value3 = _request->value("flushWait", found); if (found) { flushWait = StringUtils::uint64(value3); if (flushWait > 60) { flushWait = 60; } } // determine start tick for dump std::string const& value4 = _request->value("from", found); if (found) { tickStart = (TRI_voc_tick_t)StringUtils::uint64(value4); } // determine end tick for dump std::string const& value5 = _request->value("to", found); if (found) { tickEnd = (TRI_voc_tick_t)StringUtils::uint64(value5); } if (tickStart > tickEnd || tickEnd == 0) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid from/to values"); return; } bool includeSystem = true; std::string const& value6 = _request->value("includeSystem", found); if (found) { includeSystem = StringUtils::boolean(value6); } std::string const& value7 = _request->value("ticks", found); if (found) { withTicks = StringUtils::boolean(value7); } LogicalCollection* c = _vocbase->lookupCollection(collection); if (c == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); return; } ExecContext const* exec = ExecContext::CURRENT; if (exec != nullptr && !exec->canUseCollection(_vocbase->name(), c->name(), auth::Level::RO)) { generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN); return; } LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "requested collection dump for collection '" << collection << "', tickStart: " << tickStart << ", tickEnd: " << tickEnd; if (flush) { MMFilesLogfileManager::instance()->flush(true, true, false); // additionally wait for the collector if (flushWait > 0) { MMFilesLogfileManager::instance()->waitForCollectorQueue(c->cid(), static_cast(flushWait)); } } arangodb::CollectionGuard guard(_vocbase, c->cid(), false); arangodb::LogicalCollection* col = guard.collection(); TRI_ASSERT(col != nullptr); auto ctx = std::make_shared(_vocbase); // initialize the dump container MMFilesReplicationDumpContext dump(ctx, static_cast(determineChunkSize()), includeSystem, 0); int res = MMFilesDumpCollectionReplication(&dump, col, tickStart, tickEnd, withTicks); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } // generate the result size_t const length = TRI_LengthStringBuffer(dump._buffer); if (length == 0) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { resetResponse(rest::ResponseCode::OK); } // TODO needs to generalized auto response = dynamic_cast(_response.get()); if (response == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type"); } response->setContentType(rest::ContentType::DUMP); // set headers _response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE, (dump._hasMore ? "true" : "false")); _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTINCLUDED, StringUtils::itoa(dump._lastFoundTick)); // transfer ownership of the buffer contents response->body().set(dump._buffer); // avoid double freeing TRI_StealStringBuffer(dump._buffer); }