1
0
Fork 0

Added (rough) logger-follow handler.

This commit is contained in:
Dan Larkin 2017-04-20 14:59:49 -04:00
parent 973b410698
commit 57701ff766
6 changed files with 203 additions and 77 deletions

View File

@ -161,6 +161,11 @@ arangodb::Result globalRocksDBRemove(rocksdb::Slice const& key,
return convertStatus(status);
};
uint64_t latestSequenceNumber() {
auto seq = globalRocksDB()->GetLatestSequenceNumber();
return static_cast<uint64_t>(seq);
};
void addCollectionMapping(uint64_t objectId, TRI_voc_tick_t did,
TRI_voc_cid_t cid) {
StorageEngine* engine = EngineSelectorFeature::ENGINE;

View File

@ -99,6 +99,8 @@ arangodb::Result globalRocksDBRemove(
rocksdb::Slice const& key,
rocksdb::WriteOptions const& = rocksdb::WriteOptions{});
uint64_t latestSequenceNumber();
void addCollectionMapping(uint64_t, TRI_voc_tick_t, TRI_voc_cid_t);
std::pair<TRI_voc_tick_t, TRI_voc_cid_t> mapObjectToCollection(uint64_t);

View File

@ -27,6 +27,12 @@ using namespace arangodb;
RocksDBReplicationResult::RocksDBReplicationResult(int errorNumber,
uint64_t maxTick)
: Result(errorNumber), _maxTick(maxTick) {}
: Result(errorNumber), _maxTick(maxTick), _fromTickIncluded(false) {}
uint64_t RocksDBReplicationResult::maxTick() const { return _maxTick; }
bool RocksDBReplicationResult::fromTickIncluded() const {
return _fromTickIncluded;
}
void RocksDBReplicationResult::includeFromTick() { _fromTickIncluded = true; }

View File

@ -33,9 +33,13 @@ class RocksDBReplicationResult : public Result {
public:
RocksDBReplicationResult(int, uint64_t);
uint64_t maxTick() const;
bool fromTickIncluded() const;
void includeFromTick();
private:
uint64_t _maxTick;
bool _fromTickIncluded;
};
} // namespace arangodb

View File

@ -76,7 +76,7 @@ class WBReader : public rocksdb::WriteBatch::Handler {
_builder.close();
if (res == TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_NO_ERROR && _limit > 0) {
_limit--;
}
}
@ -192,22 +192,35 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase,
return {converted.errorNumber(), lastTick};
}
bool fromTickIncluded = false;
while (iterator->Valid() && limit > 0) {
s = iterator->status();
if (s.ok()) {
rocksdb::BatchResult batch = iterator->GetBatch();
lastTick = batch.sequence;
if (lastTick == from) {
fromTickIncluded = true;
}
s = batch.writeBatchPtr->Iterate(handler.get());
}
if (!s.ok()) {
LOG_TOPIC(ERR, Logger::ENGINES) << "Error during WAL scan";
LOG_TOPIC(ERR, Logger::ENGINES) << iterator->status().getState();
auto converted = convertStatus(s);
return {converted.errorNumber(), lastTick};
auto result = RocksDBReplicationResult(converted.errorNumber(), lastTick);
if (fromTickIncluded) {
result.includeFromTick();
}
return result;
}
iterator->Next();
}
return {TRI_ERROR_NO_ERROR, lastTick};
auto result = RocksDBReplicationResult(TRI_ERROR_NO_ERROR, lastTick);
if (fromTickIncluded) {
result.includeFromTick();
}
return result;
}

View File

@ -23,11 +23,6 @@
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBEngine/RocksDBRestReplicationHandler.h"
#include "RocksDBEngine/RocksDBReplicationContext.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBReplicationContext.h"
#include "RocksDBEngine/RocksDBReplicationManager.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ConditionLocker.h"
#include "Basics/ReadLocker.h"
@ -45,6 +40,11 @@
#include "Rest/Version.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/ServerIdFeature.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBReplicationContext.h"
#include "RocksDBEngine/RocksDBReplicationManager.h"
#include "RocksDBEngine/RocksDBReplicationTailing.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "Transaction/Context.h"
@ -346,14 +346,14 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
}
// extract ttl
//double expires =
//VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
// double expires =
// VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
//TRI_voc_tick_t id;
//StorageEngine* engine = EngineSelectorFeature::ENGINE;
//int res = engine->insertCompactionBlocker(_vocbase, expires, id);
// TRI_voc_tick_t id;
// StorageEngine* engine = EngineSelectorFeature::ENGINE;
// int res = engine->insertCompactionBlocker(_vocbase, expires, id);
RocksDBReplicationContext *ctx = _manager->createContext();
RocksDBReplicationContext* ctx = _manager->createContext();
if (ctx == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_FAILED);
}
@ -369,7 +369,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
if (type == rest::RequestType::PUT && len >= 2) {
// extend an existing blocker
TRI_voc_tick_t id =
static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffixes[1]));
static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffixes[1]));
auto input = _request->toVelocyPackBuilderPtr();
@ -381,11 +381,11 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
// extract ttl
double expires =
VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
int res = TRI_ERROR_NO_ERROR;
bool busy;
RocksDBReplicationContext *ctx = _manager->find(id, busy, expires);
RocksDBReplicationContext* ctx = _manager->find(id, busy, expires);
if (busy) {
res = TRI_ERROR_CURSOR_BUSY;
} else if (ctx == nullptr) {
@ -395,7 +395,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
}
// now extend the blocker
//StorageEngine* engine = EngineSelectorFeature::ENGINE;
// StorageEngine* engine = EngineSelectorFeature::ENGINE;
// res = engine->extendCompactionBlocker(_vocbase, id, expires);
if (res == TRI_ERROR_NO_ERROR) {
@ -409,11 +409,11 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
if (type == rest::RequestType::DELETE_REQ && len >= 2) {
// delete an existing blocker
TRI_voc_tick_t id =
static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffixes[1]));
static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffixes[1]));
bool found = _manager->remove(id);
//StorageEngine* engine = EngineSelectorFeature::ENGINE;
//int res = engine->removeCompactionBlocker(_vocbase, id);
// StorageEngine* engine = EngineSelectorFeature::ENGINE;
// int res = engine->removeCompactionBlocker(_vocbase, id);
if (found) {
resetResponse(rest::ResponseCode::NO_CONTENT);
@ -556,9 +556,109 @@ void RocksDBRestReplicationHandler::handleTrampolineCoordinator() {
////////////////////////////////////////////////////////////////////////////////
void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
generateError(rest::ResponseCode::NOT_IMPLEMENTED,
TRI_ERROR_NOT_YET_IMPLEMENTED,
"replication API is not fully implemented for RocksDB yet");
bool useVpp = false;
if (_request->transportType() == Endpoint::TransportType::VPP) {
useVpp = true;
}
// determine start and end tick
TRI_voc_tick_t tickStart = 0;
TRI_voc_tick_t tickEnd = UINT64_MAX;
bool found;
std::string const& value1 = _request->value("from", found);
if (found) {
tickStart = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value1));
}
// determine end tick for dump
std::string const& value2 = _request->value("to", found);
if (found) {
tickEnd = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value2));
}
if (found && (tickStart > tickEnd || tickEnd == 0)) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid from/to values");
return;
}
bool includeSystem = true;
std::string const& value4 = _request->value("includeSystem", found);
if (found) {
includeSystem = StringUtils::boolean(value4);
}
size_t limit = 10000; // TODO: determine good default value?
std::string const& value5 = _request->value("chunkSize", found);
if (found) {
limit = static_cast<size_t>(StringUtils::uint64(value5));
}
VPackBuilder builder;
builder.openArray();
auto result = tailWal(_vocbase, tickStart, limit, includeSystem, builder);
builder.close();
auto data = builder.slice();
if (result.ok()) {
bool const checkMore =
(result.maxTick() > 0 && result.maxTick() < latestSequenceNumber());
// generate the result
size_t length = 0;
if (useVpp) {
length = data.length();
} else {
length = data.byteSize();
}
if (data.length()) {
resetResponse(rest::ResponseCode::NO_CONTENT);
} else {
resetResponse(rest::ResponseCode::OK);
}
// transfer ownership of the buffer contents
_response->setContentType(rest::ContentType::DUMP);
// set headers
_response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE,
checkMore ? "true" : "false");
_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTINCLUDED,
StringUtils::itoa(result.maxTick()));
_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK,
StringUtils::itoa(latestSequenceNumber()));
_response->setHeaderNC(TRI_REPLICATION_HEADER_ACTIVE, "true");
_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT,
result.fromTickIncluded() ? "true" : "false");
if (length > 0) {
if (useVpp) {
auto iter = arangodb::velocypack::ArrayIterator(data);
auto opts = arangodb::velocypack::Options::Defaults;
for (auto message : iter) {
_response->addPayload(VPackSlice(message), &opts, true);
}
} else {
HttpResponse* httpResponse =
dynamic_cast<HttpResponse*>(_response.get());
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid response type");
}
if (length > 0) {
httpResponse->body().appendText(data.toJson());
}
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
@ -579,15 +679,14 @@ void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() {
////////////////////////////////////////////////////////////////////////////////
void RocksDBRestReplicationHandler::handleCommandInventory() {
RocksDBReplicationContext *ctx = nullptr;
RocksDBReplicationContext* ctx = nullptr;
bool found, busy;
std::string batchId = _request->value("batchId", found);
if (found) {
ctx = _manager->find(StringUtils::uint64(batchId), busy);
}
if (!found || busy || ctx == nullptr) {
generateError(rest::ResponseCode::NOT_FOUND,
TRI_ERROR_CURSOR_NOT_FOUND,
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
"batchId not specified");
}
@ -601,10 +700,9 @@ void RocksDBRestReplicationHandler::handleCommandInventory() {
}
std::pair<RocksDBReplicationResult, std::shared_ptr<VPackBuilder>> result =
ctx->getInventory(this->_vocbase, includeSystem);
ctx->getInventory(this->_vocbase, includeSystem);
if (!result.first.ok()) {
generateError(rest::ResponseCode::BAD,
result.first.errorNumber(),
generateError(rest::ResponseCode::BAD, result.first.errorNumber(),
"inventory could not be created");
}
@ -620,14 +718,14 @@ void RocksDBRestReplicationHandler::handleCommandInventory() {
// "state"
builder.add("state", VPackValue(VPackValueType::Object));
//MMFilesLogfileManagerState const s =
//MMFilesLogfileManager::instance()->state();
// MMFilesLogfileManagerState const s =
// MMFilesLogfileManager::instance()->state();
builder.add("running", VPackValue(true));
builder.add("lastLogTick", VPackValue(std::to_string(ctx->lastTick())));
builder.add("lastUncommittedLogTick",
VPackValue(std::to_string(0)));//s.lastAssignedTick
builder.add("totalEvents", VPackValue(0));// s.numEvents + s.numEventsSync
VPackValue(std::to_string(0))); // s.lastAssignedTick
builder.add("totalEvents", VPackValue(0)); // s.numEvents + s.numEventsSync
builder.add("time", VPackValue(utilities::timeString()));
builder.close(); // state
@ -656,7 +754,7 @@ void RocksDBRestReplicationHandler::handleCommandClusterInventory() {
ClusterInfo* ci = ClusterInfo::instance();
std::vector<std::shared_ptr<LogicalCollection>> cols =
ci->getCollections(dbName);
ci->getCollections(dbName);
VPackBuilder resultBuilder;
resultBuilder.openObject();
@ -741,7 +839,6 @@ void RocksDBRestReplicationHandler::handleCommandRemoveKeys() {
////////////////////////////////////////////////////////////////////////////////
void RocksDBRestReplicationHandler::handleCommandDump() {
bool found = false;
uint64_t contextId = 0;
@ -765,19 +862,18 @@ void RocksDBRestReplicationHandler::handleCommandDump() {
// acquire context
bool isBusy = false;
RocksDBReplicationContext* context = _manager->find(contextId, isBusy);
if (context == nullptr || isBusy){
if (context == nullptr || isBusy) {
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_BAD_PARAMETER,
"replication dump - unable to acquire context");
}
// print request
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
<< "requested collection dump for collection '" << collection
<< "' using contextId '" << context->id() << "'";
// TODO needs to generalized || velocypacks needs to support multiple slices per response!
// TODO needs to generalized || velocypacks needs to support multiple slices
// per response!
auto response = dynamic_cast<HttpResponse*>(_response.get());
StringBuffer& dump = response->body();
@ -794,15 +890,15 @@ void RocksDBRestReplicationHandler::handleCommandDump() {
} else {
resetResponse(rest::ResponseCode::OK);
response->setContentType(rest::ContentType::DUMP);
// set headers
_response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE,
(context->more() ? "true" : "false"));
// set headers
_response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE,
(context->more() ? "true" : "false"));
//_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTINCLUDED,
// StringUtils::itoa(dump._lastFoundTick));
//_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTINCLUDED,
// StringUtils::itoa(dump._lastFoundTick));
}
_manager->release(context); //release context when done
_manager->release(context); // release context when done
}
////////////////////////////////////////////////////////////////////////////////