1
0
Fork 0

documented upgrade procedure

This commit is contained in:
Jan Steemann 2013-04-12 10:25:01 +02:00
parent 3e4ddb049a
commit 3629eecddd
2 changed files with 205 additions and 109 deletions

View File

@ -13,5 +13,6 @@ ArangoDB's User Manual (@VERSION) {#UserManual}
@CHAPTER_REF{Aql}
@CHAPTER_REF{AqlExamples}
@CHAPTER_REF{UserManualActions}
@CHAPTER_REF{Transactions}
@CHAPTER_REF{CommandLine}
@CHAPTER_REF{Glossary}

View File

@ -38,6 +38,10 @@
#include "VocBase/shape-collection.h"
#include "VocBase/voc-shaper.h"
// -----------------------------------------------------------------------------
// --SECTION-- COLLECTION MIGRATION
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- private functions
// -----------------------------------------------------------------------------
@ -48,7 +52,7 @@
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief hashs the document id
/// @brief hashes the document id. this is used from the UpgradeOpenIterator
////////////////////////////////////////////////////////////////////////////////
static uint64_t HashKeyHeader (TRI_associative_pointer_t* array, void const* key) {
@ -56,7 +60,7 @@ static uint64_t HashKeyHeader (TRI_associative_pointer_t* array, void const* key
}
////////////////////////////////////////////////////////////////////////////////
/// @brief hashs the document header
/// @brief hashes the document header. this is used from UpgradeOpenIterator
////////////////////////////////////////////////////////////////////////////////
static uint64_t HashElementDocument (TRI_associative_pointer_t* array, void const* element) {
@ -77,6 +81,15 @@ static bool IsEqualKeyDocument (TRI_associative_pointer_t* array, void const* ke
////////////////////////////////////////////////////////////////////////////////
/// @brief this iterates over all markers of a collection on upgrade
///
/// It will build a standalone temporary index with all documents, without
/// using any of the existing functionality in document-collection.c or
/// primary-collection.c. The reason is that the iteration over datafiles has
/// changed between 1.2 and 1.3, and this function preserves the logic from
/// 1.2. It is thus used to read 1.2-collections.
/// After the iteration, we have all the (surviving) documents in the
/// temporary primary index, which will then be used to write out the documents
/// to a new datafile.
////////////////////////////////////////////////////////////////////////////////
static bool UpgradeOpenIterator (TRI_df_marker_t const* marker,
@ -176,9 +189,18 @@ static bool UpgradeOpenIterator (TRI_df_marker_t const* marker,
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief extract the numeric part from a filename
@ -207,7 +229,7 @@ static uint64_t GetNumericFilenamePart (const char* filename) {
////////////////////////////////////////////////////////////////////////////////
/// @brief compare two filenames, based on the numeric part contained in
/// the filename
/// the filename. this is used to sort datafile filenames on startup
////////////////////////////////////////////////////////////////////////////////
static int FilenameComparator (const void* lhs, const void* rhs) {
@ -1529,6 +1551,19 @@ void TRI_DestroyFileStructureCollection (TRI_col_file_structure_t* info) {
////////////////////////////////////////////////////////////////////////////////
/// @brief upgrade a collection
///
/// this is called when starting the database for all collections that have a
/// too low (< TRI_COL_VERSION) "version" attribute.
/// For now, the upgrade procedure will grab all existing journals, datafiles,
/// and compactors files and read them into memory, into a temporary index.
/// It will use the UpgradeOpenIterator function callback for that.
///
/// All documents that are present in that temporary index will be written
/// out into a new datafile. That datafiles will only contain the existing
/// documents (at most one per unique key), and the order of the documents in
/// it does not matter. That file can then be read with the 1.3 OpenIterator,
/// which has a slightly different way of determining which document revision
/// is active.
////////////////////////////////////////////////////////////////////////////////
int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
@ -1543,12 +1578,12 @@ int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
TRI_ASSERT_MAINTAINER(info->_version < TRI_COL_VERSION);
// iterate all files
// find all files in the collection directory
files = TRI_FilesDirectory(path);
n = files._length;
// .............................................................................
// remove all .new files
// remove all .new files. they are probably left-overs from previous runs
// .............................................................................
regcomp(&re, "^.*\\.new$", REG_EXTENDED);
@ -1559,8 +1594,14 @@ int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
if (regexec(&re, file, sizeof(matches) / sizeof(matches[0]), matches, 0) == 0) {
char* fqn = TRI_Concatenate2File(path, file);
int r;
TRI_UnlinkFile(fqn);
r = TRI_UnlinkFile(fqn);
if (r != TRI_ERROR_NO_ERROR) {
LOG_WARNING("could not remove previous temporary file '%s': '%s'", fqn, TRI_errno_string(r));
}
TRI_FreeString(TRI_CORE_MEM_ZONE, fqn);
}
}
@ -1582,7 +1623,10 @@ int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
size_t markers;
int fdout;
outfile = NULL;
TRI_InitVectorPointer(&datafiles, TRI_CORE_MEM_ZONE);
// we're interested in these files...
regcomp(&re, "^(compactor|journal|datafile)-.*\\.db$", REG_EXTENDED);
for (i = 0; i < n; ++i) {
@ -1592,135 +1636,182 @@ int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
if (regexec(&re, file, sizeof(matches) / sizeof(matches[0]), matches, 0) == 0) {
char* fqn = TRI_Concatenate2File(path, file);
// open the datafile, and push it into a vector of datafiles
TRI_datafile_t* datafile = TRI_OpenDatafile(fqn);
TRI_FreeString(TRI_CORE_MEM_ZONE, fqn);
if (datafile != NULL) {
TRI_PushBackVectorPointer(&datafiles, datafile);
}
else {
LOG_WARNING("could not open datafile '%s'", fqn);
res = TRI_errno();
}
TRI_FreeString(TRI_CORE_MEM_ZONE, fqn);
}
}
regfree(&re);
TRI_InitAssociativePointer(&primaryIndex,
TRI_UNKNOWN_MEM_ZONE,
HashKeyHeader,
HashElementDocument,
IsEqualKeyDocument,
0);
for (i = 0; i < datafiles._length; ++i) {
TRI_datafile_t* df = datafiles._buffer[i];
// all datafiles opened
TRI_IterateDatafile(df, UpgradeOpenIterator, &primaryIndex, false);
}
// calculate the length for the new datafile
markers = 0;
neededSize = sizeof(TRI_df_header_marker_t) +
sizeof(TRI_col_header_marker_t) +
sizeof(TRI_df_footer_marker_t);
for (i = 0; i < primaryIndex._nrAlloc; ++i) {
TRI_doc_mptr_t* header = primaryIndex._table[i];
if (res == TRI_ERROR_NO_ERROR) {
// build an in-memory index of documents
TRI_InitAssociativePointer(&primaryIndex,
TRI_UNKNOWN_MEM_ZONE,
HashKeyHeader,
HashElementDocument,
IsEqualKeyDocument,
0);
if (header != NULL && header->_validTo == 0) {
TRI_df_marker_t const* marker = header->_data;
// read all markers in the existing datafiles
for (i = 0; i < datafiles._length; ++i) {
TRI_datafile_t* df = datafiles._buffer[i];
if (marker != NULL) {
neededSize += TRI_DF_ALIGN_BLOCK(marker->_size);
markers++;
}
TRI_IterateDatafile(df, UpgradeOpenIterator, &primaryIndex, false);
}
}
actualSize = ((neededSize + PageSize - 1) / PageSize) * PageSize;
written = 0;
{
char* fidString;
char* jname;
fidString = TRI_StringUInt64(TRI_NewTickVocBase());
jname = TRI_Concatenate3String("datafile-", fidString, ".db.new");
TRI_FreeString(TRI_CORE_MEM_ZONE, fidString);
outfile = TRI_Concatenate2File(path, jname);
TRI_FreeString(TRI_CORE_MEM_ZONE, jname);
}
LOG_INFO("migrating data for collection '%s' (%llu) into new datafile '%s' (%llu actual markers)",
info->_name,
(unsigned long long) info->_cid,
outfile,
(unsigned long long) markers);
fdout = TRI_CREATE(outfile, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
if (fdout < 0) {
res = TRI_ERROR_CANNOT_WRITE_FILE;
}
else {
// create the header & footer
TRI_df_header_marker_t header;
TRI_col_header_marker_t cm;
TRI_df_footer_marker_t footer;
TRI_InitMarker(&header.base, TRI_DF_MARKER_HEADER, sizeof(TRI_df_header_marker_t), TRI_NewTickVocBase());
header._version = TRI_DF_VERSION;
header._maximalSize = actualSize;
header._fid = TRI_NewTickVocBase();
header.base._crc = TRI_FinalCrc32(TRI_BlockCrc32(TRI_InitialCrc32(), (char const*) &header.base, header.base._size));
written += TRI_WRITE(fdout, &header.base, header.base._size);
TRI_InitMarker(&cm.base, TRI_COL_MARKER_HEADER, sizeof(TRI_col_header_marker_t), TRI_NewTickVocBase());
cm._type = (TRI_col_type_t) info->_type;
cm._cid = info->_cid;
cm.base._crc = TRI_FinalCrc32(TRI_BlockCrc32(TRI_InitialCrc32(), (char const*) &cm.base, cm.base._size));
written += TRI_WRITE(fdout, &cm.base, cm.base._size);
// calculate the length for the new datafile
markers = 0;
neededSize = sizeof(TRI_df_header_marker_t) +
sizeof(TRI_col_header_marker_t) +
sizeof(TRI_df_footer_marker_t);
// go over all documents in the index and calculate the total length
for (i = 0; i < primaryIndex._nrAlloc; ++i) {
TRI_doc_mptr_t* mptr = primaryIndex._table[i];
TRI_doc_mptr_t* header = primaryIndex._table[i];
if (mptr != NULL && mptr->_validTo == 0) {
TRI_df_marker_t const* marker = mptr->_data;
if (header != NULL && header->_validTo == 0) {
TRI_df_marker_t const* marker = header->_data;
if (marker != NULL) {
ssize_t w = TRI_WRITE(fdout, marker, TRI_DF_ALIGN_BLOCK(marker->_size));
if (w <= 0) {
res = TRI_ERROR_INTERNAL;
break;
}
written += w;
neededSize += TRI_DF_ALIGN_BLOCK(marker->_size);
markers++;
}
}
}
TRI_InitMarker(&footer.base, TRI_DF_MARKER_FOOTER, sizeof(TRI_df_footer_marker_t), TRI_NewTickVocBase());
footer.base._crc = TRI_FinalCrc32(TRI_BlockCrc32(TRI_InitialCrc32(), (char const*) &footer.base, footer.base._size));
written += TRI_WRITE(fdout, &footer.base, footer.base._size);
TRI_CLOSE(fdout);
// round up to nearest page size
actualSize = ((neededSize + PageSize - 1) / PageSize) * PageSize;
written = 0;
// generate the name for the new datafile: datafile-xxx.db.new
{
char* fidString;
char* jname;
fidString = TRI_StringUInt64(TRI_NewTickVocBase());
jname = TRI_Concatenate3String("datafile-", fidString, ".db.new");
TRI_FreeString(TRI_CORE_MEM_ZONE, fidString);
outfile = TRI_Concatenate2File(path, jname);
TRI_FreeString(TRI_CORE_MEM_ZONE, jname);
}
LOG_INFO("migrating data for collection '%s' (id: %llu, %llu documents) into new datafile '%s'",
info->_name,
(unsigned long long) info->_cid,
(unsigned long long) markers,
outfile);
// create the outfile
fdout = TRI_CREATE(outfile, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
if (fdout < 0) {
res = TRI_ERROR_CANNOT_WRITE_FILE;
LOG_ERROR("cannot create new datafile '%s'", outfile);
}
else {
TRI_df_header_marker_t header;
TRI_col_header_marker_t cm;
TRI_df_footer_marker_t footer;
// datafile header
TRI_InitMarker(&header.base, TRI_DF_MARKER_HEADER, sizeof(TRI_df_header_marker_t), TRI_NewTickVocBase());
header._version = TRI_DF_VERSION;
header._maximalSize = actualSize;
header._fid = TRI_NewTickVocBase();
header.base._crc = TRI_FinalCrc32(TRI_BlockCrc32(TRI_InitialCrc32(), (char const*) &header.base, header.base._size));
written += TRI_WRITE(fdout, &header.base, header.base._size);
// col header
TRI_InitMarker(&cm.base, TRI_COL_MARKER_HEADER, sizeof(TRI_col_header_marker_t), TRI_NewTickVocBase());
cm._type = (TRI_col_type_t) info->_type;
cm._cid = info->_cid;
cm.base._crc = TRI_FinalCrc32(TRI_BlockCrc32(TRI_InitialCrc32(), (char const*) &cm.base, cm.base._size));
written += TRI_WRITE(fdout, &cm.base, cm.base._size);
// write all surviving documents into the datafile
for (i = 0; i < primaryIndex._nrAlloc; ++i) {
TRI_doc_mptr_t* mptr = primaryIndex._table[i];
if (mptr != NULL && mptr->_validTo == 0) {
TRI_df_marker_t const* marker = mptr->_data;
if (marker != NULL) {
TRI_doc_document_key_marker_t* doc;
void* buffer;
ssize_t w;
assert(marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT ||
marker->_type == TRI_DOC_MARKER_KEY_EDGE);
// copy the old marker
buffer = TRI_Allocate(TRI_CORE_MEM_ZONE, TRI_DF_ALIGN_BLOCK(marker->_size), true);
memcpy(buffer, marker, marker->_size);
doc = (TRI_doc_document_key_marker_t*) buffer;
// reset _tid value to 0
doc->_tid = 0;
// recalc crc
doc->base._crc = 0;
doc->base._crc = TRI_FinalCrc32(TRI_BlockCrc32(TRI_InitialCrc32(), (char const*) doc, marker->_size));
w = TRI_WRITE(fdout, buffer, TRI_DF_ALIGN_BLOCK(marker->_size));
TRI_Free(TRI_CORE_MEM_ZONE, buffer);
if (w <= 0) {
res = TRI_ERROR_INTERNAL;
LOG_ERROR("an error occurred while writing documents into datafile '%s'", outfile);
break;
}
written += w;
}
}
}
// datafile footer
TRI_InitMarker(&footer.base, TRI_DF_MARKER_FOOTER, sizeof(TRI_df_footer_marker_t), TRI_NewTickVocBase());
footer.base._crc = TRI_FinalCrc32(TRI_BlockCrc32(TRI_InitialCrc32(), (char const*) &footer.base, footer.base._size));
written += TRI_WRITE(fdout, &footer.base, footer.base._size);
TRI_CLOSE(fdout);
}
TRI_DestroyAssociativePointer(&primaryIndex);
// rename target file (by removing the .new suffix)
if (res == TRI_ERROR_NO_ERROR) {
char* dst = TRI_DuplicateString2(outfile, strlen(outfile) - strlen(".new"));
res = TRI_RenameFile(outfile, dst);
TRI_FreeString(TRI_CORE_MEM_ZONE, dst);
}
}
TRI_DestroyAssociativePointer(&primaryIndex);
// rename target file
if (res == TRI_ERROR_NO_ERROR) {
char* dst = TRI_DuplicateString2(outfile, strlen(outfile) - strlen(".new"));
res = TRI_RenameFile(outfile, dst);
TRI_FreeString(TRI_CORE_MEM_ZONE, dst);
if (outfile != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, outfile);
}
// close datafiles
// close "old" datafiles
for (i = 0; i < datafiles._length; ++i) {
TRI_datafile_t* df = datafiles._buffer[i];
char* old;
@ -1730,6 +1821,7 @@ int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
TRI_CloseDatafile(df);
TRI_FreeDatafile(df);
// if no error happened, we'll also rename the old datafiles to ".old"
if (res == TRI_ERROR_NO_ERROR) {
char* dst;
@ -1747,6 +1839,7 @@ int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
TRI_DestroyVectorString(&files);
if (res == TRI_ERROR_NO_ERROR) {
// when no error occurred, we'll bump the version number in the collection parameters file.
info->_version = TRI_COL_VERSION;
res = TRI_SaveCollectionInfo(path, info, true);
}
@ -1755,10 +1848,10 @@ int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
}
////////////////////////////////////////////////////////////////////////////////
/// @brief iterate over the markers in collection journals
/// @brief iterate over the markers in a collection's datafiles
///
/// this function is called on server startup for all collections. we do this
/// to get the last tick used in a collection
/// this function may be called on server startup for all collections, in order
/// to get the last tick value used
////////////////////////////////////////////////////////////////////////////////
bool TRI_IterateTicksCollection (const char* const path,
@ -1776,6 +1869,8 @@ bool TRI_IterateTicksCollection (const char* const path,
result = IterateFiles(&structure._datafiles, iterator, data, false);
}
else {
// compactor files don't need to be iterated... they just contain data copied
// from other files, so their tick values will never be any higher
result = IterateFiles(&structure._journals, iterator, data, true);
}