1
0
Fork 0

add part of sliceify in replication-dump

This commit is contained in:
Jan Christoph Uhde 2016-09-06 19:05:49 +02:00
parent cea8110c2e
commit f195b09783
2 changed files with 161 additions and 8 deletions

View File

@ -24,6 +24,7 @@
#include "replication-dump.h"
#include "Basics/ReadLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringRef.h"
#include "Basics/VPackStringBufferAdapter.h"
#include "Logger/Logger.h"
#include "VocBase/CompactionLocker.h"
@ -320,6 +321,141 @@ static int StringifyMarker(TRI_replication_dump_t* dump,
return TRI_ERROR_NO_ERROR;
}
static int SliceifyMarker(TRI_replication_dump_t* dump,
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
TRI_df_marker_t const* marker, bool isDump,
bool withTicks, bool isEdgeCollection) {
TRI_ASSERT(MustReplicateWalMarkerType(marker));
TRI_df_marker_type_t const type = marker->getType();
VPackBuilder builder(&dump->_vpackOptions);
if (!isDump) {
builder.openObject();
// logger-follow command
builder.add("tick", VPackValue(static_cast<uint64_t>(marker->getTick())));
if (dump->_compat28 && (type == TRI_DF_MARKER_VPACK_DOCUMENT ||
type == TRI_DF_MARKER_VPACK_REMOVE)) {
// 2.8-compatible format
VPackSlice slice(reinterpret_cast<char const*>(marker) +
DatafileHelper::VPackOffset(type));
// additionally dump "key" and "rev" attributes on the top-level
builder.add("key", slice.get(StaticStrings::KeyString));
if (slice.hasKey(StaticStrings::RevString)) {
builder.add("rev", slice.get(StaticStrings::RevString));
}
// convert 2300 markers to 2301 markers for edges
if (type == TRI_DF_MARKER_VPACK_DOCUMENT && isEdgeCollection) {
builder.add("type", VPackValue(2301));
} else {
builder.add("type",
VPackValue(static_cast<uint64_t>(TranslateType(marker))));
}
} else {
// 3.0 dump
builder.add("type",
VPackValue(static_cast<uint64_t>(TranslateType(marker))));
}
if (type == TRI_DF_MARKER_VPACK_DOCUMENT ||
type == TRI_DF_MARKER_VPACK_REMOVE ||
type == TRI_DF_MARKER_VPACK_BEGIN_TRANSACTION ||
type == TRI_DF_MARKER_VPACK_COMMIT_TRANSACTION ||
type == TRI_DF_MARKER_VPACK_ABORT_TRANSACTION) {
// transaction id
builder.add("tid", VPackValue(DatafileHelper::TransactionId(marker)));
}
if (databaseId > 0) {
builder.add("database", VPackValue(databaseId));
if (collectionId > 0) {
builder.add("cid", VPackValue(collectionId));
// also include collection name
char const* cname = NameFromCid(dump, collectionId);
if (cname != nullptr) {
builder.add("cname", VPackValue(cname));
}
}
}
} else {
// collection dump
if (withTicks) {
Append(dump, "{\"tick\":\"");
Append(dump, static_cast<uint64_t>(marker->getTick()));
Append(dump, "\",");
} else {
Append(dump, "{");
}
if (dump->_compat28 && (type == TRI_DF_MARKER_VPACK_DOCUMENT ||
type == TRI_DF_MARKER_VPACK_REMOVE)) {
// 2.8-compatible format
VPackSlice slice(reinterpret_cast<char const*>(marker) +
DatafileHelper::VPackOffset(type));
arangodb::basics::VPackStringBufferAdapter adapter(dump->_buffer);
VPackDumper dumper(
&adapter,
&dump->_vpackOptions); // note: we need the CustomTypeHandler here
// additionally dump "key" and "rev" attributes on the top-level
Append(dump, "\"key\":");
dumper.dump(slice.get(StaticStrings::KeyString));
if (slice.hasKey(StaticStrings::RevString)) {
Append(dump, ",\"rev\":");
dumper.dump(slice.get(StaticStrings::RevString));
}
// convert 2300 markers to 2301 markers for edges
Append(dump, ",\"type\":");
if (type == TRI_DF_MARKER_VPACK_DOCUMENT && isEdgeCollection) {
Append(dump, 2301);
} else {
Append(dump, static_cast<uint64_t>(TranslateType(marker)));
}
} else {
Append(dump, "\"type\":");
Append(dump, static_cast<uint64_t>(TranslateType(marker)));
}
}
switch (type) {
case TRI_DF_MARKER_VPACK_DOCUMENT:
case TRI_DF_MARKER_VPACK_REMOVE:
case TRI_DF_MARKER_VPACK_CREATE_DATABASE:
case TRI_DF_MARKER_VPACK_CREATE_COLLECTION:
case TRI_DF_MARKER_VPACK_CREATE_INDEX:
case TRI_DF_MARKER_VPACK_RENAME_COLLECTION:
case TRI_DF_MARKER_VPACK_CHANGE_COLLECTION:
case TRI_DF_MARKER_VPACK_DROP_DATABASE:
case TRI_DF_MARKER_VPACK_DROP_COLLECTION:
case TRI_DF_MARKER_VPACK_DROP_INDEX: {
Append(dump, ",\"data\":");
VPackSlice slice(reinterpret_cast<char const*>(marker) +
DatafileHelper::VPackOffset(type));
arangodb::basics::VPackStringBufferAdapter adapter(dump->_buffer);
VPackDumper dumper(
&adapter,
&dump->_vpackOptions); // note: we need the CustomTypeHandler here
dumper.dump(slice);
break;
}
case TRI_DF_MARKER_VPACK_BEGIN_TRANSACTION:
case TRI_DF_MARKER_VPACK_COMMIT_TRANSACTION:
case TRI_DF_MARKER_VPACK_ABORT_TRANSACTION: {
// nothing to do
break;
}
default: {
TRI_ASSERT(false);
return TRI_ERROR_INTERNAL;
}
}
Append(dump, "}\n");
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a marker belongs to a transaction
////////////////////////////////////////////////////////////////////////////////
@ -399,7 +535,7 @@ static int DumpCollection(TRI_replication_dump_t* dump,
LogicalCollection* collection,
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
bool withTicks) {
bool withTicks, bool useVpp = false) {
LOG(TRACE) << "dumping collection " << collection->cid() << ", tick range "
<< dataMin << " - " << dataMax;
@ -410,18 +546,25 @@ static int DumpCollection(TRI_replication_dump_t* dump,
bool bufferFull = false;
auto callback = [&dump, &lastFoundTick, &databaseId, &collectionId,
&withTicks, &isEdgeCollection, &bufferFull](
&withTicks, &isEdgeCollection, &bufferFull, &useVpp](
TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker) {
// note the last tick we processed
lastFoundTick = foundTick;
int res = StringifyMarker(dump, databaseId, collectionId, marker, true,
int res;
if (useVpp) {
res = SliceifyMarker(dump, databaseId, collectionId, marker, true,
withTicks, isEdgeCollection);
} else {
res = StringifyMarker(dump, databaseId, collectionId, marker, true,
withTicks, isEdgeCollection);
}
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
// TODO if vppcase find out slice lenght of _slices.back()
if (static_cast<uint64_t>(TRI_LengthStringBuffer(dump->_buffer)) >
dump->_chunkSize) {
// abort the iteration

View File

@ -32,7 +32,13 @@
#include "VocBase/voc-types.h"
#include "VocBase/vocbase.h"
#include <velocypack/Builder.h>
#include <velocypack/Dumper.h>
#include <velocypack/Iterator.h>
#include <velocypack/Options.h>
#include <velocypack/velocypack-aliases.h>
#include <vector>
////////////////////////////////////////////////////////////////////////////////
/// @brief replication dump container
@ -56,18 +62,21 @@ struct TRI_replication_dump_t {
_hasMore(false),
_includeSystem(includeSystem),
_fromTickIncluded(false),
_compat28(false) {
_compat28(false),
_slices() {
if (_chunkSize == 0) {
// default chunk size
_chunkSize = 128 * 1024;
}
if (!useVpp) {
_buffer = TRI_CreateSizedStringBuffer(TRI_UNKNOWN_MEM_ZONE, _chunkSize);
if (_buffer == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
}
}
}
~TRI_replication_dump_t() {
if (_buffer != nullptr) {
@ -90,6 +99,7 @@ struct TRI_replication_dump_t {
bool _includeSystem;
bool _fromTickIncluded;
bool _compat28;
std::vector<VPackBuffer<uint64_t>> _slices;
};
////////////////////////////////////////////////////////////////////////////////