1
0
Fork 0

Fixed broken JSON output

This commit is contained in:
Simon Grätzer 2017-04-28 14:28:34 +02:00
parent 1d22f7bb61
commit 0a00a7eb67
7 changed files with 190 additions and 133 deletions

View File

@ -110,29 +110,9 @@ arangodb::Result RocksDBCollection::updateProperties(VPackSlice const& slice,
}
arangodb::Result RocksDBCollection::persistProperties() {
Result res;
try {
VPackBuilder infoBuilder = _logicalCollection->toVelocyPackIgnore(
{"path", "statusString"}, true, true);
RocksDBKey key(RocksDBKey::Collection(_logicalCollection->vocbase()->id(),
_logicalCollection->cid()));
RocksDBValue value(RocksDBValue::Document(infoBuilder.slice()));
res = globalRocksDBPut(key.string(), value.string());
} catch (arangodb::basics::Exception const& ex) {
res.reset(ex.code());
} catch (...) {
res.reset(TRI_ERROR_INTERNAL);
}
if (res.fail()) {
// TODO: what to do here
LOG_TOPIC(ERR, arangodb::Logger::ENGINES)
<< "could not save collection change marker in log: "
<< res.errorMessage();
}
return res;
// only code path calling this causes these properties to be
// already written in RocksDBEngine::changeCollection()
return arangodb::Result{};
}
PhysicalCollection* RocksDBCollection::clone(LogicalCollection* logical,

View File

@ -963,7 +963,7 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase,
builder.add("running", VPackValue(true));
builder.add("lastLogTick", VPackValue(std::to_string(lastTick)));
builder.add("lastUncommittedLogTick", VPackValue(std::to_string(lastTick)));
builder.add("totalEvents", VPackValue(0)); // s.numEvents + s.numEventsSync
builder.add("totalEvents", VPackValue(lastTick)); // s.numEvents + s.numEventsSync
builder.add("time", VPackValue(utilities::timeString()));
builder.close();

View File

@ -45,8 +45,8 @@ static TRI_replication_operation_e convertLogType(RocksDBLogType t) {
// return REPLICATION_MARKER_DOCUMENT;
// case TRI_DF_MARKER_VPACK_REMOVE:
// return REPLICATION_MARKER_REMOVE;
case RocksDBLogType::BeginTransaction:
return REPLICATION_TRANSACTION_START;
//case RocksDBLogType::BeginTransaction:
// return REPLICATION_TRANSACTION_START;
// case TRI_DF_MARKER_VPACK_COMMIT_TRANSACTION:
// return REPLICATION_TRANSACTION_COMMIT;
// case TRI_DF_MARKER_VPACK_ABORT_TRANSACTION:
@ -124,8 +124,8 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_builder.add(
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_INDEX_CREATE)));
_builder.add("database", VPackValue(_currentDbId));
_builder.add("cid", VPackValue(_currentCollectionId));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
_builder.add("data", indexSlice);
_builder.close();
break;
@ -138,8 +138,8 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_builder.add(
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_INDEX_CREATE)));
_builder.add("database", VPackValue(_currentDbId));
_builder.add("cid", VPackValue(_currentCollectionId));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
_builder.add("data", VPackValue(VPackValueType::Object));
_builder.add("id", VPackValue(std::to_string(iid)));
_builder.close();
@ -164,12 +164,12 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_currentTrxId = RocksDBLogValue::transactionId(blob);
_builder.openObject();
_builder.add("tick", VPackValue(_currentSequence));
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add(
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_TRANSACTION_START)));
_builder.add("database", VPackValue(_currentDbId));
_builder.add("tid", VPackValue(_currentTrxId));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
_builder.close();
break;
}
@ -205,15 +205,20 @@ class WALParser : public rocksdb::WriteBatch::Handler {
int res = TRI_ERROR_NO_ERROR;
switch (RocksDBKey::type(key)) {
case RocksDBEntryType::Collection: {
usleep(1000000);
usleep(1000000);
TRI_ASSERT(_lastLogType == RocksDBLogType::CollectionCreate ||
_lastLogType == RocksDBLogType::CollectionChange ||
_lastLogType == RocksDBLogType::CollectionRename);
_lastLogType == RocksDBLogType::CollectionRename ||
_lastLogType == RocksDBLogType::IndexCreate ||
_lastLogType == RocksDBLogType::IndexDrop);
TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0);
_builder.openObject();
_builder.add("tick", VPackValue(_currentSequence));
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add("type", VPackValue(convertLogType(_lastLogType)));
_builder.add("database", VPackValue(_currentDbId));
_builder.add("cid", VPackValue(_currentCollectionId));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
if (_lastLogType == RocksDBLogType::CollectionRename) {
VPackSlice collectionData(value.data());
@ -236,12 +241,12 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_builder.openObject();
_builder.add("type", VPackValue(REPLICATION_MARKER_DOCUMENT));
// auto containers = getContainerIds(key);
_builder.add("database", VPackValue(_currentDbId));
_builder.add("cid", VPackValue(_currentCollectionId));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
if (_singleOpTransaction) { // single op is defined to 0
_builder.add("tid", VPackValue(0));
_builder.add("tid", VPackValue("0"));
} else {
_builder.add("tid", VPackValue(_currentTrxId));
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
}
_builder.add("data", RocksDBValue::data(value));
_builder.close();
@ -272,12 +277,12 @@ class WALParser : public rocksdb::WriteBatch::Handler {
TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0);
_builder.openObject();
_builder.add("tick", VPackValue(_currentSequence));
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add("type", VPackValue(REPLICATION_INDEX_DROP));
_builder.add("database", VPackValue(_currentDbId));
_builder.add("cid", VPackValue(_currentCollectionId));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
_builder.add("data", VPackValue(VPackValueType::Object));
_builder.add("id", VPackValue(_currentCollectionId));
_builder.add("id", VPackValue(std::to_string(_currentCollectionId)));
_builder.add("name", VPackValue("")); // not used at all
_builder.close();
_builder.close();
@ -294,8 +299,8 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_builder.add(
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
_builder.add("database", VPackValue(_currentDbId));
_builder.add("cid", VPackValue(_currentCollectionId));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
if (_singleOpTransaction) { // single op is defined to 0
_builder.add("tid", VPackValue(0));
} else {
@ -333,12 +338,12 @@ class WALParser : public rocksdb::WriteBatch::Handler {
void endBatch() {
if (_seenBeginTransaction) {
_builder.openObject();
_builder.add("tick", VPackValue(_currentSequence));
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add(
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_TRANSACTION_COMMIT)));
_builder.add("database", VPackValue(_currentDbId));
_builder.add("tid", VPackValue(_currentTrxId));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
_builder.close();
}
_seenBeginTransaction = false;

View File

@ -27,6 +27,7 @@
#include "Basics/ConditionLocker.h"
#include "Basics/ReadLocker.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/VPackStringBufferAdapter.h"
#include "Basics/conversions.h"
#include "Basics/files.h"
#include "Cluster/ClusterComm.h"
@ -617,8 +618,11 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
if (found) {
limit = static_cast<size_t>(StringUtils::uint64(value5));
}
VPackBuilder builder;
std::shared_ptr<transaction::Context> transactionContext =
transaction::StandaloneContext::Create(_vocbase);
VPackBuilder builder(transactionContext->getVPackOptions());
builder.openArray();
auto result = tailWal(_vocbase, tickStart, limit, includeSystem, builder);
builder.close();
@ -654,10 +658,9 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
if (length > 0) {
if (useVpp) {
auto iter = arangodb::velocypack::ArrayIterator(data);
auto opts = arangodb::velocypack::Options::Defaults;
for (auto message : iter) {
_response->addPayload(VPackSlice(message), &opts, true);
for (auto message : arangodb::velocypack::ArrayIterator(data)) {
_response->addPayload(VPackSlice(message),
transactionContext->getVPackOptions(), true);
}
} else {
HttpResponse* httpResponse =
@ -668,8 +671,13 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
"invalid response type");
}
if (length > 0) {
httpResponse->body().appendText(data.toJson());
basics::StringBuffer& buffer = httpResponse->body();
arangodb::basics::VPackStringBufferAdapter adapter(buffer.stringBuffer());
VPackDumper dumper(&adapter,
transactionContext->getVPackOptions()); // note: we need the CustomTypeHandler here
for (auto marker : arangodb::velocypack::ArrayIterator(data)) {
dumper.dump(marker);
httpResponse->body().appendChar('\n');
}
}
// add client
@ -756,8 +764,8 @@ void RocksDBRestReplicationHandler::handleCommandInventory() {
builder.add("running", VPackValue(true));
builder.add("lastLogTick", VPackValue(std::to_string(ctx->lastTick())));
builder.add("lastUncommittedLogTick",
VPackValue(std::to_string(0))); // s.lastAssignedTick
builder.add("totalEvents", VPackValue(0)); // s.numEvents + s.numEventsSync
VPackValue(std::to_string(ctx->lastTick()))); // s.lastAssignedTick
builder.add("totalEvents", VPackValue(ctx->lastTick())); // s.numEvents + s.numEventsSync
builder.add("time", VPackValue(utilities::timeString()));
builder.close(); // state

View File

@ -40,7 +40,7 @@
#include "MMFiles/mmfiles-replication-dump.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBReplicationTailing.h"
#include <velocypack/Builder.h>
#include <velocypack/Parser.h>
#include <velocypack/Slice.h>
@ -65,14 +65,14 @@ static void JS_StateLoggerReplication(
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
v8::Handle<v8::Object> result = v8::Object::New(isolate);
if(engineName == "mmfiles"){
v8::Handle<v8::Object> state = v8::Object::New(isolate);
v8::Handle<v8::Object> state = v8::Object::New(isolate);
MMFilesLogfileManagerState const s = MMFilesLogfileManager::instance()->state();
state->Set(TRI_V8_ASCII_STRING("running"), v8::True(isolate));
state->Set(TRI_V8_ASCII_STRING("lastLogTick"),
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastCommittedTick));
state->Set(TRI_V8_ASCII_STRING("lastUncommittedLogTick"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastAssignedTick));
state->Set(TRI_V8_ASCII_STRING("lastUncommittedLogTick"),
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastAssignedTick));
state->Set(TRI_V8_ASCII_STRING("totalEvents"),
v8::Number::New(isolate, static_cast<double>(s.numEvents + s.numEventsSync)));
state->Set(TRI_V8_ASCII_STRING("time"), TRI_V8_STD_STRING(s.timeString));
@ -92,13 +92,11 @@ static void JS_StateLoggerReplication(
auto res = rocksutils::globalRocksEngine()->createLoggerState(nullptr,builder);
if(res.fail()){
TRI_V8_THROW_EXCEPTION(res);
return;
}
v8::Handle<v8::Value>resultValue = TRI_VPackToV8(isolate, builder.slice());
result = v8::Handle<v8::Object>::Cast(resultValue);
} else {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
return;
}
TRI_V8_RETURN(result);
@ -113,29 +111,62 @@ static void JS_TickRangesLoggerReplication(
v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
v8::Handle<v8::Array> result;
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
if( engineName != "mmfiles"){
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "only implemented for mmfiles engine");
return;
if(engineName == "mmfiles"){
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
result = v8::Array::New(isolate, (int)ranges.size());
uint32_t i = 0;
for (auto& it : ranges) {
v8::Handle<v8::Object> df = v8::Object::New(isolate);
df->ForceSet(TRI_V8_ASCII_STRING("datafile"), TRI_V8_STD_STRING(it.filename));
df->ForceSet(TRI_V8_ASCII_STRING("state"), TRI_V8_STD_STRING(it.state));
df->ForceSet(TRI_V8_ASCII_STRING("tickMin"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, it.tickMin));
df->ForceSet(TRI_V8_ASCII_STRING("tickMax"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, it.tickMax));
result->Set(i++, df);
}
} else if (engineName == "rocksdb") {
rocksdb::TransactionDB *tdb = rocksutils::globalRocksDB();
rocksdb::VectorLogPtr walFiles;
rocksdb::Status s = tdb->GetSortedWalFiles(walFiles);
if (!s.ok()) {
Result r = rocksutils::convertStatus(s);
TRI_V8_THROW_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
}
result = v8::Array::New(isolate, (int)walFiles.size());
for(uint32_t i = 0; i < walFiles.size(); i++) {
std::unique_ptr<rocksdb::LogFile>& logfile = walFiles[i];
v8::Handle<v8::Object> df = v8::Object::New(isolate);
df->ForceSet(TRI_V8_ASCII_STRING("datafile"), TRI_V8_STD_STRING(logfile->PathName()));
// setting state of each file
if (logfile->Type() == rocksdb::WalFileType::kAliveLogFile) {
df->ForceSet(TRI_V8_ASCII_STRING("state"), TRI_V8_STRING("open"));
} else if (logfile->Type() == rocksdb::WalFileType::kArchivedLogFile) {
df->ForceSet(TRI_V8_ASCII_STRING("state"), TRI_V8_STRING("collected"));
}
rocksdb::SequenceNumber min = logfile->StartSequence();
df->ForceSet(TRI_V8_ASCII_STRING("tickMin"),
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, min));
rocksdb::SequenceNumber max = UINT64_MAX;
if (i+1 < walFiles.size()) {
max = walFiles[i+1]->StartSequence();
}
df->ForceSet(TRI_V8_ASCII_STRING("tickMax"),
TRI_V8UInt64String<rocksdb::SequenceNumber>(isolate, max));
result->Set(i++, df);
}
} else {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
}
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
v8::Handle<v8::Array> result = v8::Array::New(isolate, (int)ranges.size());
uint32_t i = 0;
for (auto& it : ranges) {
v8::Handle<v8::Object> df = v8::Object::New(isolate);
df->ForceSet(TRI_V8_ASCII_STRING("datafile"), TRI_V8_STD_STRING(it.filename));
df->ForceSet(TRI_V8_ASCII_STRING("state"), TRI_V8_STD_STRING(it.state));
df->ForceSet(TRI_V8_ASCII_STRING("tickMin"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, it.tickMin));
df->ForceSet(TRI_V8_ASCII_STRING("tickMax"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, it.tickMax));
result->Set(i++, df);
}
TRI_V8_RETURN(result);
TRI_V8_TRY_CATCH_END
}
@ -149,30 +180,40 @@ static void JS_FirstTickLoggerReplication(
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
if( engineName != "mmfiles"){
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "only implemented for mmfiles engine");
return;
}
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
TRI_voc_tick_t tick = UINT64_MAX;
for (auto& it : ranges) {
if (it.tickMin == 0) {
continue;
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
if(engineName == "mmfiles"){
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
for (auto& it : ranges) {
if (it.tickMin == 0) {
continue;
}
if (it.tickMin < tick) {
tick = it.tickMin;
}
}
if (it.tickMin < tick) {
tick = it.tickMin;
} else if (engineName == "rocksdb") {
rocksdb::TransactionDB *tdb = rocksutils::globalRocksDB();
rocksdb::VectorLogPtr walFiles;
rocksdb::Status s = tdb->GetSortedWalFiles(walFiles);
if (!s.ok()) {
Result r = rocksutils::convertStatus(s);
TRI_V8_THROW_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
}
// read minium possible tick
if (!walFiles.empty()) {
tick = walFiles[0]->StartSequence();
}
} else {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
}
if (tick == UINT64_MAX) {
TRI_V8_RETURN(v8::Null(isolate));
}
TRI_V8_RETURN(TRI_V8UInt64String<TRI_voc_tick_t>(isolate, tick));
TRI_V8_TRY_CATCH_END
}
@ -185,44 +226,62 @@ static void JS_LastLoggerReplication(
v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
if( engineName != "mmfiles"){
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "only implemented for mmfiles engine");
return;
}
TRI_vocbase_t* vocbase = GetContextVocBase(isolate);
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (args.Length() != 2) {
TRI_V8_THROW_EXCEPTION_USAGE(
"REPLICATION_LOGGER_LAST(<fromTick>, <toTick>)");
"REPLICATION_LOGGER_LAST(<fromTick>, <toTick>)");
}
auto transactionContext = std::make_shared<transaction::StandaloneContext>(vocbase);
MMFilesReplicationDumpContext dump(transactionContext, 0, true, 0);
TRI_voc_tick_t tickStart = TRI_ObjectToUInt64(args[0], true);
TRI_voc_tick_t tickEnd = TRI_ObjectToUInt64(args[1], true);
v8::Handle<v8::Value> result;
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
if(engineName == "mmfiles"){
auto transactionContext = std::make_shared<transaction::StandaloneContext>(vocbase);
MMFilesReplicationDumpContext dump(transactionContext, 0, true, 0);
int res = MMFilesDumpLogReplication(&dump, std::unordered_set<TRI_voc_tid_t>(),
0, tickStart, tickEnd, true);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
// parsing JSON
VPackParser parser;
parser.parse(dump._buffer->_buffer);
result = TRI_VPackToV8(isolate, VPackSlice(parser.start()));
int res = MMFilesDumpLogReplication(&dump, std::unordered_set<TRI_voc_tid_t>(),
0, tickStart, tickEnd, true);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
} else if (engineName == "rocksdb") {
bool includeSystem = true;
size_t limit = 10000; // TODO: determine good default value?
// construct vocbase with proper handler
std::shared_ptr<transaction::Context> transactionContext =
transaction::StandaloneContext::Create(vocbase);
VPackBuilder builder(transactionContext->getVPackOptions());
builder.openArray();
RocksDBReplicationResult rep = rocksutils::tailWal(vocbase, tickStart, limit,
includeSystem, builder);
builder.close();
if (rep.ok()) {
result = TRI_VPackToV8(isolate, builder.slice());
} else {
result = v8::Null(isolate);
}
} else {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
}
VPackParser parser;
parser.parse(dump._buffer->_buffer);
std::shared_ptr<VPackBuilder> builder = parser.steal();
v8::Handle<v8::Value> result = TRI_VPackToV8(isolate, builder->slice());
TRI_V8_RETURN(result);
TRI_V8_TRY_CATCH_END
}

View File

@ -19,6 +19,3 @@ threads = 20
[ssl]
keyfile = @TOP_DIR@/UnitTests/server.pem
[cluster]
system-replication-factor = 1

View File

@ -262,6 +262,10 @@ function ReplicationLoggerSuite () {
////////////////////////////////////////////////////////////////////////////////
testFirstTick : function () {
if (db._engine().name === "mmfiles") {
return;
}
var state = replication.logger.state().state;
assertTrue(state.running);
var tick = state.lastLogTick;
@ -279,6 +283,10 @@ function ReplicationLoggerSuite () {
////////////////////////////////////////////////////////////////////////////////
testTickRanges : function () {
if (db._engine().name === "mmfiles") {
return;
}
var state = replication.logger.state().state;
assertTrue(state.running);
var tick = state.lastLogTick;