1
0
Fork 0

write prologue markers for document & remove markers

This commit is contained in:
Jan Steemann 2016-02-29 17:08:38 +01:00
parent 6ff12b98db
commit 254f6b49b9
18 changed files with 248 additions and 383 deletions

View File

@ -31,7 +31,7 @@
#include "Utils/AqlTransaction.h"
#include "VocBase/shaped-json.h"
struct TRI_df_marker_s;
struct TRI_df_marker_t;
struct TRI_doc_mptr_t;
struct TRI_json_t;
@ -71,7 +71,7 @@ class ModificationBlock : public ExecutionBlock {
/// @brief constructs a master pointer from the marker passed
//////////////////////////////////////////////////////////////////////////////
void constructMptr(TRI_doc_mptr_t*, TRI_df_marker_s const*) const;
void constructMptr(TRI_doc_mptr_t*, TRI_df_marker_t const*) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief check whether a shard key value has changed

View File

@ -30,7 +30,7 @@
#include <v8.h>
struct TRI_df_marker_s;
struct TRI_df_marker_t;
struct TRI_doc_mptr_t;
struct TRI_document_collection_t;
@ -46,7 +46,7 @@ namespace V8VPackWrapper {
v8::Handle<v8::Value> wrap(v8::Isolate*, arangodb::Transaction*,
TRI_voc_cid_t cid, arangodb::DocumentDitch* ditch,
struct TRI_df_marker_s const*);
struct TRI_df_marker_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief wraps a VPackSlice

View File

@ -24,7 +24,7 @@
#include "cleanup.h"
#include "Basics/files.h"
#include "Basics/Logger.h"
#include "Basics/tri-strings.h"
#include "Basics/ReadLocker.h"
#include "Utils/CursorRepository.h"
#include "VocBase/compactor.h"
#include "VocBase/Ditch.h"

View File

@ -1407,6 +1407,8 @@ char const* TRI_NameMarkerDatafile(TRI_df_marker_t const* marker) {
return "header";
case TRI_DF_MARKER_FOOTER:
return "footer";
case TRI_DF_MARKER_PROLOGUE:
return "prologue";
// datafile markers
case TRI_DOC_MARKER_KEY_DOCUMENT:

View File

@ -107,25 +107,26 @@ struct TRI_datafile_t;
/// @brief state of the datafile
////////////////////////////////////////////////////////////////////////////////
typedef enum {
enum TRI_df_state_e {
TRI_DF_STATE_CLOSED = 1, // datafile is closed
TRI_DF_STATE_READ = 2, // datafile is opened read only
TRI_DF_STATE_WRITE = 3, // datafile is opened read/append
TRI_DF_STATE_OPEN_ERROR = 4, // an error has occurred while opening
TRI_DF_STATE_WRITE_ERROR = 5, // an error has occurred while writing
TRI_DF_STATE_RENAME_ERROR = 6 // an error has occurred while renaming
} TRI_df_state_e;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief type of the marker
////////////////////////////////////////////////////////////////////////////////
typedef enum {
enum TRI_df_marker_type_e {
TRI_MARKER_MIN = 999, // not a real marker type,
// but used for bounds checking
TRI_DF_MARKER_HEADER = 1000,
TRI_DF_MARKER_FOOTER = 1001,
TRI_DF_MARKER_PROLOGUE = 1002,
TRI_DF_MARKER_BLANK = 1100,
@ -164,7 +165,7 @@ typedef enum {
TRI_MARKER_MAX // again, this is not a real
// marker, but we use it for
// bounds checking
} TRI_df_marker_type_e;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief storage type of the marker
@ -182,7 +183,7 @@ typedef uint32_t TRI_df_version_t;
/// @brief scan result
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_df_scan_s {
struct TRI_df_scan_t {
TRI_voc_size_t _currentSize;
TRI_voc_size_t _maximalSize;
TRI_voc_size_t _endPosition;
@ -192,7 +193,7 @@ typedef struct TRI_df_scan_s {
uint32_t _status;
bool _isSealed;
} TRI_df_scan_t;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief scan result entry
@ -205,7 +206,7 @@ typedef struct TRI_df_scan_s {
/// 5 - CRC failed
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_df_scan_entry_s {
struct TRI_df_scan_entry_t {
TRI_voc_size_t _position;
TRI_voc_size_t _size;
TRI_voc_size_t _realSize;
@ -217,7 +218,7 @@ typedef struct TRI_df_scan_entry_s {
char* _diagnosis;
char* _key;
char const* _typeName;
} TRI_df_scan_entry_t;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief datafile
@ -310,7 +311,7 @@ struct TRI_datafile_t {
/// and _crc the second.
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_df_marker_s {
struct TRI_df_marker_t {
TRI_voc_size_t _size; // 4 bytes, must be supplied
TRI_voc_crc_t _crc; // 4 bytes, will be generated
@ -321,7 +322,7 @@ typedef struct TRI_df_marker_s {
#endif
TRI_voc_tick_t _tick; // 8 bytes, will be generated
} TRI_df_marker_t;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief datafile header marker
@ -353,13 +354,24 @@ typedef struct TRI_df_marker_s {
/// </table>
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_df_header_marker_s {
struct TRI_df_header_marker_t {
TRI_df_marker_t base; // 24 bytes
TRI_df_version_t _version; // 4 bytes
TRI_voc_size_t _maximalSize; // 4 bytes
TRI_voc_tick_t _fid; // 8 bytes
} TRI_df_header_marker_t;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief datafile footer marker
////////////////////////////////////////////////////////////////////////////////
struct TRI_df_prologue_marker_t {
TRI_df_marker_t base; // 24 bytes
TRI_voc_tick_t _databaseId; // 8 bytes
TRI_voc_cid_t _collectionId; // 8 bytes
};
////////////////////////////////////////////////////////////////////////////////
/// @brief datafile footer marker
@ -387,18 +399,18 @@ typedef struct TRI_df_header_marker_s {
/// contains a footer is sealed and read-only.
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_df_footer_marker_s {
struct TRI_df_footer_marker_t {
TRI_df_marker_t base; // 24 bytes
TRI_voc_size_t _maximalSize; // 4 bytes
TRI_voc_size_t _totalSize; // 4 bytes
} TRI_df_footer_marker_t;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief document datafile header marker
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_col_header_marker_s {
struct TRI_col_header_marker_t {
TRI_df_marker_t base; // 24 bytes
TRI_col_type_t _type; // 4 bytes
@ -408,28 +420,7 @@ typedef struct TRI_col_header_marker_s {
#endif
TRI_voc_cid_t _cid; // 8 bytes
} TRI_col_header_marker_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief datafile attribute marker
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_df_attribute_marker_s {
TRI_df_marker_t base;
TRI_shape_aid_t _aid;
TRI_shape_size_t _size;
// char name[]
} TRI_df_attribute_marker_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief datafile shape marker
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_df_shape_marker_s {
TRI_df_marker_t base;
} TRI_df_shape_marker_t;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief document datafile marker with key
@ -482,50 +473,6 @@ typedef struct TRI_doc_deletion_key_marker_s {
uint16_t _offsetKey;
} TRI_doc_deletion_key_marker_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief begin transaction marker
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_doc_begin_transaction_marker_s {
TRI_df_marker_t base;
TRI_voc_tid_t _tid;
uint32_t _numCollections;
#ifdef TRI_PADDING_32
char _padding_begin_marker[4];
#endif
} TRI_doc_begin_transaction_marker_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief commit transaction marker
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_doc_commit_transaction_marker_s {
TRI_df_marker_t base;
TRI_voc_tid_t _tid;
} TRI_doc_commit_transaction_marker_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief abort transaction marker
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_doc_abort_transaction_marker_s {
TRI_df_marker_t base;
TRI_voc_tid_t _tid;
} TRI_doc_abort_transaction_marker_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief prepare transaction marker
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_doc_prepare_transaction_marker_s {
TRI_df_marker_t base;
TRI_voc_tid_t _tid;
} TRI_doc_prepare_transaction_marker_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a new datafile
///

View File

@ -32,6 +32,7 @@
#include "Basics/Logger.h"
#include "Basics/tri-strings.h"
#include "Basics/ThreadPool.h"
#include "Basics/WriteLocker.h"
#include "Cluster/ServerState.h"
#include "FulltextIndex/fulltext-index.h"
#include "Indexes/EdgeIndex.h"

View File

@ -677,7 +677,7 @@ static TRI_voc_cid_t GetCollectionFromWalMarker(TRI_df_marker_t const* marker) {
if (!slice.isObject()) {
return 0;
}
VPackSlice const id = slice.get("id");
VPackSlice const id = slice.get("cid");
if (id.isString()) {
return std::stoull(id.copyString());
}

View File

@ -1056,17 +1056,18 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
bool& waitForSync) {
TRI_ASSERT(operation.header != nullptr);
TRI_document_collection_t* document = operation.document;
bool const isSingleOperationTransaction = IsSingleOperationTransaction(trx);
// upgrade the info for the transaction
if (waitForSync || operation.document->_info.waitForSync()) {
if (waitForSync || document->_info.waitForSync()) {
trx->_waitForSync = true;
}
// default is false
waitForSync = false;
if (isSingleOperationTransaction) {
waitForSync |= operation.document->_info.waitForSync();
waitForSync |= document->_info.waitForSync();
}
TRI_IF_FAILURE("TransactionOperationNoSlot") { return TRI_ERROR_DEBUG; }
@ -1086,13 +1087,12 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
TRI_voc_fid_t fid = 0;
void const* position = nullptr;
TRI_document_collection_t* document = operation.document;
if (operation.marker->fid() == 0) {
// this is a "real" marker that must be written into the logfiles
// No document or edge marker, just append it to the WAL:
// just append it to the WAL:
arangodb::wal::SlotInfoCopy slotInfo =
arangodb::wal::LogfileManager::instance()->allocateAndWrite(
trx->_vocbase->_id, document->_info.id(),
operation.marker->mem(), operation.marker->size(), false);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred
@ -1114,8 +1114,7 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
if (operation.type == TRI_VOC_DOCUMENT_OPERATION_INSERT ||
operation.type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
// adjust the data position in the header
operation.header->setDataPtr(
position); // PROTECTED by ongoing trx from operation
operation.header->setDataPtr(position);
}
TRI_IF_FAILURE("TransactionOperationAfterAdjust") { return TRI_ERROR_DEBUG; }

View File

@ -40,6 +40,7 @@
#include "Basics/threads.h"
#include "Basics/Exceptions.h"
#include "Basics/FileUtils.h"
#include "Basics/WriteLocker.h"
#include "Utils/CollectionKeysRepository.h"
#include "Utils/CursorRepository.h"
#include "Utils/transactions.h"

View File

@ -38,7 +38,6 @@
#include "VocBase/DatafileStatistics.h"
#include "VocBase/document-collection.h"
#include "VocBase/server.h"
#include "VocBase/VocShaper.h"
#include "Wal/Logfile.h"
#include "Wal/LogfileManager.h"
@ -211,19 +210,6 @@ static bool ScanMarker(TRI_df_marker_t const* marker, void* data,
break;
}
case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION:
case TRI_WAL_MARKER_COMMIT_REMOTE_TRANSACTION: {
break;
}
case TRI_WAL_MARKER_ABORT_REMOTE_TRANSACTION: {
transaction_remote_abort_marker_t const* m =
reinterpret_cast<transaction_remote_abort_marker_t const*>(marker);
// note which abort markers we found
state->handledTransactions.emplace(m->_transactionId);
break;
}
case TRI_WAL_MARKER_CREATE_COLLECTION: {
VPackSlice const slice(p + sizeof(TRI_df_marker_t));
TRI_voc_tid_t cid = NumericValue<TRI_voc_cid_t>(slice, "cid");
@ -246,7 +232,7 @@ static bool ScanMarker(TRI_df_marker_t const* marker, void* data,
case TRI_WAL_MARKER_CREATE_DATABASE: {
VPackSlice const slice(p + sizeof(TRI_df_marker_t));
TRI_voc_tick_t id = NumericValue<TRI_voc_tick_t>(slice, "id");
TRI_voc_tick_t id = NumericValue<TRI_voc_tick_t>(slice, "database");
// note that the database is now considered not dropped
state->droppedDatabases.erase(id);
break;
@ -254,7 +240,7 @@ static bool ScanMarker(TRI_df_marker_t const* marker, void* data,
case TRI_WAL_MARKER_DROP_DATABASE: {
VPackSlice const slice(p + sizeof(TRI_df_marker_t));
TRI_voc_tick_t id = NumericValue<TRI_voc_tick_t>(slice, "id");
TRI_voc_tick_t id = NumericValue<TRI_voc_tick_t>(slice, "database");
// note that the database was dropped and doesn't need to be collected
state->droppedDatabases.emplace(id);
@ -273,6 +259,20 @@ static bool ScanMarker(TRI_df_marker_t const* marker, void* data,
}
break;
}
case TRI_WAL_MARKER_BEGIN_REMOTE_TRANSACTION:
case TRI_WAL_MARKER_COMMIT_REMOTE_TRANSACTION: {
break;
}
case TRI_WAL_MARKER_ABORT_REMOTE_TRANSACTION: {
transaction_remote_abort_marker_t const* m =
reinterpret_cast<transaction_remote_abort_marker_t const*>(marker);
// note which abort markers we found
state->handledTransactions.emplace(m->_transactionId);
break;
}
default: {
// do nothing intentionally
@ -1301,7 +1301,7 @@ void CollectorThread::initMarker(TRI_df_marker_t* marker,
TRI_ASSERT(marker != nullptr);
marker->_size = size;
marker->_type = (TRI_df_marker_type_t)type;
marker->_type = static_cast<TRI_df_marker_type_t>(type);
marker->_crc = 0;
marker->_tick = 0;
}

View File

@ -37,7 +37,7 @@
#include "Wal/Logfile.h"
struct TRI_datafile_t;
struct TRI_df_marker_s;
struct TRI_df_marker_t;
struct TRI_document_collection_t;
struct TRI_server_t;
@ -182,14 +182,14 @@ class CollectorThread : public Thread {
/// @brief typedef key => document marker
//////////////////////////////////////////////////////////////////////////////
typedef std::unordered_map<std::string, struct TRI_df_marker_s const*>
typedef std::unordered_map<std::string, struct TRI_df_marker_t const*>
DocumentOperationsType;
//////////////////////////////////////////////////////////////////////////////
/// @brief typedef for structural operation (attributes, shapes) markers
//////////////////////////////////////////////////////////////////////////////
typedef std::vector<struct TRI_df_marker_s const*> OperationsType;
typedef std::vector<struct TRI_df_marker_t const*> OperationsType;
public:
void beginShutdown() override final;
@ -310,7 +310,7 @@ class CollectorThread : public Thread {
/// @brief initialize a marker
//////////////////////////////////////////////////////////////////////////////
void initMarker(struct TRI_df_marker_s*, TRI_df_marker_type_e,
void initMarker(struct TRI_df_marker_t*, TRI_df_marker_type_e,
TRI_voc_size_t);
//////////////////////////////////////////////////////////////////////////////

View File

@ -178,6 +178,21 @@ TRI_df_header_marker_t Logfile::getHeaderMarker() const {
return header;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create a prologue marker
////////////////////////////////////////////////////////////////////////////////
TRI_df_prologue_marker_t Logfile::getPrologueMarker(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) const {
TRI_df_prologue_marker_t header;
size_t const size = sizeof(TRI_df_prologue_marker_t);
TRI_InitMarkerDatafile((char*)&header, TRI_DF_MARKER_PROLOGUE, size);
header._databaseId = databaseId;
header._collectionId = collectionId;
return header;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create a footer marker
////////////////////////////////////////////////////////////////////////////////

View File

@ -25,23 +25,11 @@
#define ARANGOD_WAL_LOGFILE_H 1
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Basics/Logger.h"
#include "VocBase/datafile.h"
#include "VocBase/shaped-json.h"
#include "VocBase/voc-types.h"
#include "Wal/Marker.h"
////////////////////////////////////////////////////////////////////////////////
/// @brief number of cache buckets
/// TODO: convert to a C++11 constexpr once Visual Studio supports it
/// (Visual Studio 2015?)
////////////////////////////////////////////////////////////////////////////////
#define LOGFILE_LEGEND_CACHE_BUCKETS 8
namespace arangodb {
namespace wal {
@ -309,6 +297,12 @@ class Logfile {
//////////////////////////////////////////////////////////////////////////////
TRI_df_header_marker_t getHeaderMarker() const;
//////////////////////////////////////////////////////////////////////////////
/// @brief create a prologue marker
//////////////////////////////////////////////////////////////////////////////
TRI_df_prologue_marker_t getPrologueMarker(TRI_voc_tick_t, TRI_voc_cid_t) const;
//////////////////////////////////////////////////////////////////////////////
/// @brief create a footer marker
@ -345,43 +339,6 @@ class Logfile {
--_users;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief lookup a legend in the cache
//////////////////////////////////////////////////////////////////////////////
void* lookupLegend(TRI_voc_cid_t cid, TRI_shape_sid_t sid) {
CidSid cs(cid, sid);
size_t const i = cs.hash() % LOGFILE_LEGEND_CACHE_BUCKETS;
READ_LOCKER(readLocker, _legendCacheLock[i]);
auto it = _legendCache[i].find(cs);
if (it != _legendCache[i].end()) {
return it->second;
}
return nullptr;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief cache a legend
//////////////////////////////////////////////////////////////////////////////
void cacheLegend(TRI_voc_cid_t cid, TRI_shape_sid_t sid, void* l) {
CidSid cs(cid, sid);
size_t const i = cs.hash() % LOGFILE_LEGEND_CACHE_BUCKETS;
WRITE_LOCKER(writeLocker, _legendCacheLock[i]);
auto it = _legendCache[i].find(cs);
if (it == _legendCache[i].end()) {
_legendCache[i].emplace(cs, l);
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief the logfile id
//////////////////////////////////////////////////////////////////////////////
@ -411,51 +368,6 @@ class Logfile {
//////////////////////////////////////////////////////////////////////////////
std::atomic<int64_t> _collectQueueSize;
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief legend cache, key type with hash function
//////////////////////////////////////////////////////////////////////////////
struct CidSid {
TRI_voc_cid_t cid;
TRI_shape_sid_t sid;
CidSid(TRI_voc_cid_t c, TRI_shape_sid_t s) : cid(c), sid(s) {}
size_t hash() const {
CidSidHash h;
return h(this);
}
bool operator==(CidSid const& a) const {
return this->cid == a.cid && this->sid == a.sid;
}
};
struct CidSidHash {
size_t operator()(CidSid const& cs) const {
return std::hash<TRI_voc_cid_t>()(cs.cid) ^
std::hash<TRI_shape_sid_t>()(cs.sid);
}
size_t operator()(CidSid const* cs) const {
return std::hash<TRI_voc_cid_t>()(cs->cid) ^
std::hash<TRI_shape_sid_t>()(cs->sid);
}
};
//////////////////////////////////////////////////////////////////////////////
/// @brief legend cache, split into buckets
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<CidSid, void*, CidSidHash>
_legendCache[LOGFILE_LEGEND_CACHE_BUCKETS];
//////////////////////////////////////////////////////////////////////////////
/// @brief legend cache, locks, split into buckets
//////////////////////////////////////////////////////////////////////////////
basics::ReadWriteLock _legendCacheLock[LOGFILE_LEGEND_CACHE_BUCKETS];
};
}
}

View File

@ -116,7 +116,6 @@ LogfileManager::LogfileManager(TRI_server_t* server, std::string* databasePath)
_allowOversizeEntries(true),
_ignoreLogfileErrors(false),
_ignoreRecoveryErrors(false),
_suppressShapeInformation(false),
_allowWrites(false), // start in read-only mode
_hasFoundLastTick(false),
_inRecovery(true),
@ -215,9 +214,6 @@ void LogfileManager::setupOptions(
"wal.reserve-logfiles", &_reserveLogfiles,
"maximum number of reserve logfiles to maintain")(
"wal.slots", &_numberOfSlots, "number of logfile slots to use")(
"wal.suppress-shape-information", &_suppressShapeInformation,
"do not write shape information for markers (saves a lot of disk space, "
"but effectively disables using the write-ahead log for replication)")(
"wal.sync-interval", &_syncInterval,
"interval for automatic, non-requested disk syncs (in milliseconds)")(
"wal.throttle-when-pending", &_throttleWhenPending,
@ -238,7 +234,7 @@ bool LogfileManager::prepare() {
if (_directory.empty()) {
// use global configuration variable
_directory = (*_databasePath);
_directory = *_databasePath;
if (!basics::FileUtils::isDirectory(_directory)) {
std::string systemErrorStr;
@ -751,21 +747,13 @@ SlotInfo LogfileManager::allocate(uint32_t size) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief allocate space in a logfile for later writing, version for legends
///
/// See explanations about legends in the corresponding allocateAndWrite
/// convenience function.
/// @brief allocate space in a logfile for later writing
////////////////////////////////////////////////////////////////////////////////
SlotInfo LogfileManager::allocate(uint32_t size, TRI_voc_cid_t cid,
TRI_shape_sid_t sid, uint32_t legendOffset,
void*& oldLegend) {
SlotInfo LogfileManager::allocate(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
uint32_t size) {
if (!_allowWrites) {
// no writes allowed
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(false);
#endif
// no writes allowed
return SlotInfo(TRI_ERROR_ARANGO_READ_ONLY);
}
@ -779,25 +767,19 @@ SlotInfo LogfileManager::allocate(uint32_t size, TRI_voc_cid_t cid,
return SlotInfo(TRI_ERROR_ARANGO_DOCUMENT_TOO_LARGE);
}
return _slots->nextUnused(size, cid, sid, legendOffset, oldLegend);
return _slots->nextUnused(databaseId, collectionId, size);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief finalize a log entry
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::finalize(SlotInfo& slotInfo, bool waitForSync) {
_slots->returnUsed(slotInfo, waitForSync);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief write data into the logfile
/// @brief write data into the logfile, using database id and collection id
/// this is a convenience function that combines allocate, memcpy and finalize
////////////////////////////////////////////////////////////////////////////////
SlotInfoCopy LogfileManager::allocateAndWrite(void* src, uint32_t size,
SlotInfoCopy LogfileManager::allocateAndWrite(TRI_voc_tick_t databaseId,
TRI_voc_cid_t collectionId,
void* src, uint32_t size,
bool waitForSync) {
SlotInfo slotInfo = allocate(size);
SlotInfo slotInfo = allocate(databaseId, collectionId, size);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
return SlotInfoCopy(slotInfo.errorCode);
@ -827,11 +809,51 @@ SlotInfoCopy LogfileManager::allocateAndWrite(void* src, uint32_t size,
/// this is a convenience function that combines allocate, memcpy and finalize
////////////////////////////////////////////////////////////////////////////////
SlotInfoCopy LogfileManager::allocateAndWrite(void* src, uint32_t size,
bool waitForSync) {
SlotInfo slotInfo = allocate(size);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
return SlotInfoCopy(slotInfo.errorCode);
}
TRI_ASSERT(slotInfo.slot != nullptr);
try {
slotInfo.slot->fill(src, size);
// we must copy the slotinfo because finalize() will set its internals to 0
// again
SlotInfoCopy copy(slotInfo.slot);
finalize(slotInfo, waitForSync);
return copy;
} catch (...) {
// if we don't return the slot we'll run into serious problems later
finalize(slotInfo, false);
return SlotInfoCopy(TRI_ERROR_INTERNAL);
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief write data into the logfile
/// this is a convenience function that combines allocate, memcpy and finalize
////////////////////////////////////////////////////////////////////////////////
SlotInfoCopy LogfileManager::allocateAndWrite(Marker const& marker,
bool waitForSync) {
return allocateAndWrite(marker.mem(), marker.size(), waitForSync);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief finalize a log entry
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::finalize(SlotInfo& slotInfo, bool waitForSync) {
_slots->returnUsed(slotInfo, waitForSync);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief wait for the collector queue to get cleared for the given collection
////////////////////////////////////////////////////////////////////////////////

View File

@ -356,19 +356,25 @@ class LogfileManager : public rest::ApplicationFeature {
//////////////////////////////////////////////////////////////////////////////
SlotInfo allocate(uint32_t);
//////////////////////////////////////////////////////////////////////////////
/// @brief reserve space in a logfile, version for legends
/// @brief reserve space in a logfile
//////////////////////////////////////////////////////////////////////////////
SlotInfo allocate(uint32_t, TRI_voc_cid_t cid, TRI_shape_sid_t sid,
uint32_t legendOffset, void*& oldLegend);
SlotInfo allocate(TRI_voc_tick_t, TRI_voc_cid_t, uint32_t);
//////////////////////////////////////////////////////////////////////////////
/// @brief finalize a log entry
//////////////////////////////////////////////////////////////////////////////
void finalize(SlotInfo&, bool);
//////////////////////////////////////////////////////////////////////////////
/// @brief write data into the logfile, using database id and collection id
/// this is a convenience function that combines allocate, memcpy and finalize
//////////////////////////////////////////////////////////////////////////////
SlotInfoCopy allocateAndWrite(TRI_voc_tick_t, TRI_voc_cid_t, void*, uint32_t, bool);
//////////////////////////////////////////////////////////////////////////////
/// @brief write data into the logfile

View File

@ -46,6 +46,10 @@ using namespace arangodb::wal;
template <typename T>
static inline T NumericValue(VPackSlice const& slice, char const* attribute) {
if (!slice.isObject()) {
LOG(ERR) << "invalid value type when looking for attribute '" << attribute << "': expecting object";
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
VPackSlice v = slice.get(attribute);
if (v.isString()) {
return static_cast<T>(std::stoull(v.copyString()));
@ -53,7 +57,9 @@ static inline T NumericValue(VPackSlice const& slice, char const* attribute) {
if (v.isNumber()) {
return v.getNumber<T>();
}
return 0;
LOG(ERR) << "invalid value for attribute '" << attribute << "'";
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -50,7 +50,9 @@ Slots::Slots(LogfileManager* logfileManager, size_t numberOfSlots,
_lastAssignedTick(0),
_lastCommittedTick(0),
_lastCommittedDataTick(0),
_numEvents(0) {
_numEvents(0),
_lastDatabaseId(0),
_lastCollectionId(0) {
_slots = new Slot[numberOfSlots];
}
@ -117,131 +119,22 @@ Slot::TickType Slots::lastCommittedTick() {
////////////////////////////////////////////////////////////////////////////////
SlotInfo Slots::nextUnused(uint32_t size) {
// we need to use the aligned size for writing
uint32_t alignedSize = TRI_DF_ALIGN_BLOCK(size);
int iterations = 0;
bool hasWaited = false;
TRI_ASSERT(size > 0);
while (++iterations < 1000) {
{
MUTEX_LOCKER(mutexLocker, _lock);
Slot* slot = &_slots[_handoutIndex];
TRI_ASSERT(slot != nullptr);
if (slot->isUnused()) {
if (hasWaited) {
CONDITION_LOCKER(guard, _condition);
TRI_ASSERT(_waiting > 0);
--_waiting;
}
// cycle until we have a valid logfile
while (_logfile == nullptr ||
_logfile->freeSize() < static_cast<uint64_t>(alignedSize)) {
if (_logfile != nullptr) {
// seal existing logfile by creating a footer marker
int res = writeFooter(slot);
if (res != TRI_ERROR_NO_ERROR) {
return SlotInfo(res);
}
// advance to next slot
slot = &_slots[_handoutIndex];
_logfileManager->setLogfileSealRequested(_logfile);
_logfile = nullptr;
}
TRI_IF_FAILURE("LogfileManagerGetWriteableLogfile") {
return SlotInfo(TRI_ERROR_ARANGO_NO_JOURNAL);
}
// fetch the next free logfile (this may create a new one)
Logfile::StatusType status;
int res = newLogfile(alignedSize, status);
if (res != TRI_ERROR_NO_ERROR) {
if (res != TRI_ERROR_ARANGO_NO_JOURNAL) {
return SlotInfo(res);
}
usleep(10 * 1000);
// try again in next iteration
} else {
TRI_ASSERT(_logfile != nullptr);
if (status == Logfile::StatusType::EMPTY) {
// initialize the empty logfile by writing a header marker
int res = writeHeader(slot);
if (res != TRI_ERROR_NO_ERROR) {
return SlotInfo(res);
}
// advance to next slot
slot = &_slots[_handoutIndex];
_logfileManager->setLogfileOpen(_logfile);
} else {
TRI_ASSERT(status == Logfile::StatusType::OPEN);
}
}
}
// if we get here, we got a free slot for the actual data...
char* mem = _logfile->reserve(alignedSize);
if (mem == nullptr) {
return SlotInfo(TRI_ERROR_INTERNAL);
}
// only in this case we return a valid slot
slot->setUsed(static_cast<void*>(mem), size, _logfile->id(), handout());
return SlotInfo(slot);
}
}
// if we get here, all slots are busy
CONDITION_LOCKER(guard, _condition);
if (!hasWaited) {
++_waiting;
hasWaited = true;
}
bool mustWait;
{
MUTEX_LOCKER(mutexLocker, _lock);
mustWait = (_freeSlots == 0);
}
if (mustWait) {
guard.wait(10 * 1000);
}
}
return SlotInfo(TRI_ERROR_ARANGO_NO_JOURNAL);
return nextUnused(0, 0, size);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the next unused slot, version for legends
///
/// See explanations in arangod/Wal/LogfileManager.cpp in the
/// corresponding allocateAndWrite method.
/// @brief return the next unused slot
////////////////////////////////////////////////////////////////////////////////
SlotInfo Slots::nextUnused(uint32_t size, TRI_voc_cid_t cid,
TRI_shape_sid_t sid, uint32_t legendOffset,
void*& oldLegend) {
// legendOffset 0 means no legend included
SlotInfo Slots::nextUnused(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
uint32_t size) {
static size_t const PrologueSize = sizeof(TRI_df_prologue_marker_t);
// we need to use the aligned size for writing
uint32_t alignedSize = TRI_DF_ALIGN_BLOCK(size);
int iterations = 0;
bool hasWaited = false;
bool mustWritePrologue = false;
TRI_ASSERT(size > 0);
@ -259,6 +152,20 @@ SlotInfo Slots::nextUnused(uint32_t size, TRI_voc_cid_t cid,
--_waiting;
}
if (databaseId == 0 && collectionId == 0) {
_lastDatabaseId = 0;
_lastCollectionId = 0;
}
else if (!mustWritePrologue &&
databaseId > 0 &&
collectionId > 0 &&
_lastDatabaseId != databaseId &&
_lastCollectionId != collectionId) {
// write a prologue
alignedSize += PrologueSize;
mustWritePrologue = true;
}
// cycle until we have a valid logfile
while (_logfile == nullptr ||
_logfile->freeSize() < static_cast<uint64_t>(alignedSize)) {
@ -314,25 +221,31 @@ SlotInfo Slots::nextUnused(uint32_t size, TRI_voc_cid_t cid,
// if we get here, we got a free slot for the actual data...
// Now sort out the legend business:
if (legendOffset == 0) {
void* legend = _logfile->lookupLegend(cid, sid);
if (nullptr == legend) {
// Bad, we would need a legend for this marker
return SlotInfo(TRI_ERROR_LEGEND_NOT_IN_WAL_FILE);
}
oldLegend = legend;
}
char* mem = _logfile->reserve(alignedSize);
if (mem == nullptr) {
return SlotInfo(TRI_ERROR_INTERNAL);
}
if (legendOffset != 0) {
void* legend = static_cast<void*>(mem + legendOffset);
_logfile->cacheLegend(cid, sid, legend);
if (mustWritePrologue) {
// write prologue...
// hand out the prologue slot and directly fill it
int res = writePrologue(slot, databaseId, collectionId);
if (res != TRI_ERROR_NO_ERROR) {
return SlotInfo(res);
}
// now return the slot
mem += PrologueSize; // advance memory pointer
// use following slot for the actual data
slot = &_slots[_handoutIndex];
// note database and collection id for next time
_lastDatabaseId = databaseId;
_lastCollectionId = collectionId;
}
// only in this case we return a valid slot
@ -673,7 +586,7 @@ int Slots::closeLogfile(Slot::TickType& lastCommittedTick, bool& worked) {
////////////////////////////////////////////////////////////////////////////////
int Slots::writeHeader(Slot* slot) {
TRI_df_header_marker_t&& header = _logfile->getHeaderMarker();
TRI_df_header_marker_t header = _logfile->getHeaderMarker();
size_t const size = header.base._size;
TRI_df_marker_t* mem =
@ -683,7 +596,31 @@ int Slots::writeHeader(Slot* slot) {
slot->setUsed(static_cast<void*>(mem), static_cast<uint32_t>(size),
_logfile->id(), handout());
slot->fill(&header.base, size);
slot->setReturned(false); // sync
slot->setReturned(false); // no sync
// reset values for next write
_lastDatabaseId = 0;
_lastCollectionId = 0;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief write a prologue marker
////////////////////////////////////////////////////////////////////////////////
int Slots::writePrologue(Slot* slot, TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) {
TRI_df_header_marker_t header = _logfile->getHeaderMarker();
size_t const size = header.base._size;
TRI_df_marker_t* mem =
reinterpret_cast<TRI_df_marker_t*>(_logfile->reserve(size));
TRI_ASSERT(mem != nullptr);
slot->setUsed(static_cast<void*>(mem), static_cast<uint32_t>(size),
_logfile->id(), handout());
slot->fill(&header.base, size);
slot->setReturned(false); // no sync
return TRI_ERROR_NO_ERROR;
}
@ -695,7 +632,7 @@ int Slots::writeHeader(Slot* slot) {
int Slots::writeFooter(Slot* slot) {
TRI_ASSERT(_logfile != nullptr);
TRI_df_footer_marker_t&& footer = _logfile->getFooterMarker();
TRI_df_footer_marker_t footer = _logfile->getFooterMarker();
size_t const size = footer.base._size;
TRI_df_marker_t* mem =
@ -706,6 +643,10 @@ int Slots::writeFooter(Slot* slot) {
_logfile->id(), handout());
slot->fill(&footer.base, size);
slot->setReturned(true); // sync
// reset values for next write
_lastDatabaseId = 0;
_lastCollectionId = 0;
return TRI_ERROR_NO_ERROR;
}

View File

@ -114,13 +114,8 @@ class Slots {
//////////////////////////////////////////////////////////////////////////////
SlotInfo nextUnused(uint32_t);
//////////////////////////////////////////////////////////////////////////////
/// @brief return the next unused slot, version for legends
//////////////////////////////////////////////////////////////////////////////
SlotInfo nextUnused(uint32_t size, TRI_voc_cid_t cid, TRI_shape_sid_t sid,
uint32_t legendIncluded, void*& oldLegend);
SlotInfo nextUnused(TRI_voc_tick_t, TRI_voc_cid_t, uint32_t);
//////////////////////////////////////////////////////////////////////////////
/// @brief return a used slot, allowing its synchronization
@ -165,6 +160,12 @@ class Slots {
//////////////////////////////////////////////////////////////////////////////
int writeHeader(Slot*);
//////////////////////////////////////////////////////////////////////////////
/// @brief writes a prologue for a document/remove marker
//////////////////////////////////////////////////////////////////////////////
int writePrologue(Slot*, TRI_voc_tick_t, TRI_voc_cid_t);
//////////////////////////////////////////////////////////////////////////////
/// @brief write a footer marker
@ -275,6 +276,18 @@ class Slots {
//////////////////////////////////////////////////////////////////////////////
uint64_t _numEvents;
//////////////////////////////////////////////////////////////////////////////
/// @brief last written database id (in prologue marker)
//////////////////////////////////////////////////////////////////////////////
TRI_voc_tick_t _lastDatabaseId;
//////////////////////////////////////////////////////////////////////////////
/// @brief last written collection id (in prologue marker)
//////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t _lastCollectionId;
};
}
}