1
0
Fork 0
arangodb/arangod/VocBase/replication-dump.cpp

1298 lines
44 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// @brief replication dump functions
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2014 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "replication-dump.h"
#include "BasicsC/conversions.h"
#include "BasicsC/files.h"
#include "BasicsC/json.h"
#include "BasicsC/logging.h"
#include "BasicsC/string-buffer.h"
#include "BasicsC/tri-strings.h"
#include "VocBase/collection.h"
#include "VocBase/datafile.h"
#include "VocBase/document-collection.h"
#include "VocBase/transaction.h"
#include "VocBase/vocbase.h"
#include "VocBase/voc-shaper.h"
#include "Utils/Exception.h"
#include "Utils/transactions.h"
#include "Wal/Logfile.h"
#include "Wal/LogfileManager.h"
#include "Wal/Marker.h"
using namespace triagens;
// -----------------------------------------------------------------------------
// --SECTION-- REPLICATION
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- private defines
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief call an append function and bail out of the enclosing function
///
/// invokes func(buffer, val) and makes the *enclosing* function return false
/// if the call does not yield TRI_ERROR_NO_ERROR. the enclosing function must
/// therefore have a return type that accepts `false`.
///
/// the body is wrapped in do { } while (0) so that an invocation followed by a
/// semicolon forms exactly one statement. this keeps the macro safe when it is
/// used as the unbraced branch of an if / else
////////////////////////////////////////////////////////////////////////////////

#define FAIL_IFNOT(func, buffer, val)                                          \
  do {                                                                         \
    if (func(buffer, val) != TRI_ERROR_NO_ERROR) {                             \
      return false;                                                            \
    }                                                                          \
  } while (0)

////////////////////////////////////////////////////////////////////////////////
/// @brief create a string-buffer function name
///
/// APPEND_FUNC(AppendChar) expands to TRI_AppendCharStringBuffer
////////////////////////////////////////////////////////////////////////////////

#define APPEND_FUNC(name) TRI_ ## name ## StringBuffer

////////////////////////////////////////////////////////////////////////////////
/// @brief append a character to a string-buffer or fail
////////////////////////////////////////////////////////////////////////////////

#define APPEND_CHAR(buffer, c) FAIL_IFNOT(APPEND_FUNC(AppendChar), buffer, c)

////////////////////////////////////////////////////////////////////////////////
/// @brief append a string to a string-buffer or fail
////////////////////////////////////////////////////////////////////////////////

#define APPEND_STRING(buffer, str) FAIL_IFNOT(APPEND_FUNC(AppendString), buffer, str)

////////////////////////////////////////////////////////////////////////////////
/// @brief append uint64 to a string-buffer or fail
////////////////////////////////////////////////////////////////////////////////

#define APPEND_UINT64(buffer, val) FAIL_IFNOT(APPEND_FUNC(AppendUInt64), buffer, val)
// -----------------------------------------------------------------------------
// --SECTION-- private types
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief descriptor of one datafile or journal that was selected for a dump
///
/// note: the member order must not be changed, because instances are created
/// with positional aggregate initialization
////////////////////////////////////////////////////////////////////////////////

struct df_entry_s {
  TRI_datafile_t* _data;       // the datafile this entry describes
  TRI_voc_tick_t  _dataMin;    // copy of the datafile's _dataMin
  TRI_voc_tick_t  _dataMax;    // copy of the datafile's _dataMax
  TRI_voc_tick_t  _tickMax;    // copy of the datafile's _tickMax
  bool            _isJournal;  // true if the file was taken from the journals
};

typedef struct df_entry_s df_entry_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief cache entry mapping a collection id to its resolved name
////////////////////////////////////////////////////////////////////////////////

struct resolved_name_s {
  TRI_voc_cid_t _cid;   // collection id, also used as the lookup key
  char*         _name;  // resolved name; NULL if the collection was not found
};

typedef struct resolved_name_s resolved_name_t;
// -----------------------------------------------------------------------------
// --SECTION-- private functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief hashes a lookup key (a pointer to a collection id)
///
/// the collection id itself is used as the hash value. the array parameter is
/// not used
////////////////////////////////////////////////////////////////////////////////

static uint64_t HashKeyCid (TRI_associative_pointer_t* array,
                            void const* key) {
  return *static_cast<TRI_voc_cid_t const*>(key);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief hashes a cache element (a resolved_name_t) by its collection id
///
/// returns the element's _cid, i.e. the same value HashKeyCid yields for a key
/// holding that collection id. the array parameter is not used
////////////////////////////////////////////////////////////////////////////////

static uint64_t HashElementCid (TRI_associative_pointer_t* array,
                                void const* element) {
  return static_cast<resolved_name_t const*>(element)->_cid;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks whether a lookup key matches a cache element
///
/// key points to a collection id, element points to a resolved_name_t. they
/// match if the element's _cid equals the collection id. the array parameter
/// is not used
////////////////////////////////////////////////////////////////////////////////

static bool IsEqualKeyElementCid (TRI_associative_pointer_t* array,
                                  void const* key,
                                  void const* element) {
  auto const wanted = *static_cast<TRI_voc_cid_t const*>(key);
  auto const* entry = static_cast<resolved_name_t const*>(element);

  return wanted == entry->_cid;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief resolve a collection id to a collection name, using the dump's cache
///
/// on a cache miss the name is fetched via TRI_GetCollectionNameByIdVocBase
/// and stored in dump->_collectionNames, so each id is resolved only once.
///
/// @param dump    dump state providing the cache and the vocbase
/// @param cid     collection id, must be > 0
/// @param result  receives the cached name; may be set to NULL when the
///                collection is not found
/// @return false if the cache entry could not be allocated, true otherwise
////////////////////////////////////////////////////////////////////////////////

static bool LookupCollectionName (TRI_replication_dump_t* dump,
                                  TRI_voc_cid_t cid,
                                  char** result) {
  TRI_ASSERT(cid > 0);

  auto entry = static_cast<resolved_name_t*>(TRI_LookupByKeyAssociativePointer(&dump->_collectionNames, &cid));

  if (entry != nullptr) {
    // cache hit
    *result = entry->_name;
    return true;
  }

  // cache miss: create a new entry
  entry = static_cast<resolved_name_t*>(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(resolved_name_t), false));

  if (entry == nullptr) {
    // out of memory
    return false;
  }

  entry->_cid = cid;
  // the name can be NULL if the collection is not found.
  // a NULL result is cached as well
  entry->_name = TRI_GetCollectionNameByIdVocBase(dump->_vocbase, cid);

  // the key handed to the cache lives inside the entry itself
  TRI_InsertKeyAssociativePointer(&dump->_collectionNames, &entry->_cid, entry, false);

  *result = entry->_name;
  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief append a collection name or a collection id to the dump buffer
///
/// @param dump                    dump state, output goes to dump->_buffer
/// @param cid                     collection id
/// @param translateCollectionIds  if false, the numeric id is appended. if
///                                true, the collection name is appended, or
///                                "_unknown" when cid is 0 or no name is
///                                available for it
/// @return false if the name lookup or an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool AppendCollection (TRI_replication_dump_t* dump,
                              TRI_voc_cid_t cid,
                              bool translateCollectionIds) {
  if (! translateCollectionIds) {
    // no translation requested: emit the numeric id
    APPEND_UINT64(dump->_buffer, (uint64_t) cid);
    return true;
  }

  char* name = nullptr;

  if (cid > 0 && ! LookupCollectionName(dump, cid, &name)) {
    return false;
  }

  if (name == nullptr) {
    // either cid was 0 or the collection could not be resolved
    APPEND_STRING(dump->_buffer, "_unknown");
  }
  else {
    APPEND_STRING(dump->_buffer, name);
  }

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief select the datafiles whose data range overlaps a requested range
///
/// walks over all files in datafiles and appends a df_entry_t to result for
/// each file that contains data and whose [_dataMin, _dataMax] range overlaps
/// [dataMin, dataMax].
///
/// @param datafiles  vector of TRI_datafile_t pointers to inspect
/// @param result     vector of df_entry_t that receives the selected files
/// @param dataMin    lower bound of the requested range
/// @param dataMax    upper bound of the requested range
/// @param isJournal  value stored in the _isJournal member of every entry
/// @return TRI_ERROR_NO_ERROR, or the error reported by TRI_PushBackVector
///         (iteration stops at the first such error)
////////////////////////////////////////////////////////////////////////////////

static int IterateDatafiles (TRI_vector_pointer_t const* datafiles,
                             TRI_vector_t* result,
                             TRI_voc_tick_t dataMin,
                             TRI_voc_tick_t dataMax,
                             bool isJournal) {
  size_t const count = datafiles->_length;

  for (size_t pos = 0; pos < count; ++pos) {
    auto df = static_cast<TRI_datafile_t*>(TRI_AtVectorPointer(datafiles, pos));

    LOG_TRACE("checking datafile %llu with data range %llu - %llu, tick max: %llu",
              (unsigned long long) df->_fid,
              (unsigned long long) df->_dataMin,
              (unsigned long long) df->_dataMax,
              (unsigned long long) df->_tickMax);

    bool const isEmpty = (df->_dataMin == 0 || df->_dataMax == 0);

    if (isEmpty) {
      // datafile doesn't have any data
      continue;
    }

    TRI_ASSERT(df->_tickMin <= df->_tickMax);
    TRI_ASSERT(df->_dataMin <= df->_dataMax);

    // the file is skipped if it is entirely newer or entirely older than the
    // requested range
    bool const overlaps = (dataMax >= df->_dataMin && dataMin <= df->_dataMax);

    if (! overlaps) {
      continue;
    }

    df_entry_t entry = {
      df,
      df->_dataMin,
      df->_dataMax,
      df->_tickMax,
      isJournal
    };

    int const res = TRI_PushBackVector(result, &entry);

    if (res != TRI_ERROR_NO_ERROR) {
      return res;
    }
  }

  return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief collect the datafiles and journals of a collection that overlap a
/// data range
///
/// the collection's datafile lists are read-locked while they are inspected.
/// regular datafiles are added first, journals afterwards. the caller owns the
/// returned vector of df_entry_t and must destroy it.
///
/// NOTE(review): the results of both IterateDatafiles calls are ignored, so a
/// failed push leads to an incomplete vector without any indication
////////////////////////////////////////////////////////////////////////////////

static TRI_vector_t GetRangeDatafiles (TRI_document_collection_t* document,
                                       TRI_voc_tick_t dataMin,
                                       TRI_voc_tick_t dataMax) {
  LOG_TRACE("getting datafiles in data range %llu - %llu",
            (unsigned long long) dataMin,
            (unsigned long long) dataMax);

  TRI_vector_t selected;
  TRI_InitVector(&selected, TRI_CORE_MEM_ZONE, sizeof(df_entry_t));

  TRI_READ_LOCK_DATAFILES_DOC_COLLECTION(document);

  // sealed datafiles first, then the journals
  IterateDatafiles(&document->_datafiles, &selected, dataMin, dataMax, false);
  IterateDatafiles(&document->_journals, &selected, dataMin, dataMax, true);

  TRI_READ_UNLOCK_DATAFILES_DOC_COLLECTION(document);

  return selected;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a raw marker from a datafile for a collection dump
///
/// appends one line of JSON (terminated by "\n") describing the marker to
/// dump->_buffer:
///   {["tick":"<tick>",]"type":<type>,"key":"<key>","rev":"<rev>"
///    [,"data":{"_key":...,"_rev":...[,"_from":...,"_to":...]<attributes>}]}
/// the "data" member is only written for document and edge markers, not for
/// deletions.
///
/// @param dump                    dump state, output goes to dump->_buffer
/// @param document                the collection for datafile markers, nullptr
///                                for WAL markers (see the asserts below)
/// @param marker                  the marker to stringify
/// @param withTicks               whether to include the marker's tick
/// @param translateCollectionIds  passed on to AppendCollection for the
///                                _from / _to values of edges
/// @return false if the buffer is NULL, the marker type is not handled, or an
///         append operation fails. the buffer may then contain a partially
///         written line
////////////////////////////////////////////////////////////////////////////////
static bool StringifyMarkerDump (TRI_replication_dump_t* dump,
TRI_document_collection_t* document,
TRI_df_marker_t const* marker,
bool withTicks,
bool translateCollectionIds) {
// This covers two cases:
// 1. document is not nullptr and marker points into a data file
// 2. document is a nullptr and marker points into a WAL file
// no other combinations are allowed.
TRI_string_buffer_t* buffer;
TRI_replication_operation_e type;
TRI_voc_key_t key;
TRI_voc_rid_t rid;
// haveData: whether the marker carries document data (false for deletions)
bool haveData = true;
// isWal: whether the marker uses one of the WAL marker layouts
bool isWal = false;
buffer = dump->_buffer;
if (buffer == NULL) {
return false;
}
// step 1: extract key, revision id and replication type, depending on the
// concrete marker layout
switch (marker->_type) {
case TRI_DOC_MARKER_KEY_DELETION: {
TRI_ASSERT(nullptr != document);
auto m = reinterpret_cast<TRI_doc_deletion_key_marker_t const*>(marker);
key = ((char*) m) + m->_offsetKey;
type = MARKER_REMOVE;
rid = m->_rid;
haveData = false;
break;
}
case TRI_DOC_MARKER_KEY_DOCUMENT: {
TRI_ASSERT(nullptr != document);
auto m = reinterpret_cast<TRI_doc_document_key_marker_t const*>(marker);
key = ((char*) m) + m->_offsetKey;
type = MARKER_DOCUMENT;
rid = m->_rid;
break;
}
case TRI_DOC_MARKER_KEY_EDGE: {
TRI_ASSERT(nullptr != document);
// key and revision are read through the document marker layout here. the
// edge-specific members are read further below
auto m = reinterpret_cast<TRI_doc_document_key_marker_t const*>(marker);
key = ((char*) m) + m->_offsetKey;
type = MARKER_EDGE;
rid = m->_rid;
break;
}
case TRI_WAL_MARKER_REMOVE: {
TRI_ASSERT(nullptr == document);
auto m = static_cast<wal::remove_marker_t const*>(marker);
// for WAL remove markers the key directly follows the marker struct
key = ((char*) m) + sizeof(wal::remove_marker_t);
type = MARKER_REMOVE;
rid = m->_revisionId;
haveData = false;
isWal = true;
break;
}
case TRI_WAL_MARKER_DOCUMENT: {
TRI_ASSERT(nullptr == document);
auto m = static_cast<wal::document_marker_t const*>(marker);
key = ((char*) m) + m->_offsetKey;
type = MARKER_DOCUMENT;
rid = m->_revisionId;
isWal = true;
break;
}
case TRI_WAL_MARKER_EDGE: {
TRI_ASSERT(nullptr == document);
auto m = static_cast<wal::edge_marker_t const*>(marker);
key = ((char*) m) + m->_offsetKey;
type = MARKER_EDGE;
rid = m->_revisionId;
isWal = true;
break;
}
default: {
// any other marker type cannot be dumped by this function
return false;
}
}
// step 2: write the common envelope. note that each APPEND_* macro makes
// this function return false if the append fails
if (withTicks) {
APPEND_STRING(buffer, "{\"tick\":\"");
APPEND_UINT64(buffer, (uint64_t) marker->_tick);
APPEND_STRING(buffer, "\",\"type\":");
}
else {
APPEND_STRING(buffer, "{\"type\":");
}
APPEND_UINT64(buffer, (uint64_t) type);
APPEND_STRING(buffer, ",\"key\":\"");
// key is user-defined, but does not need escaping
APPEND_STRING(buffer, key);
APPEND_STRING(buffer, "\",\"rev\":\"");
APPEND_UINT64(buffer, (uint64_t) rid);
// document
if (haveData) {
// step 3: write the "data" object for documents and edges
APPEND_STRING(buffer, "\",\"data\":{");
// common document meta-data
APPEND_STRING(buffer, "\"" TRI_VOC_ATTRIBUTE_KEY "\":\"");
APPEND_STRING(buffer, key);
APPEND_STRING(buffer, "\",\"" TRI_VOC_ATTRIBUTE_REV "\":\"");
APPEND_UINT64(buffer, (uint64_t) rid);
APPEND_CHAR(buffer, '"');
// Is it an edge marker?
if (marker->_type == TRI_DOC_MARKER_KEY_EDGE ||
marker->_type == TRI_WAL_MARKER_EDGE) {
TRI_voc_key_t fromKey;
TRI_voc_key_t toKey;
TRI_voc_cid_t fromCid;
TRI_voc_cid_t toCid;
// the datafile and the WAL edge markers are different types, so the edge
// members are extracted separately for each of them
if (marker->_type == TRI_DOC_MARKER_KEY_EDGE) {
auto e = reinterpret_cast<TRI_doc_edge_key_marker_t const*>(marker);
fromKey = ((char*) e) + e->_offsetFromKey;
toKey = ((char*) e) + e->_offsetToKey;
fromCid = e->_fromCid;
toCid = e->_toCid;
}
else { // TRI_WAL_MARKER_EDGE
auto e = reinterpret_cast<wal::edge_marker_t const*>(marker);
fromKey = ((char*) e) + e->_offsetFromKey;
toKey = ((char*) e) + e->_offsetToKey;
fromCid = e->_fromCid;
toCid = e->_toCid;
}
// _from and _to are written as "<collection>\/<key>", where <collection>
// is a name or a numeric id, depending on translateCollectionIds
APPEND_STRING(buffer, ",\"" TRI_VOC_ATTRIBUTE_FROM "\":\"");
if (! AppendCollection(dump, fromCid, translateCollectionIds)) {
return false;
}
APPEND_STRING(buffer, "\\/");
APPEND_STRING(buffer, fromKey);
APPEND_STRING(buffer, "\",\"" TRI_VOC_ATTRIBUTE_TO "\":\"");
if (! AppendCollection(dump, toCid, translateCollectionIds)) {
return false;
}
APPEND_STRING(buffer, "\\/");
APPEND_STRING(buffer, toKey);
APPEND_CHAR(buffer, '"');
}
// the actual document data
// NOTE(review): the results of both TRI_StringifyArrayShapedJson calls below
// are not checked, unlike in the StringifyWalMarker* functions - confirm
// whether a failure should make this function return false
TRI_shaped_json_t shaped;
if (! isWal) {
// datafile marker: shapes are resolved via the collection's shaper
auto m = reinterpret_cast<TRI_doc_document_key_marker_t const*>(marker);
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, m);
TRI_StringifyArrayShapedJson(document->getShaper(), buffer, &shaped, true); // ONLY IN DUMP, PROTECTED by fake trx above
}
else { // isWal == true
// WAL marker: shapes are resolved via the legend stored inside the marker
auto m = static_cast<wal::document_marker_t const*>(marker);
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, m);
basics::LegendReader legendReader
(reinterpret_cast<char const*>(m) + m->_offsetLegend);
TRI_StringifyArrayShapedJson(&legendReader, buffer, &shaped, true);
}
// close the "data" object and the envelope
APPEND_STRING(buffer, "}}\n");
}
else { // deletion marker, so no data
// close the "rev" value and the envelope
APPEND_STRING(buffer, "\"}\n");
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL document marker
///
/// appends the members
///   "database":"<id>","cid":"<id>","tid":"<id>","key":"<key>","rev":"<rev>",
///   "data":{"_key":"<key>","_rev":"<rev>"<attributes>}
/// to dump->_buffer. the enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::document_marker_t
/// @return false if an append operation or the stringification of the
///         document attributes fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerDocument (TRI_replication_dump_t* dump,
                                        TRI_df_marker_t const* marker) {
  auto m = reinterpret_cast<triagens::wal::document_marker_t const*>(marker);

  // all variable-length parts are addressed by offsets from the marker start
  char const* base = reinterpret_cast<char const*>(m);
  char const* key  = base + m->_offsetKey;

  APPEND_STRING(dump->_buffer, "\"database\":\"");
  APPEND_UINT64(dump->_buffer, m->_databaseId);
  APPEND_STRING(dump->_buffer, "\",\"cid\":\"");
  APPEND_UINT64(dump->_buffer, m->_collectionId);
  APPEND_STRING(dump->_buffer, "\",\"tid\":\"");
  APPEND_UINT64(dump->_buffer, m->_transactionId);
  APPEND_STRING(dump->_buffer, "\",\"key\":\"");
  APPEND_STRING(dump->_buffer, key);
  APPEND_STRING(dump->_buffer, "\",\"rev\":\"");
  APPEND_UINT64(dump->_buffer, m->_revisionId);
  APPEND_STRING(dump->_buffer, "\",\"data\":{");

  // common document meta-data
  APPEND_STRING(dump->_buffer, "\"" TRI_VOC_ATTRIBUTE_KEY "\":\"");
  APPEND_STRING(dump->_buffer, key);
  APPEND_STRING(dump->_buffer, "\",\"" TRI_VOC_ATTRIBUTE_REV "\":\"");
  APPEND_UINT64(dump->_buffer, (uint64_t) m->_revisionId);
  APPEND_STRING(dump->_buffer, "\"");

  // the shaped document data is stored at _offsetJson and extends to the end
  // of the marker
  TRI_shaped_json_t shaped;
  shaped._sid         = m->_shape;
  shaped._data.length = m->_size - m->_offsetJson;
  shaped._data.data   = (char*) m + m->_offsetJson;

  // shapes are resolved via the legend stored inside the marker
  triagens::basics::LegendReader lr(base + m->_offsetLegend);

  // the attributes are written into the same JSON object as the "_key" and
  // "_rev" members above, so the last argument must be true, exactly as in
  // StringifyMarkerDump: it requests a separator in front of the first
  // attribute. with false, the first attribute was glued directly to the
  // closing quote of the revision
  if (! TRI_StringifyArrayShapedJson(&lr, dump->_buffer, &shaped, true)) {
    return false;
  }

  APPEND_STRING(dump->_buffer, "}");

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL edge marker
///
/// appends the members
///   "database":"<id>","cid":"<id>","tid":"<id>","key":"<key>","rev":"<rev>",
///   "data":{"_key":"<key>","_rev":"<rev>","_from":"<cid>\/<key>",
///           "_to":"<cid>\/<key>"<attributes>}
/// to dump->_buffer. the enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::edge_marker_t
/// @return false if an append operation or the stringification of the
///         edge attributes fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerEdge (TRI_replication_dump_t* dump,
                                    TRI_df_marker_t const* marker) {
  auto m = reinterpret_cast<triagens::wal::edge_marker_t const*>(marker);

  // all variable-length parts are addressed by offsets from the marker start
  char const* base    = reinterpret_cast<char const*>(m);
  char const* key     = base + m->_offsetKey;
  char const* fromKey = base + m->_offsetFromKey;
  char const* toKey   = base + m->_offsetToKey;

  APPEND_STRING(dump->_buffer, "\"database\":\"");
  APPEND_UINT64(dump->_buffer, m->_databaseId);
  APPEND_STRING(dump->_buffer, "\",\"cid\":\"");
  APPEND_UINT64(dump->_buffer, m->_collectionId);
  APPEND_STRING(dump->_buffer, "\",\"tid\":\"");
  APPEND_UINT64(dump->_buffer, m->_transactionId);
  APPEND_STRING(dump->_buffer, "\",\"key\":\"");
  APPEND_STRING(dump->_buffer, key);
  APPEND_STRING(dump->_buffer, "\",\"rev\":\"");
  APPEND_UINT64(dump->_buffer, m->_revisionId);
  APPEND_STRING(dump->_buffer, "\",\"data\":{");

  // common document meta-data
  APPEND_STRING(dump->_buffer, "\"" TRI_VOC_ATTRIBUTE_KEY "\":\"");
  APPEND_STRING(dump->_buffer, key);
  APPEND_STRING(dump->_buffer, "\",\"" TRI_VOC_ATTRIBUTE_REV "\":\"");
  APPEND_UINT64(dump->_buffer, (uint64_t) m->_revisionId);

  // from
  // the leading \" closes the revision value. it was missing before, which
  // made the revision string swallow the following member
  APPEND_STRING(dump->_buffer, "\",\"" TRI_VOC_ATTRIBUTE_FROM "\":\"");
  APPEND_UINT64(dump->_buffer, m->_fromCid);
  APPEND_STRING(dump->_buffer, "\\/");
  APPEND_STRING(dump->_buffer, fromKey);

  // to
  // this uses the "to" key (_offsetToKey). before, the "from" key was written
  // a second time
  APPEND_STRING(dump->_buffer, "\",\"" TRI_VOC_ATTRIBUTE_TO "\":\"");
  APPEND_UINT64(dump->_buffer, m->_toCid);
  APPEND_STRING(dump->_buffer, "\\/");
  APPEND_STRING(dump->_buffer, toKey);
  APPEND_STRING(dump->_buffer, "\"");

  // the shaped edge data is stored at _offsetJson and extends to the end of
  // the marker
  TRI_shaped_json_t shaped;
  shaped._sid         = m->_shape;
  shaped._data.length = m->_size - m->_offsetJson;
  shaped._data.data   = (char*) m + m->_offsetJson;

  // shapes are resolved via the legend stored inside the marker
  triagens::basics::LegendReader lr(base + m->_offsetLegend);

  // the attributes are written into the same JSON object as the members
  // above, so the last argument must be true, exactly as in
  // StringifyMarkerDump: it requests a separator in front of the first
  // attribute
  if (! TRI_StringifyArrayShapedJson(&lr, dump->_buffer, &shaped, true)) {
    return false;
  }

  APPEND_STRING(dump->_buffer, "}");

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL remove marker
///
/// appends the members
///   "database":"<id>","cid":"<id>","tid":"<id>","key":"<key>","rev":"<rev>"
/// to dump->_buffer. the enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::remove_marker_t
/// @return false if an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerRemove (TRI_replication_dump_t* dump,
                                      TRI_df_marker_t const* marker) {
  typedef triagens::wal::remove_marker_t marker_type;

  auto const* removal = reinterpret_cast<marker_type const*>(marker);
  // the key string is located directly behind the fixed-size marker struct
  char const* key = reinterpret_cast<char const*>(removal) + sizeof(marker_type);
  TRI_string_buffer_t* out = dump->_buffer;

  APPEND_STRING(out, "\"database\":\"");
  APPEND_UINT64(out, removal->_databaseId);

  APPEND_STRING(out, "\",\"cid\":\"");
  APPEND_UINT64(out, removal->_collectionId);

  APPEND_STRING(out, "\",\"tid\":\"");
  APPEND_UINT64(out, removal->_transactionId);

  APPEND_STRING(out, "\",\"key\":\"");
  APPEND_STRING(out, key);

  APPEND_STRING(out, "\",\"rev\":\"");
  APPEND_UINT64(out, removal->_revisionId);
  APPEND_STRING(out, "\"");

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL transaction marker (begin, commit or abort)
///
/// appends the members "database":"<id>","tid":"<id>" to dump->_buffer. the
/// enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  a begin / commit / abort transaction marker
/// @return false if an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerTransaction (TRI_replication_dump_t* dump,
                                           TRI_df_marker_t const* marker) {
  // note: begin, commit and abort markers share the same data layout, so the
  // begin transaction marker type is used to read any of them
  auto const* trx = reinterpret_cast<triagens::wal::transaction_begin_marker_t const*>(marker);
  TRI_string_buffer_t* out = dump->_buffer;

  APPEND_STRING(out, "\"database\":\"");
  APPEND_UINT64(out, trx->_databaseId);

  APPEND_STRING(out, "\",\"tid\":\"");
  APPEND_UINT64(out, trx->_transactionId);
  APPEND_STRING(out, "\"");

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL create collection marker
///
/// appends the members "database":"<id>","cid":"<id>","collection":<payload>
/// to dump->_buffer. <payload> is the string stored directly behind the marker
/// struct; it is appended verbatim and without quotes (presumably it already
/// is a JSON document - confirm against the code writing the marker). the
/// enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::collection_create_marker_t
/// @return false if an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerCreateCollection (TRI_replication_dump_t* dump,
                                                TRI_df_marker_t const* marker) {
  typedef triagens::wal::collection_create_marker_t marker_type;

  auto const* created = reinterpret_cast<marker_type const*>(marker);
  char const* payload = reinterpret_cast<char const*>(created) + sizeof(marker_type);
  TRI_string_buffer_t* out = dump->_buffer;

  APPEND_STRING(out, "\"database\":\"");
  APPEND_UINT64(out, created->_databaseId);

  APPEND_STRING(out, "\",\"cid\":\"");
  APPEND_UINT64(out, created->_collectionId);

  APPEND_STRING(out, "\",\"collection\":");
  APPEND_STRING(out, payload);

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL drop collection marker
///
/// appends the members "database":"<id>","cid":"<id>" to dump->_buffer. the
/// enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::collection_drop_marker_t
/// @return false if an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerDropCollection (TRI_replication_dump_t* dump,
                                              TRI_df_marker_t const* marker) {
  auto const* dropped = reinterpret_cast<triagens::wal::collection_drop_marker_t const*>(marker);
  TRI_string_buffer_t* out = dump->_buffer;

  APPEND_STRING(out, "\"database\":\"");
  APPEND_UINT64(out, dropped->_databaseId);

  APPEND_STRING(out, "\",\"cid\":\"");
  APPEND_UINT64(out, dropped->_collectionId);
  APPEND_STRING(out, "\"");

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL rename collection marker
///
/// appends the members
///   "database":"<id>","cid":"<id>","collection":{"name":"<new name>"}
/// to dump->_buffer. <new name> is the string stored directly behind the
/// marker struct. the enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::collection_rename_marker_t
/// @return false if an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerRenameCollection (TRI_replication_dump_t* dump,
                                                TRI_df_marker_t const* marker) {
  auto m = reinterpret_cast<triagens::wal::collection_rename_marker_t const*>(marker);

  APPEND_STRING(dump->_buffer, "\"database\":\"");
  APPEND_UINT64(dump->_buffer, m->_databaseId);
  APPEND_STRING(dump->_buffer, "\",\"cid\":\"");
  APPEND_UINT64(dump->_buffer, m->_collectionId);

  // the member name must be terminated by a quote before the colon. the
  // previous literal ("collection:{...) lacked it and produced invalid JSON
  APPEND_STRING(dump->_buffer, "\",\"collection\":{\"name\":\"");
  APPEND_STRING(dump->_buffer, (char const*) m + sizeof(triagens::wal::collection_rename_marker_t));
  APPEND_STRING(dump->_buffer, "\"}");

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL change collection marker
///
/// appends the members "database":"<id>","cid":"<id>","collection":<payload>
/// to dump->_buffer. <payload> is the string stored directly behind the marker
/// struct; it is appended verbatim and without quotes (presumably it already
/// is a JSON document - confirm against the code writing the marker). the
/// enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::collection_change_marker_t
/// @return false if an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerChangeCollection (TRI_replication_dump_t* dump,
                                                TRI_df_marker_t const* marker) {
  typedef triagens::wal::collection_change_marker_t marker_type;

  auto const* changed = reinterpret_cast<marker_type const*>(marker);
  char const* payload = reinterpret_cast<char const*>(changed) + sizeof(marker_type);
  TRI_string_buffer_t* out = dump->_buffer;

  APPEND_STRING(out, "\"database\":\"");
  APPEND_UINT64(out, changed->_databaseId);

  APPEND_STRING(out, "\",\"cid\":\"");
  APPEND_UINT64(out, changed->_collectionId);

  APPEND_STRING(out, "\",\"collection\":");
  APPEND_STRING(out, payload);

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL create index marker
///
/// appends the members
///   "database":"<id>","cid":"<id>","id":"<index id>","index":<payload>
/// to dump->_buffer. <payload> is the string stored directly behind the marker
/// struct; it is appended verbatim and without quotes (presumably it already
/// is a JSON document - confirm against the code writing the marker). the
/// enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::index_create_marker_t
/// @return false if an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerCreateIndex (TRI_replication_dump_t* dump,
                                           TRI_df_marker_t const* marker) {
  typedef triagens::wal::index_create_marker_t marker_type;

  auto const* created = reinterpret_cast<marker_type const*>(marker);
  char const* payload = reinterpret_cast<char const*>(created) + sizeof(marker_type);
  TRI_string_buffer_t* out = dump->_buffer;

  APPEND_STRING(out, "\"database\":\"");
  APPEND_UINT64(out, created->_databaseId);

  APPEND_STRING(out, "\",\"cid\":\"");
  APPEND_UINT64(out, created->_collectionId);

  APPEND_STRING(out, "\",\"id\":\"");
  APPEND_UINT64(out, created->_indexId);

  APPEND_STRING(out, "\",\"index\":");
  APPEND_STRING(out, payload);

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a WAL drop index marker
///
/// appends the members "database":"<id>","cid":"<id>","id":"<index id>" to
/// dump->_buffer. the enclosing braces are written by the caller.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  must point to a triagens::wal::index_drop_marker_t
/// @return false if an append operation fails
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarkerDropIndex (TRI_replication_dump_t* dump,
                                         TRI_df_marker_t const* marker) {
  auto const* dropped = reinterpret_cast<triagens::wal::index_drop_marker_t const*>(marker);
  TRI_string_buffer_t* out = dump->_buffer;

  APPEND_STRING(out, "\"database\":\"");
  APPEND_UINT64(out, dropped->_databaseId);

  APPEND_STRING(out, "\",\"cid\":\"");
  APPEND_UINT64(out, dropped->_collectionId);

  APPEND_STRING(out, "\",\"id\":\"");
  APPEND_UINT64(out, dropped->_indexId);
  APPEND_STRING(out, "\"");

  return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief translate a WAL marker type to a replication operation type
///
/// @param marker  the marker whose _type is translated
/// @return the matching replication operation, or REPLICATION_INVALID for any
///         marker type that has no mapping
////////////////////////////////////////////////////////////////////////////////

static TRI_replication_operation_e TranslateType (TRI_df_marker_t const* marker) {
  // unless a mapping is found below, the type is reported as invalid
  TRI_replication_operation_e operation = REPLICATION_INVALID;

  switch (marker->_type) {
    // document operations
    case TRI_WAL_MARKER_DOCUMENT:
      operation = MARKER_DOCUMENT;
      break;
    case TRI_WAL_MARKER_EDGE:
      operation = MARKER_EDGE;
      break;
    case TRI_WAL_MARKER_REMOVE:
      operation = MARKER_REMOVE;
      break;

    // transaction control
    case TRI_WAL_MARKER_BEGIN_TRANSACTION:
      operation = TRI_TRANSACTION_START;
      break;
    case TRI_WAL_MARKER_COMMIT_TRANSACTION:
      operation = TRI_TRANSACTION_COMMIT;
      break;
    case TRI_WAL_MARKER_ABORT_TRANSACTION:
      operation = TRI_TRANSACTION_ABORT;
      break;

    // collection operations
    case TRI_WAL_MARKER_CREATE_COLLECTION:
      operation = COLLECTION_CREATE;
      break;
    case TRI_WAL_MARKER_DROP_COLLECTION:
      operation = COLLECTION_DROP;
      break;
    case TRI_WAL_MARKER_RENAME_COLLECTION:
      operation = COLLECTION_RENAME;
      break;
    case TRI_WAL_MARKER_CHANGE_COLLECTION:
      operation = COLLECTION_CHANGE;
      break;

    // index operations
    case TRI_WAL_MARKER_CREATE_INDEX:
      operation = INDEX_CREATE;
      break;
    case TRI_WAL_MARKER_DROP_INDEX:
      operation = INDEX_DROP;
      break;

    default:
      break;
  }

  return operation;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stringify a raw marker from a WAL logfile for a log dump
///
/// appends one line of the form
///   {"tick":"<tick>","type":<replication type>,<type-specific members>}\n
/// to dump->_buffer. the type-specific members are produced by the matching
/// StringifyWalMarker* function. for a marker type that is not handled here,
/// no members are written between the comma and the closing brace.
///
/// @param dump    dump state, output goes to dump->_buffer
/// @param marker  the WAL marker to stringify. attribute and shape markers
///                must not be passed in
/// @return the result of the type-specific function, or false if an append
///         operation fails. the closing "}\n" is appended even when the
///         type-specific function reported a failure
////////////////////////////////////////////////////////////////////////////////

static bool StringifyWalMarker (TRI_replication_dump_t* dump,
                                TRI_df_marker_t const* marker) {
  // common prefix of every line
  APPEND_STRING(dump->_buffer, "{\"tick\":\"");
  APPEND_UINT64(dump->_buffer, (uint64_t) marker->_tick);
  APPEND_STRING(dump->_buffer, "\",\"type\":");
  APPEND_UINT64(dump->_buffer, (uint64_t) TranslateType(marker));
  APPEND_STRING(dump->_buffer, ",");

  auto const type = marker->_type;

  if (type == TRI_WAL_MARKER_ATTRIBUTE ||
      type == TRI_WAL_MARKER_SHAPE) {
    // these markers are never expected here
    TRI_ASSERT(false);
    return false;
  }

  // type-specific part
  bool ok = true;

  if (type == TRI_WAL_MARKER_DOCUMENT) {
    ok = StringifyWalMarkerDocument(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_EDGE) {
    ok = StringifyWalMarkerEdge(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_REMOVE) {
    ok = StringifyWalMarkerRemove(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_BEGIN_TRANSACTION ||
           type == TRI_WAL_MARKER_COMMIT_TRANSACTION ||
           type == TRI_WAL_MARKER_ABORT_TRANSACTION) {
    ok = StringifyWalMarkerTransaction(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_CREATE_COLLECTION) {
    ok = StringifyWalMarkerCreateCollection(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_DROP_COLLECTION) {
    ok = StringifyWalMarkerDropCollection(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_RENAME_COLLECTION) {
    ok = StringifyWalMarkerRenameCollection(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_CHANGE_COLLECTION) {
    ok = StringifyWalMarkerChangeCollection(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_CREATE_INDEX) {
    ok = StringifyWalMarkerCreateIndex(dump, marker);
  }
  else if (type == TRI_WAL_MARKER_DROP_INDEX) {
    ok = StringifyWalMarkerDropIndex(dump, marker);
  }

  // common suffix of every line
  APPEND_STRING(dump->_buffer, "}\n");

  return ok;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a marker is replicated
///
/// returns true for document, edge and remove markers, for begin / commit /
/// abort transaction markers, for create / drop / rename / change collection
/// markers and for create / drop index markers. all other marker types yield
/// false
////////////////////////////////////////////////////////////////////////////////

static inline bool MustReplicateMarker (TRI_df_marker_t const* marker) {
  switch (marker->_type) {
    case TRI_WAL_MARKER_DOCUMENT:
    case TRI_WAL_MARKER_EDGE:
    case TRI_WAL_MARKER_REMOVE:
    case TRI_WAL_MARKER_BEGIN_TRANSACTION:
    case TRI_WAL_MARKER_COMMIT_TRANSACTION:
    case TRI_WAL_MARKER_ABORT_TRANSACTION:
    case TRI_WAL_MARKER_CREATE_COLLECTION:
    case TRI_WAL_MARKER_DROP_COLLECTION:
    case TRI_WAL_MARKER_RENAME_COLLECTION:
    case TRI_WAL_MARKER_CHANGE_COLLECTION:
    case TRI_WAL_MARKER_CREATE_INDEX:
    case TRI_WAL_MARKER_DROP_INDEX:
      return true;

    default:
      return false;
  }
}
////////////////////////////////////////////////////////////////////////////////
/// @brief dump data from a collection
///
/// iterates over the datafiles and journals selected by GetRangeDatafiles and
/// stringifies every document, edge and deletion marker whose tick is greater
/// than dataMin and not greater than dataMax into dump->_buffer. markers that
/// belong to a transaction listed in document->_failedTransactions are left
/// out. iteration stops early once the buffer length exceeds chunkSize.
///
/// on success, dump->_lastFoundTick, dump->_hasMore and dump->_bufferFull are
/// updated; on error they are left untouched.
///
/// @param dump                    dump state, output goes to dump->_buffer
/// @param document                the collection to dump
/// @param dataMin                 exclusive lower tick bound
/// @param dataMax                 inclusive upper tick bound
/// @param chunkSize               buffer length after which iteration stops
/// @param withTicks               passed on to StringifyMarkerDump
/// @param translateCollectionIds  passed on to StringifyMarkerDump
/// @return TRI_ERROR_NO_ERROR, or TRI_ERROR_INTERNAL if a marker could not be
///         stringified
////////////////////////////////////////////////////////////////////////////////
static int DumpCollection (TRI_replication_dump_t* dump,
TRI_document_collection_t* document,
TRI_voc_tick_t dataMin,
TRI_voc_tick_t dataMax,
uint64_t chunkSize,
bool withTicks,
bool translateCollectionIds) {
TRI_vector_t datafiles;
TRI_string_buffer_t* buffer;
TRI_voc_tick_t lastFoundTick;
TRI_voc_tid_t lastTid;
size_t i, n;
int res;
bool hasMore;
bool bufferFull;
bool ignoreMarkers;
// The following fake transaction allows us to access data pointers
// and shapers, essentially disabling the runtime checks. This is OK,
// since the dump only considers data files (and not WAL files), so
// the collector has no trouble. Also, the data files of the collection
// are protected from the compactor by a barrier and the dump only goes
// until a certain tick.
triagens::arango::TransactionBase trx(true);
LOG_TRACE("dumping collection %llu, tick range %llu - %llu, chunk size %llu",
(unsigned long long) document->_info._cid,
(unsigned long long) dataMin,
(unsigned long long) dataMax,
(unsigned long long) chunkSize);
buffer = dump->_buffer;
// vector of df_entry_t; it is destroyed again after the loop below
datafiles = GetRangeDatafiles(document, dataMin, dataMax);
// setup some iteration state
// lastFoundTick: tick of the last data marker that passed the range checks
lastFoundTick = 0;
// lastTid: transaction id for which ignoreMarkers was last determined
lastTid = 0;
res = TRI_ERROR_NO_ERROR;
// hasMore: cleared once the end of the requested range or of the available
// data has been reached
hasMore = true;
// bufferFull: set when iteration stopped because chunkSize was exceeded
bufferFull = false;
// ignoreMarkers: whether markers of the current transaction are skipped
ignoreMarkers = false;
n = datafiles._length;
for (i = 0; i < n; ++i) {
df_entry_t* e = (df_entry_t*) TRI_AtVector(&datafiles, i);
TRI_datafile_t* datafile = e->_data;
char const* ptr;
char const* end;
// we are reading from a journal that might be modified in parallel
// so we must read-lock it
if (e->_isJournal) {
TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
}
else {
TRI_ASSERT(datafile->_isSealed);
}
ptr = datafile->_data;
if (res == TRI_ERROR_NO_ERROR) {
// no error so far. start iterating
end = ptr + datafile->_currentSize;
}
else {
// some error occurred. don't iterate
end = ptr;
}
// walk over the markers of this datafile. every exit from this loop must
// pass through NEXT_DF so that a journal lock taken above is released
while (ptr < end) {
TRI_df_marker_t* marker = (TRI_df_marker_t*) ptr;
TRI_voc_tick_t foundTick;
TRI_voc_tid_t tid;
if (marker->_size == 0 || marker->_type <= TRI_MARKER_MIN) {
// end of datafile
break;
}
// advance to the next marker right away, so that `continue` can be used
// below to skip the current one
ptr += TRI_DF_ALIGN_BLOCK(marker->_size);
if (marker->_type == TRI_DF_MARKER_ATTRIBUTE ||
marker->_type == TRI_DF_MARKER_SHAPE) {
// fully ignore these marker types. they don't need to be replicated,
// but we also cannot stop iteration if we find one of these
continue;
}
// get the marker's tick and check whether we should include it
foundTick = marker->_tick;
if (foundTick <= dataMin) {
// marker too old
continue;
}
if (foundTick > dataMax) {
// marker too new
hasMore = false;
goto NEXT_DF;
}
if (marker->_type != TRI_DOC_MARKER_KEY_DOCUMENT &&
marker->_type != TRI_DOC_MARKER_KEY_EDGE &&
marker->_type != TRI_DOC_MARKER_KEY_DELETION) {
// found a non-data marker...
// check if we can abort searching
if (foundTick >= dataMax ||
(foundTick >= e->_tickMax && i == (n - 1))) {
// fetched the last available marker
hasMore = false;
goto NEXT_DF;
}
continue;
}
// note the last tick we processed
lastFoundTick = foundTick;
// handle aborted/unfinished transactions
if (document->_failedTransactions == nullptr) {
// there are no failed transactions
ignoreMarkers = false;
}
else {
// get transaction id of marker
if (marker->_type == TRI_DOC_MARKER_KEY_DELETION) {
tid = ((TRI_doc_deletion_key_marker_t const*) marker)->_tid;
}
else {
tid = ((TRI_doc_document_key_marker_t const*) marker)->_tid;
}
// check if marker is from an aborted transaction
if (tid > 0) {
// the lookup is only repeated when the transaction id changes. for
// consecutive markers of the same transaction the previous result is
// reused
if (tid != lastTid) {
ignoreMarkers = (document->_failedTransactions->find(tid) != document->_failedTransactions->end());
}
lastTid = tid;
}
if (ignoreMarkers) {
continue;
}
}
if (! StringifyMarkerDump(dump, document, marker, withTicks, translateCollectionIds)) {
res = TRI_ERROR_INTERNAL;
goto NEXT_DF;
}
if (foundTick >= dataMax ||
(foundTick >= e->_tickMax && i == (n - 1))) {
// fetched the last available marker
hasMore = false;
goto NEXT_DF;
}
if ((uint64_t) TRI_LengthStringBuffer(buffer) > chunkSize) {
// abort the iteration
bufferFull = true;
goto NEXT_DF;
}
}
NEXT_DF:
if (e->_isJournal) {
// read-unlock the journal
TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
}
// stop visiting further datafiles on error, at the end of the range, or
// when the chunk is full
if (res != TRI_ERROR_NO_ERROR || ! hasMore || bufferFull) {
break;
}
}
TRI_DestroyVector(&datafiles);
// publish the iteration result. nothing is written to dump on error
if (res == TRI_ERROR_NO_ERROR) {
if (lastFoundTick > 0) {
// data available for requested range
dump->_lastFoundTick = lastFoundTick;
dump->_hasMore = hasMore;
dump->_bufferFull = bufferFull;
}
else {
// no data available for requested range
dump->_lastFoundTick = 0;
dump->_hasMore = false;
dump->_bufferFull = false;
}
}
return res;
}
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief dump data from a collection
///
/// Registers a replication barrier on the collection's barrier list and holds
/// the collection's compaction lock in read mode for the duration of the call
/// to DumpCollection(). Both are released again before returning.
///
/// Returns TRI_ERROR_OUT_OF_MEMORY if the barrier cannot be created, otherwise
/// whatever DumpCollection() returns.
////////////////////////////////////////////////////////////////////////////////

int TRI_DumpCollectionReplication (TRI_replication_dump_t* dump,
                                   TRI_vocbase_col_t* col,
                                   TRI_voc_tick_t dataMin,
                                   TRI_voc_tick_t dataMax,
                                   uint64_t chunkSize,
                                   bool withTicks,
                                   bool translateCollectionIds) {
  TRI_ASSERT(col != nullptr);
  TRI_ASSERT(col->_collection != nullptr);

  TRI_document_collection_t* const collection = col->_collection;

  // reported if the barrier below cannot be created
  int result = TRI_ERROR_OUT_OF_MEMORY;

  // the barrier keeps the underlying collection from being unloaded
  TRI_barrier_t* const barrier = TRI_CreateBarrierReplication(&collection->_barrierList);

  if (barrier != nullptr) {
    // hold off compaction while the datafiles are being read
    TRI_ReadLockReadWriteLock(&collection->_compactionLock);

    result = DumpCollection(dump,
                            collection,
                            dataMin,
                            dataMax,
                            chunkSize,
                            withTicks,
                            translateCollectionIds);

    TRI_ReadUnlockReadWriteLock(&collection->_compactionLock);

    TRI_FreeBarrier(barrier);
  }

  return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief dump data from the replication log
///
/// Asks the logfile manager for the logfiles covering the tick range and walks
/// the markers of each logfile's active region in order:
/// - markers with a tick <= tickMin are skipped
/// - a marker with a tick >= tickMax signals that nothing more is to be
///   fetched; a marker with a tick > tickMax additionally ends the scan
/// - markers rejected by MustReplicateMarker() are skipped
/// - all other markers are appended to the dump via StringifyWalMarker()
/// Scanning stops once the dump buffer has grown beyond chunkSize.
///
/// On success, the dump's _lastFoundTick, _hasMore and _bufferFull members are
/// updated; on error they are left as they were. The logfiles obtained from
/// the logfile manager are handed back in both cases.
///
/// The vocbase parameter is not used by this function.
///
/// Returns TRI_ERROR_NO_ERROR on success, the code of a caught ArangoDB
/// exception, or TRI_ERROR_INTERNAL for any other caught exception.
////////////////////////////////////////////////////////////////////////////////

int TRI_DumpLogReplication (TRI_vocbase_t* vocbase,
                            TRI_replication_dump_t* dump,
                            TRI_voc_tick_t tickMin,
                            TRI_voc_tick_t tickMax,
                            uint64_t chunkSize) {
  LOG_TRACE("dumping log, tick range %llu - %llu, chunk size %llu",
            static_cast<unsigned long long>(tickMin),
            static_cast<unsigned long long>(tickMax),
            static_cast<unsigned long long>(chunkSize));

  auto const manager = triagens::wal::LogfileManager::instance();

  // the logfile manager decides which logfiles qualify for the range
  std::vector<triagens::wal::Logfile*> logfiles = manager->getLogfilesForTickRange(tickMin, tickMax);

  // iteration state
  bool bufferFull = false;
  bool hasMore = true;
  TRI_voc_tick_t lastFoundTick = 0;
  int res = TRI_ERROR_NO_ERROR;

  try {
    for (triagens::wal::Logfile* current : logfiles) {
      char const* cursor;
      char const* limit;
      manager->getActiveLogfileRegion(current, cursor, limit);

      while (cursor < limit) {
        auto const* marker = reinterpret_cast<TRI_df_marker_t const*>(cursor);

        if (marker->_size == 0 || marker->_type <= TRI_MARKER_MIN) {
          // nothing valid beyond this point in the logfile
          break;
        }

        // advance first, so every "continue" below moves on to the next marker
        cursor += TRI_DF_ALIGN_BLOCK(marker->_size);

        TRI_voc_tick_t const tick = marker->_tick;

        if (tick <= tickMin) {
          // older than the requested range
          continue;
        }

        if (tick >= tickMax) {
          // reached the upper bound of the requested range
          hasMore = false;

          if (tick > tickMax) {
            // newer than the requested range
            break;
          }
        }

        if (! MustReplicateMarker(marker)) {
          continue;
        }

        // remember the tick of the last marker we handled
        lastFoundTick = tick;

        if (! StringifyWalMarker(dump, marker)) {
          THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
        }

        if (static_cast<uint64_t>(TRI_LengthStringBuffer(dump->_buffer)) > chunkSize) {
          // enough data collected for this chunk
          bufferFull = true;
          break;
        }
      }

      // res can only change in the exception handlers below, so the flags are
      // all that needs to be looked at here
      if (bufferFull || ! hasMore) {
        break;
      }
    }
  }
  catch (triagens::arango::Exception const& ex) {
    res = ex.code();
  }
  catch (...) {
    res = TRI_ERROR_INTERNAL;
  }

  // the logfiles must be given back regardless of the outcome
  manager->returnLogfiles(logfiles);

  if (res != TRI_ERROR_NO_ERROR) {
    return res;
  }

  // without any marker found, report an empty result with all flags cleared
  bool const foundData = (lastFoundTick > 0);

  dump->_lastFoundTick = foundData ? lastFoundTick : 0;
  dump->_hasMore       = foundData && hasMore;
  dump->_bufferFull    = foundData && bufferFull;

  return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise a replication dump container
///
/// Resets all state members of the dump, allocates its string buffer with the
/// requested initial size and sets up the collection name lookup table.
///
/// Returns TRI_ERROR_OUT_OF_MEMORY if the string buffer cannot be allocated,
/// otherwise the result of TRI_InitAssociativePointer(). If the lookup table
/// cannot be initialised, the string buffer is released again and the dump's
/// _buffer member is reset to a nullptr, so no dangling pointer is left behind.
////////////////////////////////////////////////////////////////////////////////

int TRI_InitDumpReplication (TRI_replication_dump_t* dump,
                             TRI_vocbase_t* vocbase,
                             size_t bufferSize) {
  TRI_ASSERT(vocbase != nullptr);

  dump->_vocbase       = vocbase;
  dump->_lastFoundTick = 0;
  dump->_lastSid       = 0;
  dump->_lastShape     = nullptr;
  dump->_failed        = false;
  dump->_bufferFull    = false;
  dump->_hasMore       = false;

  dump->_buffer = TRI_CreateSizedStringBuffer(TRI_CORE_MEM_ZONE, bufferSize);

  if (dump->_buffer == nullptr) {
    return TRI_ERROR_OUT_OF_MEMORY;
  }

  int res = TRI_InitAssociativePointer(&dump->_collectionNames,
                                       TRI_UNKNOWN_MEM_ZONE,
                                       HashKeyCid,
                                       HashElementCid,
                                       IsEqualKeyElementCid,
                                       nullptr);

  if (res != TRI_ERROR_NO_ERROR) {
    TRI_FreeStringBuffer(TRI_CORE_MEM_ZONE, dump->_buffer);
    // do not leave a pointer to freed memory in the caller's struct
    dump->_buffer = nullptr;
  }

  return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy a replication dump container
///
/// Frees every entry stored in the collection name lookup table (including
/// the entry's name, if it has one), then destroys the table itself and frees
/// the dump's string buffer.
////////////////////////////////////////////////////////////////////////////////

void TRI_DestroyDumpReplication (TRI_replication_dump_t* dump) {
  auto const table = dump->_collectionNames._table;
  auto const slots = dump->_collectionNames._nrAlloc;

  for (size_t pos = 0; pos < slots; ++pos) {
    auto* entry = static_cast<resolved_name_t*>(table[pos]);

    if (entry == nullptr) {
      // unused slot
      continue;
    }

    // an entry may exist without a name
    if (entry->_name != nullptr) {
      TRI_Free(TRI_UNKNOWN_MEM_ZONE, entry->_name);
    }

    TRI_Free(TRI_UNKNOWN_MEM_ZONE, entry);
  }

  TRI_DestroyAssociativePointer(&dump->_collectionNames);

  TRI_FreeStringBuffer(TRI_CORE_MEM_ZONE, dump->_buffer);
}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End: