//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann /// @author Jan Christoph Uhde //////////////////////////////////////////////////////////////////////////////// #include "RocksDBRestReplicationHandler.h" #include "Basics/StaticStrings.h" #include "Basics/VPackStringBufferAdapter.h" #include "Basics/VelocyPackHelper.h" #include "Logger/Logger.h" #include "Replication/Syncer.h" #include "Replication/utilities.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBEngine.h" #include "RocksDBEngine/RocksDBReplicationContext.h" #include "RocksDBEngine/RocksDBReplicationManager.h" #include "RocksDBEngine/RocksDBReplicationTailing.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "Transaction/StandaloneContext.h" #include "VocBase/LogicalCollection.h" #include "VocBase/ticks.h" #include #include #include #include using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::rest; using namespace arangodb::rocksutils; RocksDBRestReplicationHandler::RocksDBRestReplicationHandler(GeneralRequest* request, GeneralResponse* response) : RestReplicationHandler(request, response), _manager(globalRocksEngine()->replicationManager()) {} void RocksDBRestReplicationHandler::handleCommandBatch() { // extract the request type auto const type = _request->requestType(); auto const& suffixes = _request->suffixes(); size_t const len = suffixes.size(); TRI_ASSERT(len >= 1); if (type == rest::RequestType::POST) { // create a new blocker bool parseSuccess = true; VPackSlice body = this->parseVPackBody(parseSuccess); if (!parseSuccess || !body.isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON"); return; } std::string patchCount = VelocyPackHelper::getStringValue(body, "patchCount", ""); TRI_server_id_t const clientId = StringUtils::uint64(_request->value("serverId")); SyncerId const syncerId = SyncerId::fromRequest(*_request); std::string const clientInfo = _request->value("clientInfo"); // create transaction+snapshot, ttl will be default if `ttl == 0`` auto ttl = VelocyPackHelper::getNumericValue(body, "ttl", replutils::BatchInfo::DefaultTimeout); auto* ctx = _manager->createContext(ttl, syncerId, clientId); RocksDBReplicationContextGuard guard(_manager, ctx); if (!patchCount.empty()) { auto triple = ctx->bindCollectionIncremental(_vocbase, patchCount); Result res = std::get<0>(triple); if (res.fail()) { LOG_TOPIC("3d5d4", WARN, Logger::REPLICATION) << "Error during first phase of" << " collection count patching: " << res.errorMessage(); } } VPackBuilder b; b.add(VPackValue(VPackValueType::Object)); b.add("id", VPackValue(std::to_string(ctx->id()))); // id always string b.add("lastTick", VPackValue(std::to_string(ctx->snapshotTick()))); b.close(); _vocbase.replicationClients().track(syncerId, clientId, clientInfo, ctx->snapshotTick(), ttl); generateResult(rest::ResponseCode::OK, b.slice()); return; } if (type == rest::RequestType::PUT && len >= 2) { // extend an existing blocker auto id = static_cast(StringUtils::uint64(suffixes[1])); auto input = _request->toVelocyPackBuilderPtr(); if (input == nullptr || !input->slice().isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON"); return; } // extract ttl. Context uses initial ttl from batch creation, if `ttl == 0` auto ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", replutils::BatchInfo::DefaultTimeout); auto res = _manager->extendLifetime(id, ttl); if (res.fail()) { generateError(res.result()); return; } SyncerId const syncerId = std::get(res.get()); TRI_server_id_t const clientId = std::get(res.get()); std::string const& clientInfo = std::get(res.get()); // last tick value in context should not have changed compared to the // initial tick value used in the context (it's only updated on bind() // call, which is only executed when a batch is initially created) _vocbase.replicationClients().extend(syncerId, clientId, clientInfo, ttl); resetResponse(rest::ResponseCode::NO_CONTENT); return; } if (type == rest::RequestType::DELETE_REQ && len >= 2) { // delete an existing blocker auto id = static_cast(StringUtils::uint64(suffixes[1])); bool found = _manager->remove(id); if (found) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { int res = TRI_ERROR_CURSOR_NOT_FOUND; generateError(GeneralResponse::responseCode(res), res); } return; } // we get here if anything above is invalid generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); } // handled by the batch for rocksdb void RocksDBRestReplicationHandler::handleCommandBarrier() { auto const type = _request->requestType(); if (type == rest::RequestType::POST) { VPackBuilder b; b.add(VPackValue(VPackValueType::Object)); // always return a non-0 barrier id // it will be ignored by the client anyway for the RocksDB engine std::string const idString = std::to_string(TRI_NewTickServer()); b.add("id", VPackValue(idString)); b.close(); generateResult(rest::ResponseCode::OK, b.slice()); } else if (type == rest::RequestType::PUT || type == rest::RequestType::DELETE_REQ) { resetResponse(rest::ResponseCode::NO_CONTENT); } else if (type == rest::RequestType::GET) { generateResult(rest::ResponseCode::OK, VPackSlice::emptyArraySlice()); } } void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { bool useVst = false; if (_request->transportType() == Endpoint::TransportType::VST) { useVst = true; } // determine start and end tick TRI_voc_tick_t tickStart = 0; TRI_voc_tick_t tickEnd = UINT64_MAX; bool found; std::string const& value1 = _request->value("from", found); if (found) { tickStart = static_cast(StringUtils::uint64(value1)); } // determine end tick for dump std::string const& value2 = _request->value("to", found); if (found) { tickEnd = static_cast(StringUtils::uint64(value2)); } if (found && (tickStart > tickEnd || tickEnd == 0)) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid from/to values"); return; } // add client TRI_server_id_t const clientId = StringUtils::uint64(_request->value("serverId")); SyncerId const syncerId = SyncerId::fromRequest(*_request); std::string const clientInfo = _request->value("clientInfo"); bool includeSystem = _request->parsedValue("includeSystem", true); auto chunkSize = _request->parsedValue("chunkSize", 1024 * 1024); grantTemporaryRights(); // extract collection TRI_voc_cid_t cid = 0; std::string const& value6 = _request->value("collection", found); if (found) { auto c = _vocbase.lookupCollection(value6); if (c == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); return; } cid = c->id(); } auto trxContext = transaction::StandaloneContext::Create(_vocbase); VPackBuilder builder(trxContext->getVPackOptions()); builder.openArray(); auto result = tailWal(&_vocbase, tickStart, tickEnd, static_cast(chunkSize), includeSystem, cid, builder); builder.close(); auto data = builder.slice(); uint64_t const latest = latestSequenceNumber(); if (result.fail()) { generateError(GeneralResponse::responseCode(result.errorNumber()), result.errorNumber(), result.errorMessage()); return; } TRI_ASSERT(latest >= result.maxTick()); bool const checkMore = (result.maxTick() > 0 && result.maxTick() < latest); // generate the result size_t length = data.length(); TRI_ASSERT(length == 0 || result.maxTick() > 0); if (length == 0) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { resetResponse(rest::ResponseCode::OK); } // transfer ownership of the buffer contents _response->setContentType(rest::ContentType::DUMP); // set headers _response->setHeaderNC(StaticStrings::ReplicationHeaderCheckMore, checkMore ? "true" : "false"); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastIncluded, StringUtils::itoa((length == 0) ? 0 : result.maxTick())); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick, StringUtils::itoa(latest)); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastScanned, StringUtils::itoa(result.lastScannedTick())); _response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); // TODO remove _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, result.minTickIncluded() ? "true" : "false"); if (length > 0) { if (useVst) { for (auto message : arangodb::velocypack::ArrayIterator(data)) { _response->addPayload(VPackSlice(message), trxContext->getVPackOptions(), true); } } else { HttpResponse* httpResponse = dynamic_cast(_response.get()); if (httpResponse == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type"); } basics::StringBuffer& buffer = httpResponse->body(); arangodb::basics::VPackStringBufferAdapter adapter(buffer.stringBuffer()); // note: we need the CustomTypeHandler here VPackDumper dumper(&adapter, trxContext->getVPackOptions()); for (auto marker : arangodb::velocypack::ArrayIterator(data)) { dumper.dump(marker); httpResponse->body().appendChar('\n'); // LOG_TOPIC("2c0b2", INFO, Logger::REPLICATION) << // marker.toJson(trxContext->getVPackOptions()); } } } // insert the start tick (minus 1 to be on the safe side) as the // minimum tick we need to keep on the master. we cannot be sure // the master's response makes it to the slave safely, so we must // not insert the maximum of the WAL entries we sent. if we did, // and the response does not make it to the slave, the master will // note a higher tick than the slave will have received, which may // lead to the master eventually deleting a WAL section that the // slave will still request later double ttl = _request->parsedValue("ttl", replutils::BatchInfo::DefaultTimeout); _vocbase.replicationClients().track(syncerId, clientId, clientInfo, tickStart == 0 ? 0 : tickStart - 1, ttl); } /// @brief run the command that determines which transactions were open at /// a given tick value /// this is an internal method use by ArangoDB's replication that should not /// be called by client drivers directly void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() { generateResult(rest::ResponseCode::OK, VPackSlice::emptyArraySlice()); // rocksdb only includes finished transactions in the WAL. _response->setContentType(rest::ContentType::DUMP); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick, "0"); // always true to satisfy continuous syncer _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, "true"); } void RocksDBRestReplicationHandler::handleCommandInventory() { bool found; std::string batchId = _request->value("batchId", found); if (!found) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND, "batchId not specified"); return; } RocksDBReplicationContext* ctx = _manager->find(StringUtils::uint64(batchId)); RocksDBReplicationContextGuard guard(_manager, ctx); if (ctx == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND, "context was not found"); return; } TRI_voc_tick_t tick = TRI_CurrentTickServer(); // include system collections? bool includeSystem = _request->parsedValue("includeSystem", true); bool includeFoxxQs = _request->parsedValue("includeFoxxQueues", false); // produce inventory for all databases? bool isGlobal = false; getApplier(isGlobal); VPackBuilder builder; builder.openObject(); // add collections and views Result res; if (isGlobal) { builder.add(VPackValue("databases")); res = ctx->getInventory(_vocbase, includeSystem, includeFoxxQs, true, builder); } else { grantTemporaryRights(); res = ctx->getInventory(_vocbase, includeSystem, includeFoxxQs, false, builder); TRI_ASSERT(builder.hasKey("collections") && builder.hasKey("views")); } if (res.fail()) { generateError(rest::ResponseCode::BAD, res.errorNumber(), "inventory could not be created"); return; } const std::string snapTick = std::to_string(ctx->snapshotTick()); // builder.add("state", VPackValue(VPackValueType::Object)); builder.add("running", VPackValue(true)); builder.add("lastLogTick", VPackValue(snapTick)); builder.add("lastUncommittedLogTick", VPackValue(snapTick)); builder.add("totalEvents", VPackValue(ctx->snapshotTick())); builder.add("time", VPackValue(utilities::timeString())); builder.close(); // builder.add("tick", VPackValue(std::to_string(tick))); builder.close(); // Toplevel generateResult(rest::ResponseCode::OK, builder.slice()); } /// @brief produce list of keys for a specific collection void RocksDBRestReplicationHandler::handleCommandCreateKeys() { std::string const& collection = _request->value("collection"); if (collection.empty()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid collection parameter"); return; } // to is ignored because the snapshot time is the latest point in time RocksDBReplicationContext* ctx = nullptr; // get batchId from url parameters bool found; std::string batchId = _request->value("batchId", found); // find context if (found) { ctx = _manager->find(StringUtils::uint64(batchId)); } RocksDBReplicationContextGuard guard(_manager, ctx); if (!found || ctx == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND, "batchId not specified"); return; } // bind collection to context - will initialize iterator Result res; TRI_voc_cid_t cid; uint64_t numDocs; std::tie(res, cid, numDocs) = ctx->bindCollectionIncremental(_vocbase, collection); if (res.fail()) { generateError(res); return; } // keysId = - std::string keysId = StringUtils::itoa(ctx->id()); keysId.push_back('-'); keysId.append(StringUtils::itoa(cid)); VPackBuilder result; result.add(VPackValue(VPackValueType::Object)); result.add("id", VPackValue(keysId)); result.add("count", VPackValue(numDocs)); result.close(); generateResult(rest::ResponseCode::OK, result.slice()); } static std::pair extractBatchAndCid(std::string const& input) { auto pos = input.find('-'); if (pos != std::string::npos && input.size() > pos + 1 && pos > 1) { return std::make_pair(StringUtils::uint64(input.c_str(), pos), StringUtils::uint64(input.substr(pos + 1))); } return std::make_pair(0, 0); } /// @brief returns all key ranges void RocksDBRestReplicationHandler::handleCommandGetKeys() { std::vector const& suffixes = _request->suffixes(); if (suffixes.size() != 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting GET /_api/replication/keys/"); return; } static uint64_t const DefaultChunkSize = 5000; // determine chunk size uint64_t chunkSize = _request->parsedValue("chunkSize", DefaultChunkSize); if (chunkSize < 100) { chunkSize = DefaultChunkSize; } else if (chunkSize > 20000) { chunkSize = 20000; } // first suffix needs to be the key id std::string const& keysId = suffixes[1]; // - uint64_t batchId; TRI_voc_cid_t cid; std::tie(batchId, cid) = extractBatchAndCid(keysId); // get context RocksDBReplicationContext* ctx = _manager->find(batchId); // lock context RocksDBReplicationContextGuard guard(_manager, ctx); if (ctx == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND, "batchId not specified, expired or invalid in another way"); return; } VPackBuffer buffer; VPackBuilder builder(buffer); ctx->dumpKeyChunks(_vocbase, cid, builder, chunkSize); generateResult(rest::ResponseCode::OK, std::move(buffer)); } /// @brief returns date for a key range void RocksDBRestReplicationHandler::handleCommandFetchKeys() { std::vector const& suffixes = _request->suffixes(); if (suffixes.size() != 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting PUT /_api/replication/keys/"); return; } static uint64_t const DefaultChunkSize = 5000; // determine chunk size uint64_t chunkSize = _request->parsedValue("chunkSize", DefaultChunkSize); if (chunkSize < 100) { chunkSize = DefaultChunkSize; } else if (chunkSize > 20000) { chunkSize = 20000; } // chunk is supplied by old clients, low is an optimization // for rocksdb, because seeking should be cheaper size_t chunk = static_cast(_request->parsedValue("chunk", uint64_t(0))); bool found; std::string const& lowKey = _request->value("low", found); std::string const& value = _request->value("type", found); bool keys = true; if (value == "keys") { keys = true; } else if (value == "docs") { keys = false; } else { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid 'type' value"); return; } // first suffix needs to be the key id std::string const& keysId = suffixes[1]; // - uint64_t batchId; TRI_voc_cid_t cid; std::tie(batchId, cid) = extractBatchAndCid(keysId); RocksDBReplicationContext* ctx = _manager->find(batchId); RocksDBReplicationContextGuard guard(_manager, ctx); if (ctx == nullptr) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND, "batchId not specified or not found"); return; } auto transactionContext = transaction::StandaloneContext::Create(_vocbase); VPackBuffer buffer; VPackBuilder builder(buffer, transactionContext->getVPackOptions()); if (keys) { Result rv = ctx->dumpKeys(_vocbase, cid, builder, chunk, static_cast(chunkSize), lowKey); if (rv.fail()) { generateError(rv); return; } } else { size_t offsetInChunk = 0; size_t maxChunkSize = SIZE_MAX; std::string const& value2 = _request->value("offset", found); if (found) { offsetInChunk = static_cast(StringUtils::uint64(value2)); // "offset" was introduced with ArangoDB 3.3. if the client sends it, // it means we can adapt the result size dynamically and the client // may refetch data for the same chunk maxChunkSize = 8 * 1024 * 1024; // if a client does not send an "offset" parameter at all, we are // not sure if it supports this protocol (3.2 and before) or not } bool success = false; VPackSlice const parsedIds = this->parseVPackBody(success); if (!success) { generateResult(rest::ResponseCode::BAD, VPackSlice()); return; } Result rv = ctx->dumpDocuments(_vocbase, cid, builder, chunk, static_cast(chunkSize), offsetInChunk, maxChunkSize, lowKey, parsedIds); if (rv.fail()) { generateError(rv); return; } } generateResult(rest::ResponseCode::OK, std::move(buffer), transactionContext); } void RocksDBRestReplicationHandler::handleCommandRemoveKeys() { std::vector const& suffixes = _request->suffixes(); if (suffixes.size() != 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting DELETE /_api/replication/keys/"); return; } // first suffix needs to be the key id std::string const& keysId = suffixes[1]; // - uint64_t batchId; TRI_voc_cid_t cid; std::tie(batchId, cid) = extractBatchAndCid(keysId); RocksDBReplicationContext* ctx = _manager->find(batchId); RocksDBReplicationContextGuard guard(_manager, ctx); if (ctx != nullptr) { ctx->releaseIterators(_vocbase, cid); } VPackBuilder resultBuilder; resultBuilder.openObject(); resultBuilder.add("id", VPackValue(keysId)); // id as a string resultBuilder.add(StaticStrings::Error, VPackValue(false)); resultBuilder.add(StaticStrings::Code, VPackValue(static_cast(rest::ResponseCode::ACCEPTED))); resultBuilder.close(); generateResult(rest::ResponseCode::ACCEPTED, resultBuilder.slice()); } void RocksDBRestReplicationHandler::handleCommandDump() { LOG_TOPIC("213e2", TRACE, arangodb::Logger::REPLICATION) << "enter handleCommandDump"; bool found = false; uint64_t contextId = 0; // contains dump options that might need to be inspected // VPackSlice options = _request->payload(); // get collection Name std::string const& cname = _request->value("collection"); if (cname.empty()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid collection parameter"); return; } // get contextId std::string const& contextIdString = _request->value("batchId", found); if (found) { contextId = StringUtils::uint64(contextIdString); } else { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_BAD_PARAMETER, "replication dump - request misses batchId"); return; } // acquire context RocksDBReplicationContext* ctx = _manager->find(contextId, /*ttl*/ 0); RocksDBReplicationContextGuard guard(_manager, ctx); if (ctx == nullptr) { generateError( rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_BAD_PARAMETER, "replication dump - unable to find context (it could be expired)"); return; } // print request LOG_TOPIC("2b20f", TRACE, arangodb::Logger::REPLICATION) << "requested collection dump for collection '" << collection << "' using contextId '" << ctx->id() << "'"; grantTemporaryRights(); ExecContext const* exec = ExecContext::CURRENT; if (exec != nullptr && !exec->canUseCollection(_vocbase.name(), cname, auth::Level::RO)) { generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_FORBIDDEN); return; } uint64_t chunkSize = determineChunkSize(); size_t reserve = std::max(chunkSize, 8192); RocksDBReplicationContext::DumpResult res(TRI_ERROR_NO_ERROR); if (request()->contentTypeResponse() == rest::ContentType::VPACK) { VPackBuffer buffer; buffer.reserve(reserve); // avoid reallocs res = ctx->dumpVPack(_vocbase, cname, buffer, chunkSize); // generate the result if (res.fail()) { generateError(res.result()); } else if (buffer.byteSize() == 0) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { resetResponse(rest::ResponseCode::OK); _response->setContentType(rest::ContentType::VPACK); _response->setPayload(std::move(buffer), true, VPackOptions::Options::Defaults, /*resolveExternals*/ false); } // set headers _response->setHeaderNC(StaticStrings::ReplicationHeaderCheckMore, (res.hasMore ? "true" : "false")); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastIncluded, StringUtils::itoa(buffer.empty() ? 0 : res.includedTick)); } else { auto response = dynamic_cast(_response.get()); StringBuffer dump(reserve, false); if (response == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type"); } // do the work! res = ctx->dumpJson(_vocbase, cname, dump, determineChunkSize()); if (res.fail()) { if (res.is(TRI_ERROR_BAD_PARAMETER)) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "replication dump - " + res.errorMessage()); return; } generateError(rest::ResponseCode::SERVER_ERROR, res.errorNumber(), "replication dump - " + res.errorMessage()); return; } // generate the result if (dump.length() == 0) { resetResponse(rest::ResponseCode::NO_CONTENT); } else { resetResponse(rest::ResponseCode::OK); } response->setContentType(rest::ContentType::DUMP); // set headers _response->setHeaderNC(StaticStrings::ReplicationHeaderCheckMore, (res.hasMore ? "true" : "false")); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastIncluded, StringUtils::itoa((dump.length() == 0) ? 0 : res.includedTick)); // transfer ownership of the buffer contents response->body().set(dump.stringBuffer()); // avoid double freeing TRI_StealStringBuffer(dump.stringBuffer()); } }