From ce762fe8725b44442f6e8a9d0866616cb84f7934 Mon Sep 17 00:00:00 2001 From: Jan Christoph Uhde Date: Fri, 28 Apr 2017 14:21:55 +0200 Subject: [PATCH] add handleCommandDetermineOpenTransactions --- .../RocksDBRestReplicationHandler.cpp | 127 +++++++++++------- 1 file changed, 77 insertions(+), 50 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index d5e829682a..32418490a3 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -624,67 +624,71 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { builder.close(); auto data = builder.slice(); - if (result.ok()) { - bool const checkMore = - (result.maxTick() > 0 && result.maxTick() < latestSequenceNumber()); + if (result.fail()) { + generateError(rest::ResponseCode::SERVER_ERROR, result.errorNumber(), result.errorMessage()); + return; + } - // generate the result - size_t length = data.length(); + bool const checkMore = + (result.maxTick() > 0 && result.maxTick() < latestSequenceNumber()); - if (length == 0) { - resetResponse(rest::ResponseCode::NO_CONTENT); - } else { - resetResponse(rest::ResponseCode::OK); - } + // generate the result + size_t length = data.length(); - // transfer ownership of the buffer contents - _response->setContentType(rest::ContentType::DUMP); + if (length == 0) { + resetResponse(rest::ResponseCode::NO_CONTENT); + } else { + resetResponse(rest::ResponseCode::OK); + } - // set headers - _response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE, - checkMore ? "true" : "false"); - _response->setHeaderNC( - TRI_REPLICATION_HEADER_LASTINCLUDED, - StringUtils::itoa((length == 0) ? 0 : result.maxTick())); - _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, - StringUtils::itoa(latestSequenceNumber())); - _response->setHeaderNC(TRI_REPLICATION_HEADER_ACTIVE, "true"); - _response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, - result.fromTickIncluded() ? "true" : "false"); + // transfer ownership of the buffer contents + _response->setContentType(rest::ContentType::DUMP); - if (length > 0) { - if (useVpp) { - auto iter = arangodb::velocypack::ArrayIterator(data); - auto opts = arangodb::velocypack::Options::Defaults; - for (auto message : iter) { - _response->addPayload(VPackSlice(message), &opts, true); - } - } else { - HttpResponse* httpResponse = - dynamic_cast(_response.get()); + // set headers + _response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE, + checkMore ? "true" : "false"); + _response->setHeaderNC( + TRI_REPLICATION_HEADER_LASTINCLUDED, + StringUtils::itoa((length == 0) ? 0 : result.maxTick())); + _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, + StringUtils::itoa(latestSequenceNumber())); + _response->setHeaderNC(TRI_REPLICATION_HEADER_ACTIVE, "true"); + _response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, + result.fromTickIncluded() ? "true" : "false"); - if (httpResponse == nullptr) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, - "invalid response type"); - } - - if (length > 0) { - httpResponse->body().appendText(data.toJson()); - } + if (length > 0) { + if (useVpp) { + auto iter = arangodb::velocypack::ArrayIterator(data); + auto opts = arangodb::velocypack::Options::Defaults; + for (auto message : iter) { + _response->addPayload(VPackSlice(message), &opts, true); } - // add client - bool found; - std::string const& value = _request->value("serverId", found); + } else { + HttpResponse* httpResponse = + dynamic_cast(_response.get()); - if (found) { - TRI_server_id_t serverId = (TRI_server_id_t)StringUtils::uint64(value); + if (httpResponse == nullptr) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + "invalid response type"); + } - if (serverId > 0) { - _vocbase->updateReplicationClient(serverId, result.maxTick()); - } + if (length > 0) { + httpResponse->body().appendText(data.toJson()); + } + } + // add client + bool found; + std::string const& value = _request->value("serverId", found); + + if (found) { + TRI_server_id_t serverId = (TRI_server_id_t)StringUtils::uint64(value); + + if (serverId > 0) { + _vocbase->updateReplicationClient(serverId, result.maxTick()); } } } + } //////////////////////////////////////////////////////////////////////////////// @@ -695,7 +699,30 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { //////////////////////////////////////////////////////////////////////////////// void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() { - generateResult(rest::ResponseCode::OK, VelocyPackHelper::EmptyArrayValue()); + bool useVpp = false; + if (_request->transportType() == Endpoint::TransportType::VPP) { + useVpp = true; + } + //_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, StringUtils::itoa(dump._lastFoundTick)); + _response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, "0"); + _response->setContentType(rest::ContentType::DUMP); + //_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, dump._fromTickIncluded ? "true" : "false"); + _response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, "true"); + VPackSlice slice = VelocyPackHelper::EmptyArrayValue(); + if (useVpp) { + _response->addPayload(slice, &VPackOptions::Defaults, false); + } else { + HttpResponse* httpResponse = + dynamic_cast(_response.get()); + + if (httpResponse == nullptr) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + "invalid response type"); + } + + httpResponse->body().appendText(slice.toJson()); + } + _response->setResponseCode(rest::ResponseCode::OK); } ////////////////////////////////////////////////////////////////////////////////