1
0
Fork 0

add handleCommandDetermineOpenTransactions

This commit is contained in:
Jan Christoph Uhde 2017-04-28 14:21:55 +02:00
parent 1b1fb806fd
commit ce762fe872
1 changed files with 77 additions and 50 deletions

View File

@ -624,67 +624,71 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
builder.close();
auto data = builder.slice();
if (result.ok()) {
bool const checkMore =
(result.maxTick() > 0 && result.maxTick() < latestSequenceNumber());
if (result.fail()) {
generateError(rest::ResponseCode::SERVER_ERROR, result.errorNumber(), result.errorMessage());
return;
}
// generate the result
size_t length = data.length();
bool const checkMore =
(result.maxTick() > 0 && result.maxTick() < latestSequenceNumber());
if (length == 0) {
resetResponse(rest::ResponseCode::NO_CONTENT);
} else {
resetResponse(rest::ResponseCode::OK);
}
// generate the result
size_t length = data.length();
// transfer ownership of the buffer contents
_response->setContentType(rest::ContentType::DUMP);
if (length == 0) {
resetResponse(rest::ResponseCode::NO_CONTENT);
} else {
resetResponse(rest::ResponseCode::OK);
}
// set headers
_response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE,
checkMore ? "true" : "false");
_response->setHeaderNC(
TRI_REPLICATION_HEADER_LASTINCLUDED,
StringUtils::itoa((length == 0) ? 0 : result.maxTick()));
_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK,
StringUtils::itoa(latestSequenceNumber()));
_response->setHeaderNC(TRI_REPLICATION_HEADER_ACTIVE, "true");
_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT,
result.fromTickIncluded() ? "true" : "false");
// transfer ownership of the buffer contents
_response->setContentType(rest::ContentType::DUMP);
if (length > 0) {
if (useVpp) {
auto iter = arangodb::velocypack::ArrayIterator(data);
auto opts = arangodb::velocypack::Options::Defaults;
for (auto message : iter) {
_response->addPayload(VPackSlice(message), &opts, true);
}
} else {
HttpResponse* httpResponse =
dynamic_cast<HttpResponse*>(_response.get());
// set headers
_response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE,
checkMore ? "true" : "false");
_response->setHeaderNC(
TRI_REPLICATION_HEADER_LASTINCLUDED,
StringUtils::itoa((length == 0) ? 0 : result.maxTick()));
_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK,
StringUtils::itoa(latestSequenceNumber()));
_response->setHeaderNC(TRI_REPLICATION_HEADER_ACTIVE, "true");
_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT,
result.fromTickIncluded() ? "true" : "false");
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid response type");
}
if (length > 0) {
httpResponse->body().appendText(data.toJson());
}
if (length > 0) {
if (useVpp) {
auto iter = arangodb::velocypack::ArrayIterator(data);
auto opts = arangodb::velocypack::Options::Defaults;
for (auto message : iter) {
_response->addPayload(VPackSlice(message), &opts, true);
}
// add client
bool found;
std::string const& value = _request->value("serverId", found);
} else {
HttpResponse* httpResponse =
dynamic_cast<HttpResponse*>(_response.get());
if (found) {
TRI_server_id_t serverId = (TRI_server_id_t)StringUtils::uint64(value);
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid response type");
}
if (serverId > 0) {
_vocbase->updateReplicationClient(serverId, result.maxTick());
}
if (length > 0) {
httpResponse->body().appendText(data.toJson());
}
}
// add client
bool found;
std::string const& value = _request->value("serverId", found);
if (found) {
TRI_server_id_t serverId = (TRI_server_id_t)StringUtils::uint64(value);
if (serverId > 0) {
_vocbase->updateReplicationClient(serverId, result.maxTick());
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
@ -695,7 +699,30 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
////////////////////////////////////////////////////////////////////////////////
void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() {
generateResult(rest::ResponseCode::OK, VelocyPackHelper::EmptyArrayValue());
bool useVpp = false;
if (_request->transportType() == Endpoint::TransportType::VPP) {
useVpp = true;
}
//_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, StringUtils::itoa(dump._lastFoundTick));
_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, "0");
_response->setContentType(rest::ContentType::DUMP);
//_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, dump._fromTickIncluded ? "true" : "false");
_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, "true");
VPackSlice slice = VelocyPackHelper::EmptyArrayValue();
if (useVpp) {
_response->addPayload(slice, &VPackOptions::Defaults, false);
} else {
HttpResponse* httpResponse =
dynamic_cast<HttpResponse*>(_response.get());
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid response type");
}
httpResponse->body().appendText(slice.toJson());
}
_response->setResponseCode(rest::ResponseCode::OK);
}
////////////////////////////////////////////////////////////////////////////////