1
0
Fork 0

updated logfile manager

This commit is contained in:
Jan Steemann 2014-03-25 18:21:31 +01:00
parent 213c61d104
commit 92f87c1421
24 changed files with 618 additions and 671 deletions

View File

@ -149,7 +149,6 @@ add_executable(
VocBase/vocbase-defaults.c
Wal/AllocatorThread.cpp
Wal/CollectorThread.cpp
Wal/Configuration.cpp
Wal/LogfileManager.cpp
Wal/Logfile.cpp
Wal/Slot.cpp

View File

@ -115,7 +115,6 @@ bin_arangod_SOURCES = \
arangod/VocBase/vocbase-defaults.c \
arangod/Wal/AllocatorThread.cpp \
arangod/Wal/CollectorThread.cpp \
arangod/Wal/Configuration.cpp \
arangod/Wal/LogfileManager.cpp \
arangod/Wal/Logfile.cpp \
arangod/Wal/Slot.cpp \

View File

@ -80,7 +80,7 @@
#include "V8Server/ApplicationV8.h"
#include "VocBase/auth.h"
#include "VocBase/server.h"
#include "Wal/Configuration.h"
#include "Wal/LogfileManager.h"
#ifdef TRI_ENABLE_CLUSTER
#include "Cluster/ApplicationCluster.h"
@ -290,7 +290,7 @@ ArangoServer::ArangoServer (int argc, char** argv)
: _argc(argc),
_argv(argv),
_tempPath(),
_walConfiguration(0),
_logfileManager(0),
_applicationScheduler(0),
_applicationDispatcher(0),
_applicationEndpointServer(0),
@ -386,9 +386,10 @@ void ArangoServer::buildApplicationServer () {
// arangod allows defining a user-specific configuration file. arangosh and the other binaries don't
_applicationServer->setUserConfigFile(".arango" + string(1, TRI_DIR_SEPARATOR_CHAR) + string(conf));
/*
_walConfiguration = new wal::Configuration();
_applicationServer->addFeature(_walConfiguration);
/*
_logfileManager = new wal::LogfileManager();
_applicationServer->addFeature(_logfileManager);
*/
// .............................................................................
// dispatcher

View File

@ -57,7 +57,7 @@ namespace triagens {
}
namespace wal {
class Configuration;
class LogfileManager;
}
namespace admin {
@ -191,10 +191,10 @@ namespace triagens {
std::string _tempPath;
////////////////////////////////////////////////////////////////////////////////
/// @brief write-ahead log configuration
/// @brief write-ahead log manager
////////////////////////////////////////////////////////////////////////////////
wal::Configuration* _walConfiguration;
wal::LogfileManager* _logfileManager;
////////////////////////////////////////////////////////////////////////////////
/// @brief application scheduler

View File

@ -820,11 +820,9 @@ static TRI_datafile_t* OpenDatafile (char const* filename,
TRI_datafile_t* TRI_CreateDatafile (char const* filename,
TRI_voc_fid_t fid,
TRI_voc_size_t maximalSize) {
TRI_voc_size_t maximalSize,
bool withInitialMarkers) {
TRI_datafile_t* datafile;
TRI_df_marker_t* position;
TRI_df_header_marker_t header;
int result;
assert(PageSize >= 256);
@ -860,30 +858,19 @@ TRI_datafile_t* TRI_CreateDatafile (char const* filename,
datafile->_state = TRI_DF_STATE_WRITE;
// create the header
TRI_InitMarker((char*) &header, TRI_DF_MARKER_HEADER, sizeof(TRI_df_header_marker_t));
header.base._tick = (TRI_voc_tick_t) fid;
if (withInitialMarkers) {
int res = TRI_WriteInitialHeaderMarkerDatafile(datafile, fid, maximalSize);
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("cannot write header to datafile '%s'", datafile->getName(datafile));
TRI_UNMMFile(datafile->_data, datafile->_maximalSize, datafile->_fd, &(datafile->_mmHandle));
header._version = TRI_DF_VERSION;
header._maximalSize = maximalSize;
header._fid = fid;
datafile->close(datafile);
datafile->destroy(datafile);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, datafile);
// reserve space and write header to file
result = TRI_ReserveElementDatafile(datafile, header.base._size, &position, 0);
if (result == TRI_ERROR_NO_ERROR) {
result = TRI_WriteCrcElementDatafile(datafile, position, &header.base, header.base._size, false);
}
if (result != TRI_ERROR_NO_ERROR) {
LOG_ERROR("cannot write header to datafile '%s'", datafile->getName(datafile));
TRI_UNMMFile(datafile->_data, datafile->_maximalSize, datafile->_fd, &(datafile->_mmHandle));
datafile->close(datafile);
datafile->destroy(datafile);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, datafile);
return NULL;
return NULL;
}
}
LOG_DEBUG("created datafile '%s' of size %u and page-size %u",
@ -1059,6 +1046,35 @@ void TRI_FreeDatafile (TRI_datafile_t* datafile) {
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief create the initial datafile header marker
////////////////////////////////////////////////////////////////////////////////
int TRI_WriteInitialHeaderMarkerDatafile (TRI_datafile_t* datafile,
TRI_voc_fid_t fid,
TRI_voc_size_t maximalSize) {
TRI_df_marker_t* position;
TRI_df_header_marker_t header;
int res;
// create the header
TRI_InitMarker((char*) &header, TRI_DF_MARKER_HEADER, sizeof(TRI_df_header_marker_t));
header.base._tick = (TRI_voc_tick_t) fid;
header._version = TRI_DF_VERSION;
header._maximalSize = maximalSize;
header._fid = fid;
// reserve space and write header to file
res = TRI_ReserveElementDatafile(datafile, header.base._size, &position, 0);
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_WriteCrcElementDatafile(datafile, position, &header.base, header.base._size, false);
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks whether a marker is valid
////////////////////////////////////////////////////////////////////////////////

View File

@ -446,7 +446,8 @@ TRI_df_skip_marker_t;
TRI_datafile_t* TRI_CreateDatafile (char const*,
TRI_voc_fid_t fid,
TRI_voc_size_t);
TRI_voc_size_t,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a new anonymous datafile
@ -503,6 +504,14 @@ void TRI_FreeDatafile (TRI_datafile_t*);
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief create the initial datafile header marker
////////////////////////////////////////////////////////////////////////////////
int TRI_WriteInitialHeaderMarkerDatafile (TRI_datafile_t*,
TRI_voc_fid_t,
TRI_voc_size_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief aligns in datafile blocks
////////////////////////////////////////////////////////////////////////////////

View File

@ -192,7 +192,7 @@ static TRI_datafile_t* CreateCompactor (TRI_primary_collection_t* primary,
if (collection->_info._isVolatile) {
// in-memory collection
journal = TRI_CreateDatafile(NULL, fid, maximalSize);
journal = TRI_CreateDatafile(NULL, fid, maximalSize, true);
}
else {
char* jname;
@ -211,7 +211,7 @@ static TRI_datafile_t* CreateCompactor (TRI_primary_collection_t* primary,
TRI_UnlinkFile(filename);
}
journal = TRI_CreateDatafile(filename, fid, maximalSize);
journal = TRI_CreateDatafile(filename, fid, maximalSize, true);
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
}
@ -284,7 +284,7 @@ static TRI_datafile_t* CreateJournal (TRI_primary_collection_t* primary,
if (collection->_info._isVolatile) {
// in-memory collection
journal = TRI_CreateDatafile(NULL, fid, maximalSize);
journal = TRI_CreateDatafile(NULL, fid, maximalSize, true);
}
else {
char* jname;
@ -299,7 +299,7 @@ static TRI_datafile_t* CreateJournal (TRI_primary_collection_t* primary,
TRI_FreeString(TRI_CORE_MEM_ZONE, number);
TRI_FreeString(TRI_CORE_MEM_ZONE, jname);
journal = TRI_CreateDatafile(filename, fid, maximalSize);
journal = TRI_CreateDatafile(filename, fid, maximalSize, true);
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
}

View File

@ -32,6 +32,16 @@
using namespace triagens::wal;
// -----------------------------------------------------------------------------
// --SECTION-- class AllocatorThread
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the allocator thread when idle
////////////////////////////////////////////////////////////////////////////////
const uint64_t AllocatorThread::Interval = 1000000;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
@ -84,21 +94,19 @@ void AllocatorThread::stop () {
void AllocatorThread::signalLogfileCreation () {
CONDITION_LOCKER(guard, _condition);
if (_createRequests == 0) {
++_createRequests;
guard.signal();
LOG_INFO("got logfile creation signal");
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a new logfile
/// @brief creates a new reserve logfile
////////////////////////////////////////////////////////////////////////////////
bool AllocatorThread::createLogfile () {
LOG_INFO("creating new logfile");
int res = _logfileManager->allocateDatafile();
bool AllocatorThread::createReserveLogfile () {
int res = _logfileManager->createReserveLogfile();
return (res == TRI_ERROR_NO_ERROR);
}
@ -120,26 +128,27 @@ void AllocatorThread::run () {
createRequests = _createRequests;
}
if (createRequests == 0 &&
! _logfileManager->hasReserveLogfiles()) {
if (! createLogfile()) {
LOG_ERROR("unable to create new reserve wal logfile");
if (createRequests == 0 && ! _logfileManager->hasReserveLogfiles()) {
if (createReserveLogfile()) {
continue;
}
LOG_ERROR("unable to create new wal reserve logfile");
}
else if (createRequests > 0) {
if (createLogfile()) {
if (createReserveLogfile()) {
CONDITION_LOCKER(guard, _condition);
--_createRequests;
continue;
}
LOG_ERROR("unable to create new wal logfile");
LOG_ERROR("unable to create new wal reserve logfile");
}
{
CONDITION_LOCKER(guard, _condition);
guard.wait(1000000);
guard.wait(Interval);
}
}

View File

@ -88,10 +88,10 @@ namespace triagens {
void signalLogfileCreation ();
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a new logfile
/// @brief creates a new reserve logfile
////////////////////////////////////////////////////////////////////////////////
bool createLogfile ();
bool createReserveLogfile ();
// -----------------------------------------------------------------------------
// --SECTION-- Thread methods
@ -141,6 +141,12 @@ namespace triagens {
volatile sig_atomic_t _stop;
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the allocator thread when idle
////////////////////////////////////////////////////////////////////////////////
static const uint64_t Interval;
};
}

View File

@ -33,6 +33,16 @@
using namespace triagens::wal;
// -----------------------------------------------------------------------------
// --SECTION-- class CollectorThread
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the collector thread when idle
////////////////////////////////////////////////////////////////////////////////
const uint64_t CollectorThread::Interval = 1000000;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
@ -89,13 +99,16 @@ void CollectorThread::stop () {
void CollectorThread::run () {
while (_stop == 0) {
// collect a logfile if any qualifies
this->collect();
bool worked = this->collectLogfile();
// delete a logfile if any qualifies
this->remove();
worked |= this->removeLogfile();
CONDITION_LOCKER(guard, _condition);
guard.wait(1000000);
if (! worked) {
// sleep only if there was nothing to do
guard.wait(Interval);
}
}
_stop = 2;
@ -109,11 +122,11 @@ void CollectorThread::run () {
/// @brief perform collection of a logfile (if any)
////////////////////////////////////////////////////////////////////////////////
void CollectorThread::collect () {
bool CollectorThread::collectLogfile () {
Logfile* logfile = _logfileManager->getCollectableLogfile();
if (logfile == nullptr) {
return;
return false;
}
_logfileManager->setCollectionRequested(logfile);
@ -122,20 +135,22 @@ void CollectorThread::collect () {
LOG_INFO("collecting logfile %llu", (unsigned long long) logfile->id());
_logfileManager->setCollectionDone(logfile);
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief perform removal of a logfile (if any)
////////////////////////////////////////////////////////////////////////////////
void CollectorThread::remove () {
bool CollectorThread::removeLogfile () {
Logfile* logfile = _logfileManager->getRemovableLogfile();
if (logfile == nullptr) {
return;
return false;
}
_logfileManager->removeLogfile(logfile);
return true;
}
// Local Variables:

View File

@ -103,13 +103,13 @@ namespace triagens {
/// @brief perform collection of a logfile (if any)
////////////////////////////////////////////////////////////////////////////////
void collect ();
bool collectLogfile ();
////////////////////////////////////////////////////////////////////////////////
/// @brief perform removal of a logfile (if any)
////////////////////////////////////////////////////////////////////////////////
void remove ();
bool removeLogfile ();
// -----------------------------------------------------------------------------
// --SECTION-- private variables
@ -135,6 +135,12 @@ namespace triagens {
volatile sig_atomic_t _stop;
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the collector thread when idle
////////////////////////////////////////////////////////////////////////////////
static const uint64_t Interval;
};
}

View File

@ -1,163 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief Write-ahead log configuration
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Configuration.h"
#include "BasicsC/logging.h"
#include "Wal/LogfileManager.h"
using namespace triagens::wal;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief create the configuration
////////////////////////////////////////////////////////////////////////////////
Configuration::Configuration ()
: ApplicationFeature("wal"),
_logfileManager(nullptr),
_filesize(32 * 1024 * 1024),
_numberOfLogfiles(4),
_reserveSize(16 * 1024 * 1024),
_directory() {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the configuration
////////////////////////////////////////////////////////////////////////////////
Configuration::~Configuration () {
if (_logfileManager != nullptr) {
delete _logfileManager;
}
}
// -----------------------------------------------------------------------------
// --SECTION-- ApplicationFeature methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void Configuration::setupOptions (std::map<std::string, triagens::basics::ProgramOptionsDescription>& options) {
options["Write-ahead log options:help-wal"]
("wal.filesize", &_filesize, "size of each logfile")
("wal.logfiles", &_numberOfLogfiles, "target number of logfiles")
("wal.reserve", &_reserveSize, "minimum space to reserve for new data")
("wal.directory", &_directory, "logfile directory")
;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool Configuration::prepare () {
if (_directory.empty()) {
LOG_FATAL_AND_EXIT("no directory specified for write-ahead logs. Please provide the --wal.directory option");
}
if (_directory[_directory.size() - 1] != TRI_DIR_SEPARATOR_CHAR) {
// append a trailing slash to directory name
_directory.push_back(TRI_DIR_SEPARATOR_CHAR);
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool Configuration::start () {
_logfileManager = new LogfileManager(this);
assert(_logfileManager != nullptr);
int res = _logfileManager->startup();
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not initialise wal components: %s", TRI_errno_string(res));
return false;
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool Configuration::open () {
for (size_t i = 0; i < 64 * 1024 * 1024; ++i) {
void* p = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, 64, true);
TRI_df_marker_t* marker = static_cast<TRI_df_marker_t*>(p);
marker->_type = TRI_DF_MARKER_HEADER;
marker->_size = 64;
marker->_crc = 0;
marker->_tick = 0;
if (i % 50000 == 0) {
LOG_INFO("now at: %d", (int) i);
}
memcpy(static_cast<char*>(p) + sizeof(TRI_df_marker_t), "the fox is brown\0", strlen("the fox is brown") + 1);
_logfileManager->allocateAndWrite(p, static_cast<uint32_t>(64));
TRI_Free(TRI_UNKNOWN_MEM_ZONE, p);
}
LOG_INFO("done");
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void Configuration::close () {
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void Configuration::stop () {
if (_logfileManager != nullptr) {
_logfileManager->shutdown();
}
}
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -1,220 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief Write-ahead log configuration
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_WAL_LOGFILE_CONFIGURATION_H
#define TRIAGENS_WAL_LOGFILE_CONFIGURATION_H 1
#include "Basics/Common.h"
#include "ApplicationServer/ApplicationFeature.h"
namespace triagens {
namespace wal {
class LogfileManager;
// -----------------------------------------------------------------------------
// --SECTION-- class Configuration
// -----------------------------------------------------------------------------
class Configuration : public rest::ApplicationFeature {
////////////////////////////////////////////////////////////////////////////////
/// @brief Configuration
////////////////////////////////////////////////////////////////////////////////
private:
Configuration (Configuration const&);
Configuration& operator= (Configuration const&);
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief create the configuration
////////////////////////////////////////////////////////////////////////////////
Configuration ();
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the configuration
////////////////////////////////////////////////////////////////////////////////
~Configuration ();
// -----------------------------------------------------------------------------
// --SECTION-- ApplicationFeature methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void setupOptions (std::map<string, triagens::basics::ProgramOptionsDescription>&);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool prepare ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool open ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool start ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void close ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void stop ();
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief get the maximum size of an entry
////////////////////////////////////////////////////////////////////////////////
uint32_t maxEntrySize () const {
// TODO: account for datafile overhead
return _filesize;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the logfile size
////////////////////////////////////////////////////////////////////////////////
uint32_t filesize () const {
return _filesize;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the maximum number of logfiles
////////////////////////////////////////////////////////////////////////////////
uint32_t numberOfLogfiles () const {
return _numberOfLogfiles;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the maximum size for all logfiles
////////////////////////////////////////////////////////////////////////////////
uint64_t maximumSize () const {
return static_cast<uint64_t>(_filesize) * static_cast<uint64_t>(_numberOfLogfiles);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the reserve size for all logfiles
////////////////////////////////////////////////////////////////////////////////
uint64_t reserveSize () const {
return static_cast<uint64_t>(_reserveSize);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the logfile directory
////////////////////////////////////////////////////////////////////////////////
std::string directory () const {
return _directory;
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
private:
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief the logfile manager
////////////////////////////////////////////////////////////////////////////////
LogfileManager* _logfileManager;
////////////////////////////////////////////////////////////////////////////////
/// @brief the size of each logfile
////////////////////////////////////////////////////////////////////////////////
uint32_t _filesize;
////////////////////////////////////////////////////////////////////////////////
/// @brief the target number of logfiles
////////////////////////////////////////////////////////////////////////////////
uint32_t _numberOfLogfiles;
////////////////////////////////////////////////////////////////////////////////
/// @brief the reserve free space
////////////////////////////////////////////////////////////////////////////////
uint32_t _reserveSize;
////////////////////////////////////////////////////////////////////////////////
/// @brief the logfile directory
////////////////////////////////////////////////////////////////////////////////
std::string _directory;
};
}
}
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -39,12 +39,10 @@ using namespace triagens::wal;
Logfile::Logfile (Logfile::IdType id,
TRI_datafile_t* df,
SealStatusType sealStatus,
CollectionStatusType collectionStatus)
StatusType status)
: _id(id),
_df(df),
_sealStatus(sealStatus),
_collectionStatus(collectionStatus) {
_status(status) {
}
////////////////////////////////////////////////////////////////////////////////
@ -69,7 +67,7 @@ Logfile::~Logfile () {
Logfile* Logfile::create (std::string const& filename,
Logfile::IdType id,
uint32_t size) {
TRI_datafile_t* df = TRI_CreateDatafile(filename.c_str(), id, static_cast<TRI_voc_size_t>(size));
TRI_datafile_t* df = TRI_CreateDatafile(filename.c_str(), id, static_cast<TRI_voc_size_t>(size), true);
if (df == nullptr) {
int res = TRI_errno();
@ -82,7 +80,7 @@ Logfile* Logfile::create (std::string const& filename,
}
}
Logfile* logfile = new Logfile(id, df, SealStatusType::UNSEALED, CollectionStatusType::UNCOLLECTED);
Logfile* logfile = new Logfile(id, df, StatusType::OPEN);
return logfile;
}
@ -105,12 +103,13 @@ Logfile* Logfile::open (std::string const& filename,
}
}
SealStatusType sealStatus = SealStatusType::UNSEALED;
StatusType status = StatusType::OPEN;
if (df->_isSealed) {
sealStatus = SealStatusType::SEALED;
status = StatusType::SEALED;
}
Logfile* logfile = new Logfile(id, df, sealStatus, CollectionStatusType::UNCOLLECTED);
Logfile* logfile = new Logfile(id, df, status);
return logfile;
}
@ -119,13 +118,16 @@ Logfile* Logfile::open (std::string const& filename,
////////////////////////////////////////////////////////////////////////////////
int Logfile::seal () {
int res = TRI_SealDatafile(_df);
// we're not really sealing the logfile as this isn't required
int res = TRI_ERROR_NO_ERROR; // TRI_SealDatafile(_df);
_df->_isSealed = true;
_df->_state = TRI_DF_STATE_READ;
if (res == TRI_ERROR_NO_ERROR) {
LOG_INFO("sealed logfile %llu", (unsigned long long) id());
assert(_sealStatus == SealStatusType::REQUESTED);
_sealStatus = SealStatusType::SEALED;
setStatus(StatusType::SEALED);
}
return res;

View File

@ -54,23 +54,16 @@ namespace triagens {
typedef TRI_voc_fid_t IdType;
////////////////////////////////////////////////////////////////////////////////
/// @brief logfile seal status
/// @brief logfile status
////////////////////////////////////////////////////////////////////////////////
enum class SealStatusType : uint32_t {
enum class StatusType : uint32_t {
UNKNOWN,
UNSEALED,
REQUESTED,
SEALED
};
////////////////////////////////////////////////////////////////////////////////
/// @brief logfile collection status
////////////////////////////////////////////////////////////////////////////////
enum class CollectionStatusType : uint32_t {
UNCOLLECTED,
REQUESTED,
EMPTY,
OPEN,
SEAL_REQUESTED,
SEALED,
COLLECTION_REQUESTED,
COLLECTED
};
@ -95,8 +88,7 @@ namespace triagens {
Logfile (Logfile::IdType,
TRI_datafile_t*,
SealStatusType,
CollectionStatusType);
StatusType);
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy a logfile
@ -164,6 +156,7 @@ namespace triagens {
return 0;
}
// TODO: decide whether we need _footerSize
return static_cast<uint64_t>(allocatedSize() - _df->_footerSize - _df->_currentSize);
}
@ -188,7 +181,9 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
inline bool isSealed () const {
return (_sealStatus == SealStatusType::SEALED);
return (_status == StatusType::SEALED ||
_status == StatusType::COLLECTION_REQUESTED ||
_status == StatusType::COLLECTED);
}
////////////////////////////////////////////////////////////////////////////////
@ -196,7 +191,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
inline bool canBeSealed () const {
return (_sealStatus == SealStatusType::REQUESTED);
return (_status == StatusType::SEAL_REQUESTED);
}
////////////////////////////////////////////////////////////////////////////////
@ -204,7 +199,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
inline bool canBeCollected () const {
return (isSealed() && (_collectionStatus == CollectionStatusType::UNCOLLECTED));
return (_status == StatusType::SEALED);
}
////////////////////////////////////////////////////////////////////////////////
@ -212,36 +207,69 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
inline bool canBeRemoved () const {
return (isSealed() && (_collectionStatus == CollectionStatusType::COLLECTED));
return (_status == StatusType::COLLECTED);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the logfile to status requested
/// @brief return the logfile status as a string
////////////////////////////////////////////////////////////////////////////////
void setCollectionRequested () {
LOG_INFO("requesting collection for logfile %llu", (unsigned long long) id());
_collectionStatus = CollectionStatusType::REQUESTED;
static std::string statusText (StatusType status) {
switch (status) {
case StatusType::UNKNOWN:
return "unknown";
case StatusType::EMPTY:
return "empty";
case StatusType::OPEN:
return "open";
case StatusType::SEAL_REQUESTED:
return "seal-requested";
case StatusType::SEALED:
return "sealed";
case StatusType::COLLECTION_REQUESTED:
return "collection-requested";
case StatusType::COLLECTED:
return "collected";
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief set the logfile to status collected
/// @brief change the logfile status
////////////////////////////////////////////////////////////////////////////////
void setCollectionDone () {
LOG_INFO("collection done for logfile %llu", (unsigned long long) id());
_collectionStatus = CollectionStatusType::COLLECTED;
}
void setStatus (StatusType status) {
switch (status) {
case StatusType::UNKNOWN:
case StatusType::EMPTY:
assert(false);
break;
////////////////////////////////////////////////////////////////////////////////
/// @brief request sealing of the logfile
////////////////////////////////////////////////////////////////////////////////
case StatusType::OPEN:
assert(_status == StatusType::EMPTY);
break;
void setSealRequested () {
LOG_INFO("sealing logfile %llu", (unsigned long long) id());
case StatusType::SEAL_REQUESTED:
assert(_status == StatusType::OPEN);
break;
assert(_sealStatus == SealStatusType::UNSEALED);
_sealStatus = SealStatusType::REQUESTED;
case StatusType::SEALED:
assert(_status == StatusType::SEAL_REQUESTED);
break;
case StatusType::COLLECTION_REQUESTED:
assert(_status == StatusType::SEALED);
break;
case StatusType::COLLECTED:
assert(_status == StatusType::COLLECTION_REQUESTED);
break;
}
LOG_INFO("changing logfile status from %s to %s for logfile %llu",
statusText(_status).c_str(),
statusText(status).c_str(),
(unsigned long long) id());
_status = status;
}
////////////////////////////////////////////////////////////////////////////////
@ -267,16 +295,10 @@ namespace triagens {
TRI_datafile_t* _df;
////////////////////////////////////////////////////////////////////////////////
/// @brief logfile seal status
/// @brief logfile status
////////////////////////////////////////////////////////////////////////////////
SealStatusType _sealStatus;
////////////////////////////////////////////////////////////////////////////////
/// @brief logfile collection status
////////////////////////////////////////////////////////////////////////////////
CollectionStatusType _collectionStatus;
StatusType _status;
};

View File

@ -39,7 +39,6 @@
#include "VocBase/server.h"
#include "Wal/AllocatorThread.h"
#include "Wal/CollectorThread.h"
#include "Wal/Configuration.h"
#include "Wal/Slots.h"
#include "Wal/SynchroniserThread.h"
@ -53,8 +52,12 @@ using namespace triagens::wal;
/// @brief create the logfile manager
////////////////////////////////////////////////////////////////////////////////
LogfileManager::LogfileManager (Configuration* configuration)
: _configuration(configuration),
LogfileManager::LogfileManager ()
: ApplicationFeature("logfile-manager"),
_directory(),
_filesize(32 * 1024 * 1024),
_reserveLogfiles(3),
_historicLogfiles(10),
_slots(nullptr),
_synchroniserThread(nullptr),
_allocatorThread(nullptr),
@ -62,8 +65,6 @@ LogfileManager::LogfileManager (Configuration* configuration)
_logfilesLock(),
_lastCollectedId(0),
_logfiles(),
_maxEntrySize(configuration->maxEntrySize()),
_directory(configuration->directory()),
_regex(),
_shutdown(0) {
@ -75,7 +76,7 @@ LogfileManager::LogfileManager (Configuration* configuration)
THROW_INTERNAL_ERROR("could not compile regex");
}
_slots = new Slots(this, 16384, 0);
_slots = new Slots(this, 1048576, 0);
}
////////////////////////////////////////////////////////////////////////////////
@ -85,8 +86,6 @@ LogfileManager::LogfileManager (Configuration* configuration)
LogfileManager::~LogfileManager () {
LOG_INFO("shutting down wal logfile manager");
shutdown();
regfree(&_regex);
if (_slots != nullptr) {
@ -95,44 +94,49 @@ LogfileManager::~LogfileManager () {
}
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// --SECTION-- ApplicationFeature methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not there are reserve logfiles
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool LogfileManager::hasReserveLogfiles () {
size_t numberOfLogfiles = 0;
READ_LOCKER(_logfilesLock);
for (auto it = _logfiles.begin(); it != _logfiles.end(); ++it) {
Logfile* logfile = (*it).second;
if (logfile != nullptr &&
logfile->freeSize() > 0 &&
! logfile->isSealed()) {
if (++numberOfLogfiles >= 3) { // TODO: make "3" a configuration option
return true;
}
}
}
return false;
void LogfileManager::setupOptions (std::map<std::string, triagens::basics::ProgramOptionsDescription>& options) {
options["Write-ahead log options:help-wal"]
("wal.logfile-size", &_filesize, "size of each logfile")
("wal.historic-logfiles", &_historicLogfiles, "number of historic logfiles to keep after collection")
("wal.reserve-logfiles", &_reserveLogfiles, "number of reserve logfiles to maintain")
("wal.directory", &_directory, "logfile directory")
;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief startup the logfile manager
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
int LogfileManager::startup () {
bool LogfileManager::prepare () {
if (_directory.empty()) {
LOG_FATAL_AND_EXIT("no directory specified for write-ahead logs. Please use the --wal.directory option");
}
if (_directory[_directory.size() - 1] != TRI_DIR_SEPARATOR_CHAR) {
// append a trailing slash to directory name
_directory.push_back(TRI_DIR_SEPARATOR_CHAR);
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool LogfileManager::start () {
int res = inventory();
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not create wal logfile inventory: %s", TRI_errno_string(res));
return res;
return false;
}
std::string const shutdownFile = shutdownFilename();
@ -145,11 +149,11 @@ int LogfileManager::startup () {
LOG_ERROR("could not open shutdown file '%s': %s",
shutdownFile.c_str(),
TRI_errno_string(res));
return res;
return false;
}
LOG_INFO("last tick: %llu, last collected: %llu",
(unsigned long long) _slots->lastTick(),
(unsigned long long) _slots->lastAssignedTick(),
(unsigned long long) _lastCollectedId);
}
@ -158,48 +162,86 @@ int LogfileManager::startup () {
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not open wal logfiles: %s",
TRI_errno_string(res));
return res;
return false;
}
res = startSynchroniserThread();
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not start wal synchroniser thread: %s", TRI_errno_string(res));
return res;
return false;
}
res = startAllocatorThread();
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not start wal allocator thread: %s", TRI_errno_string(res));
return res;
return false;
}
res = startCollectorThread();
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not start wal collector thread: %s", TRI_errno_string(res));
return res;
return false;
}
if (shutdownFileExists) {
// delete the shutdown file if it existed
if (! basics::FileUtils::remove(shutdownFile, &res)) {
LOG_ERROR("could not remove shutdown file '%s': %s", shutdownFile.c_str(), TRI_errno_string(res));
return res;
return false;
}
}
LOG_INFO("wal logfile manager started successfully");
LOG_INFO("wal logfile manager configuration: historic logfiles: %lu, reserve logfiles: %lu, filesize: %lu",
(unsigned long) _historicLogfiles,
(unsigned long) _reserveLogfiles,
(unsigned long) _filesize);
return TRI_ERROR_NO_ERROR;
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool LogfileManager::open () {
for (size_t i = 0; i < 50 * 1024 * 1024; ++i) {
void* p = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, 64, true);
TRI_df_marker_t* marker = static_cast<TRI_df_marker_t*>(p);
marker->_type = TRI_DF_MARKER_HEADER;
marker->_size = 64;
marker->_crc = 0;
marker->_tick = 0;
if (i % 500000 == 0) {
LOG_INFO("now at: %d", (int) i);
}
memcpy(static_cast<char*>(p) + sizeof(TRI_df_marker_t), "the fox is brown\0", strlen("the fox is brown") + 1);
this->allocateAndWrite(p, static_cast<uint32_t>(64), false);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, p);
}
LOG_INFO("done");
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief shuts down and closes all open logfiles
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::shutdown () {
void LogfileManager::close () {
}
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::stop () {
if (_shutdown > 0) {
return;
}
@ -229,6 +271,34 @@ void LogfileManager::shutdown () {
}
}
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not there are reserve logfiles
////////////////////////////////////////////////////////////////////////////////
bool LogfileManager::hasReserveLogfiles () {
uint32_t numberOfLogfiles = 0;
// note: this information could also be cached instead of being recalculated
// everytime
READ_LOCKER(_logfilesLock);
for (auto it = _logfiles.begin(); it != _logfiles.end(); ++it) {
Logfile* logfile = (*it).second;
if (logfile != nullptr && logfile->freeSize() > 0 && ! logfile->isSealed()) {
if (++numberOfLogfiles >= reserveLogfiles()) {
return true;
}
}
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief signal that a sync operation is required
////////////////////////////////////////////////////////////////////////////////
@ -276,7 +346,7 @@ void LogfileManager::sealLogfiles () {
////////////////////////////////////////////////////////////////////////////////
SlotInfo LogfileManager::allocate (uint32_t size) {
if (size > _maxEntrySize) {
if (size > maxEntrySize()) {
// entry is too big
return SlotInfo(TRI_ERROR_ARANGO_DOCUMENT_TOO_LARGE);
}
@ -288,8 +358,9 @@ SlotInfo LogfileManager::allocate (uint32_t size) {
/// @brief finalise a log entry
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::finalise (SlotInfo& slotInfo) {
_slots->returnUsed(slotInfo);
void LogfileManager::finalise (SlotInfo& slotInfo,
bool waitForSync) {
_slots->returnUsed(slotInfo, waitForSync);
}
////////////////////////////////////////////////////////////////////////////////
@ -298,7 +369,8 @@ void LogfileManager::finalise (SlotInfo& slotInfo) {
////////////////////////////////////////////////////////////////////////////////
int LogfileManager::allocateAndWrite (void* mem,
uint32_t size) {
uint32_t size,
bool waitForSync) {
SlotInfo slotInfo = allocate(size);
@ -323,7 +395,7 @@ int LogfileManager::allocateAndWrite (void* mem,
memcpy(slotInfo.slot->mem(), mem, static_cast<TRI_voc_size_t>(size));
finalise(slotInfo);
finalise(slotInfo, waitForSync);
return slotInfo.errorCode;
}
@ -362,7 +434,6 @@ void LogfileManager::unlinkLogfile (Logfile* logfile) {
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::removeLogfile (Logfile* logfile) {
assert(false);
unlinkLogfile(logfile);
// old filename
@ -393,7 +464,7 @@ int LogfileManager::requestSealing (Logfile* logfile) {
{
WRITE_LOCKER(_logfilesLock);
logfile->setSealRequested();
logfile->setStatus(Logfile::StatusType::SEAL_REQUESTED);
signalSync();
}
@ -428,13 +499,12 @@ Logfile* LogfileManager::getWriteableLogfile (uint32_t size) {
while (++iterations < 1000) {
{
READ_LOCKER(_logfilesLock);
WRITE_LOCKER(_logfilesLock);
for (auto it = _logfiles.begin(); it != _logfiles.end(); ++it) {
Logfile* logfile = (*it).second;
if (logfile != nullptr &&
logfile->isWriteable(size)) {
if (logfile != nullptr && logfile->isWriteable(size)) {
return logfile;
}
}
@ -471,7 +541,7 @@ Logfile* LogfileManager::getCollectableLogfile () {
////////////////////////////////////////////////////////////////////////////////
Logfile* LogfileManager::getRemovableLogfile () {
size_t numberOfLogfiles = 0;
uint32_t numberOfLogfiles = 0;
Logfile* first = nullptr;
READ_LOCKER(_logfilesLock);
@ -484,7 +554,7 @@ Logfile* LogfileManager::getRemovableLogfile () {
first = logfile;
}
if (++numberOfLogfiles >= 3) {
if (++numberOfLogfiles > historicLogfiles()) {
assert(first != nullptr);
return first;
}
@ -502,7 +572,7 @@ void LogfileManager::setCollectionRequested (Logfile* logfile) {
assert(logfile != nullptr);
WRITE_LOCKER(_logfilesLock);
logfile->setCollectionRequested();
logfile->setStatus(Logfile::StatusType::COLLECTION_REQUESTED);
}
////////////////////////////////////////////////////////////////////////////////
@ -512,15 +582,8 @@ void LogfileManager::setCollectionRequested (Logfile* logfile) {
void LogfileManager::setCollectionDone (Logfile* logfile) {
assert(logfile != nullptr);
{
WRITE_LOCKER(_logfilesLock);
logfile->setCollectionDone();
// finally get rid of the logfile
_logfiles.erase(logfile->id());
}
delete logfile;
WRITE_LOCKER(_logfilesLock);
logfile->setStatus(Logfile::StatusType::COLLECTED);
}
// -----------------------------------------------------------------------------
@ -571,7 +634,7 @@ int LogfileManager::readShutdownInfo () {
// read last assigned tick (may be 0)
uint64_t const lastTick = basics::JsonHelper::stringUInt64(json, "lastTick");
_slots->setLastTick(static_cast<Slot::TickType>(lastTick));
_slots->setLastAssignedTick(static_cast<Slot::TickType>(lastTick));
// read if of last collected logfile (maybe 0)
uint64_t const lastCollected = basics::JsonHelper::stringUInt64(json, "lastCollected");
@ -595,7 +658,7 @@ int LogfileManager::writeShutdownInfo () {
std::string content;
content.append("{\"lastTick\":\"");
content.append(basics::StringUtils::itoa(_slots->lastTick()));
content.append(basics::StringUtils::itoa(_slots->lastAssignedTick()));
content.append("\",\"lastCollected\":\"");
content.append(basics::StringUtils::itoa(lastCollected()));
content.append("\"}");
@ -721,7 +784,7 @@ int LogfileManager::inventory () {
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
LOG_TRACE("scanning wal directory: '%s'", _directory.c_str());
std::vector<std::string> files = basics::FileUtils::listFiles(_directory);
@ -778,19 +841,20 @@ int LogfileManager::openLogfiles () {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief allocate a new datafile
/// @brief allocates a new reserve logfile
////////////////////////////////////////////////////////////////////////////////
int LogfileManager::allocateDatafile () {
int LogfileManager::createReserveLogfile () {
Logfile::IdType const id = nextId();
std::string const filename = logfileName(id);
Logfile* logfile = Logfile::create(filename.c_str(), id, _configuration->filesize());
LOG_INFO("creating empty logfile '%s'", filename.c_str());
Logfile* logfile = Logfile::create(filename.c_str(), id, filesize());
if (logfile == nullptr) {
int res = TRI_errno();
LOG_ERROR("unable to create datafile: %s", TRI_errno_string(res));
LOG_ERROR("unable to create logfile: %s", TRI_errno_string(res));
return res;
}
@ -800,15 +864,6 @@ int LogfileManager::allocateDatafile () {
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief run the recovery procedure
////////////////////////////////////////////////////////////////////////////////
int LogfileManager::runRecovery () {
// TODO
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get an id for the next logfile
////////////////////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,7 @@
#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "Basics/ReadWriteLock.h"
#include "ApplicationServer/ApplicationFeature.h"
#include "Wal/Logfile.h"
#include "Wal/Slots.h"
@ -41,7 +42,6 @@ namespace triagens {
class AllocatorThread;
class CollectorThread;
class Configuration;
class Slot;
class SynchroniserThread;
@ -49,7 +49,7 @@ namespace triagens {
// --SECTION-- class LogfileManager
// -----------------------------------------------------------------------------
class LogfileManager {
class LogfileManager : public rest::ApplicationFeature {
friend class AllocatorThread;
friend class CollectorThread;
@ -68,11 +68,7 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief create the logfile manager
////////////////////////////////////////////////////////////////////////////////
LogfileManager (Configuration*);
LogfileManager ();
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the logfile manager
@ -80,12 +76,95 @@ namespace triagens {
~LogfileManager ();
// -----------------------------------------------------------------------------
// --SECTION-- ApplicationFeature methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void setupOptions (std::map<string, triagens::basics::ProgramOptionsDescription>&);
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool prepare ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool open ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
bool start ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void close ();
////////////////////////////////////////////////////////////////////////////////
/// {@inheritDoc}
////////////////////////////////////////////////////////////////////////////////
void stop ();
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief get the maximum size of an entry
////////////////////////////////////////////////////////////////////////////////
inline uint32_t maxEntrySize () const {
// TODO: account for datafile overhead
return _filesize;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the logfile directory
////////////////////////////////////////////////////////////////////////////////
inline std::string directory () const {
return _directory;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the logfile size
////////////////////////////////////////////////////////////////////////////////
inline uint32_t filesize () const {
return _filesize;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the number of reserve logfiles
////////////////////////////////////////////////////////////////////////////////
inline uint32_t reserveLogfiles () const {
return _reserveLogfiles;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the number of historic logfiles to keep
////////////////////////////////////////////////////////////////////////////////
inline uint32_t historicLogfiles () const {
return _historicLogfiles;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the slots manager
////////////////////////////////////////////////////////////////////////////////
@ -100,18 +179,6 @@ namespace triagens {
bool hasReserveLogfiles ();
////////////////////////////////////////////////////////////////////////////////
/// @brief startup the logfile manager
////////////////////////////////////////////////////////////////////////////////
int startup ();
////////////////////////////////////////////////////////////////////////////////
/// @brief shuts down and closes all open logfiles
////////////////////////////////////////////////////////////////////////////////
void shutdown ();
////////////////////////////////////////////////////////////////////////////////
/// @brief signal that a sync operation is required
////////////////////////////////////////////////////////////////////////////////
@ -134,7 +201,7 @@ namespace triagens {
/// @brief finalise a log entry
////////////////////////////////////////////////////////////////////////////////
void finalise (SlotInfo&);
void finalise (SlotInfo&, bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief write data into the logfile
@ -142,7 +209,8 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
int allocateAndWrite (void*,
uint32_t);
uint32_t,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief re-inserts a logfile back into the inventory only
@ -285,16 +353,10 @@ namespace triagens {
int openLogfiles ();
////////////////////////////////////////////////////////////////////////////////
/// @brief allocate a new datafile
/// @brief allocate a new reserve logfile
////////////////////////////////////////////////////////////////////////////////
int allocateDatafile ();
////////////////////////////////////////////////////////////////////////////////
/// @brief run the recovery procedure
////////////////////////////////////////////////////////////////////////////////
int runRecovery ();
int createReserveLogfile ();
////////////////////////////////////////////////////////////////////////////////
/// @brief get an id for the next logfile
@ -327,10 +389,28 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief the log file manager configuration
/// @brief the logfile directory
////////////////////////////////////////////////////////////////////////////////
Configuration* _configuration;
std::string _directory;
////////////////////////////////////////////////////////////////////////////////
/// @brief the size of each logfile
////////////////////////////////////////////////////////////////////////////////
uint32_t _filesize;
////////////////////////////////////////////////////////////////////////////////
/// @brief the target number of reserve logfiles
////////////////////////////////////////////////////////////////////////////////
uint32_t _reserveLogfiles;
////////////////////////////////////////////////////////////////////////////////
/// @brief the target number of historic logfiles
////////////////////////////////////////////////////////////////////////////////
uint32_t _historicLogfiles;
////////////////////////////////////////////////////////////////////////////////
/// @brief the slots manager
@ -374,18 +454,6 @@ namespace triagens {
std::map<Logfile::IdType, Logfile*> _logfiles;
////////////////////////////////////////////////////////////////////////////////
/// @brief maximum size of a logfile entry
////////////////////////////////////////////////////////////////////////////////
uint32_t const _maxEntrySize;
////////////////////////////////////////////////////////////////////////////////
/// @brief logfile directory
////////////////////////////////////////////////////////////////////////////////
std::string const _directory;
////////////////////////////////////////////////////////////////////////////////
/// @brief regex to match logfiles
////////////////////////////////////////////////////////////////////////////////

View File

@ -42,7 +42,8 @@ Slot::Slot ()
_logfileId(0),
_mem(nullptr),
_size(0),
_status(StatusType::UNUSED) {
_status(StatusType::UNUSED),
_waitForSync(false) {
}
@ -89,11 +90,12 @@ std::string Slot::statusText () const {
void Slot::setUnused () {
assert(isReturned());
_tick = 0;
_logfileId = 0;
_mem = nullptr;
_size = 0;
_status = StatusType::UNUSED;
_tick = 0;
_logfileId = 0;
_mem = nullptr;
_size = 0;
_status = StatusType::UNUSED;
_waitForSync = false;
}
////////////////////////////////////////////////////////////////////////////////
@ -116,9 +118,10 @@ void Slot::setUsed (void* mem,
/// @brief mark as slot as returned
////////////////////////////////////////////////////////////////////////////////
void Slot::setReturned () {
void Slot::setReturned (bool waitForSync) {
assert(isUsed());
_status = StatusType::RETURNED;
_waitForSync = waitForSync;
}
// Local Variables:

View File

@ -41,6 +41,7 @@ namespace triagens {
// -----------------------------------------------------------------------------
class Slot {
friend class Slots;
// -----------------------------------------------------------------------------
@ -157,6 +158,14 @@ namespace triagens {
return _status == StatusType::RETURNED;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a sync was requested for the slot
////////////////////////////////////////////////////////////////////////////////
inline bool waitForSync () const {
return _waitForSync;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief mark as slot as unused
////////////////////////////////////////////////////////////////////////////////
@ -176,7 +185,7 @@ namespace triagens {
/// @brief mark as slot as returned
////////////////////////////////////////////////////////////////////////////////
void setReturned ();
void setReturned (bool);
// -----------------------------------------------------------------------------
// --SECTION-- private variables
@ -214,6 +223,12 @@ namespace triagens {
StatusType _status;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a sync was requested explicitly
////////////////////////////////////////////////////////////////////////////////
bool _waitForSync;
};
}

View File

@ -47,7 +47,6 @@ Slots::Slots (LogfileManager* logfileManager,
: _logfileManager(logfileManager),
_condition(),
_lock(),
_lastTick(tick),
_slots(numberOfSlots),
_numberOfSlots(numberOfSlots),
_freeSlots(numberOfSlots),
@ -55,6 +54,8 @@ Slots::Slots (LogfileManager* logfileManager,
_handoutIndex(0),
_recycleIndex(0),
_logfile(nullptr),
_lastAssignedTick(tick),
_lastCommittedTick(0),
_readOnly(false) {
for (size_t i = 0; i < _numberOfSlots; ++i) {
@ -83,23 +84,32 @@ Slots::~Slots () {
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the last tick
/// @brief sets the last assigned tick
////////////////////////////////////////////////////////////////////////////////
void Slots::setLastTick (Slot::TickType tick) {
void Slots::setLastAssignedTick (Slot::TickType tick) {
MUTEX_LOCKER(_lock);
assert(_lastTick == 0);
assert(_lastAssignedTick == 0);
_lastTick = tick;
_lastAssignedTick = tick;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the last assigned tick
////////////////////////////////////////////////////////////////////////////////
Slot::TickType Slots::lastTick () {
Slot::TickType Slots::lastAssignedTick () {
MUTEX_LOCKER(_lock);
return _lastTick;
return _lastAssignedTick;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the last committed tick
////////////////////////////////////////////////////////////////////////////////
Slot::TickType Slots::lastCommittedTick () {
MUTEX_LOCKER(_lock);
return _lastCommittedTick;
}
////////////////////////////////////////////////////////////////////////////////
@ -160,7 +170,7 @@ SlotInfo Slots::nextUnused (uint32_t size) {
assert(_freeSlots > 0);
_freeSlots--;
slot->setUsed(static_cast<void*>(mem), size, _logfile->id(), ++_lastTick);
slot->setUsed(static_cast<void*>(mem), size, _logfile->id(), ++_lastAssignedTick);
if (++_handoutIndex ==_numberOfSlots) {
// wrap around
@ -194,15 +204,30 @@ SlotInfo Slots::nextUnused (uint32_t size) {
/// @brief return a used slot, allowing its synchronisation
////////////////////////////////////////////////////////////////////////////////
void Slots::returnUsed (SlotInfo& slotInfo) {
void Slots::returnUsed (SlotInfo& slotInfo,
bool waitForSync) {
assert(slotInfo.slot != nullptr);
Slot::TickType tick = slotInfo.slot->tick();
{
MUTEX_LOCKER(_lock);
slotInfo.slot->setReturned();
slotInfo.slot->setReturned(waitForSync);
}
_logfileManager->signalSync();
if (waitForSync) {
// wait until data has been committed to disk
while (true) {
CONDITION_LOCKER(guard, _condition);
if (lastCommittedTick() >= tick) {
return;
}
guard.wait(10000000);
}
}
}
////////////////////////////////////////////////////////////////////////////////
@ -231,6 +256,7 @@ SyncRegion Slots::getSyncRegion () {
region.size = slot->size();
region.firstSlotIndex = slotIndex;
region.lastSlotIndex = slotIndex;
region.waitForSync = slot->waitForSync();
}
else {
if (slot->logfileId() != region.logfileId) {
@ -243,6 +269,7 @@ SyncRegion Slots::getSyncRegion () {
// update the region
region.size += static_cast<char*>(slot->mem()) - (region.mem + region.size) + slot->size();
region.lastSlotIndex = slotIndex;
region.waitForSync |= slot->waitForSync();
}
if (++slotIndex >= _numberOfSlots) {
@ -261,10 +288,10 @@ SyncRegion Slots::getSyncRegion () {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return a slot for synchronisation
/// @brief return a region to the freelist
////////////////////////////////////////////////////////////////////////////////
void Slots::unuse (SyncRegion const& region) {
void Slots::returnSyncRegion (SyncRegion const& region) {
assert(region.logfileId != 0);
size_t slotIndex = region.firstSlotIndex;
@ -275,7 +302,12 @@ void Slots::unuse (SyncRegion const& region) {
while (true) {
Slot* slot = _slots[slotIndex];
assert(slot != nullptr);
// note last tick
Slot::TickType tick = slot->tick();
assert(tick >= _lastCommittedTick);
_lastCommittedTick = tick;
slot->setUnused();
++_freeSlots;
@ -297,7 +329,7 @@ void Slots::unuse (SyncRegion const& region) {
// signal that we have done something
CONDITION_LOCKER(guard, _condition);
if (_waiting > 0) {
if (_waiting > 0 || region.waitForSync) {
_condition.broadcast();
}
}

View File

@ -104,16 +104,22 @@ namespace triagens {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the last tick
/// @brief sets the last assigned tick
////////////////////////////////////////////////////////////////////////////////
void setLastTick (Slot::TickType);
void setLastAssignedTick (Slot::TickType);
////////////////////////////////////////////////////////////////////////////////
/// @brief return the last assigned tick
////////////////////////////////////////////////////////////////////////////////
Slot::TickType lastTick ();
Slot::TickType lastAssignedTick ();
////////////////////////////////////////////////////////////////////////////////
/// @brief return the last committed tick
////////////////////////////////////////////////////////////////////////////////
Slot::TickType lastCommittedTick ();
////////////////////////////////////////////////////////////////////////////////
/// @brief return the next unused slot
@ -125,7 +131,8 @@ namespace triagens {
/// @brief return a used slot, allowing its synchronisation
////////////////////////////////////////////////////////////////////////////////
void returnUsed (SlotInfo&);
void returnUsed (SlotInfo&,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief get the next synchronisable region
@ -134,10 +141,10 @@ namespace triagens {
SyncRegion getSyncRegion ();
////////////////////////////////////////////////////////////////////////////////
/// @brief return a slot for synchronisation
/// @brief return a region to the freelist
////////////////////////////////////////////////////////////////////////////////
void unuse (SyncRegion const&);
void returnSyncRegion (SyncRegion const&);
// -----------------------------------------------------------------------------
// --SECTION-- private variables
@ -163,12 +170,6 @@ namespace triagens {
basics::Mutex _lock;
////////////////////////////////////////////////////////////////////////////////
/// @brief last tick value used
////////////////////////////////////////////////////////////////////////////////
Slot::TickType _lastTick;
////////////////////////////////////////////////////////////////////////////////
/// @brief all slots
////////////////////////////////////////////////////////////////////////////////
@ -211,6 +212,18 @@ namespace triagens {
Logfile* _logfile;
////////////////////////////////////////////////////////////////////////////////
/// @brief last assigned tick value
////////////////////////////////////////////////////////////////////////////////
Slot::TickType _lastAssignedTick;
////////////////////////////////////////////////////////////////////////////////
/// @brief last committed tick value
////////////////////////////////////////////////////////////////////////////////
Slot::TickType _lastCommittedTick;
////////////////////////////////////////////////////////////////////////////////
/// @brief read-only mode
////////////////////////////////////////////////////////////////////////////////

View File

@ -41,7 +41,8 @@ namespace triagens {
mem(nullptr),
size(0),
firstSlotIndex(0),
lastSlotIndex(0) {
lastSlotIndex(0),
waitForSync(false) {
}
~SyncRegion () {
@ -52,6 +53,7 @@ namespace triagens {
uint32_t size;
size_t firstSlotIndex;
size_t lastSlotIndex;
bool waitForSync;
};
}

View File

@ -35,6 +35,16 @@
using namespace triagens::wal;
// -----------------------------------------------------------------------------
// --SECTION-- class SynchroniserThread
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the synchroniser thread when idle
////////////////////////////////////////////////////////////////////////////////
const uint64_t SynchroniserThread::Interval = 500000;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
@ -48,8 +58,9 @@ SynchroniserThread::SynchroniserThread (LogfileManager* logfileManager)
_logfileManager(logfileManager),
_condition(),
_waiting(0),
_stop(0) {
_stop(0),
_logfileCache() {
allowAsynchronousCancelation();
}
@ -108,25 +119,27 @@ void SynchroniserThread::run () {
waiting = _waiting;
}
// go on without the lock
if (waiting > 0) {
// get region to sync
SyncRegion region = _logfileManager->slots()->getSyncRegion();
if (region.logfileId != 0) {
void** mmHandle = NULL;
// now perform the actual syncing
// get the logfile's file descriptor
// TODO: we might cache the file descriptor here
int fd = _logfileManager->getLogfileDescriptor(region.logfileId);
int fd = getLogfileDescriptor(region);
if (fd < 0) {
// invalid file descriptor
LOG_FATAL_AND_EXIT("invalid wal logfile file descriptor");
}
else {
// LOG_INFO("Syncing from %p to %p, length: %d", region.mem, region.mem + region.size, (int) region.size);
void** mmHandle = NULL;
bool res = TRI_MSync(fd, mmHandle, region.mem, region.mem + region.size);
// LOG_INFO("Syncing from %p to %p, length: %d", region.mem, region.mem + region.size, (int) region.size);
if (! res) {
LOG_ERROR("unable to sync wal logfile region");
@ -134,25 +147,47 @@ void SynchroniserThread::run () {
}
}
_logfileManager->slots()->unuse(region);
_logfileManager->slots()->returnSyncRegion(region);
}
// seal any logfiles that require sealing
_logfileManager->sealLogfiles();
}
// now wait until we are woken up or there is something to do
CONDITION_LOCKER(guard, _condition);
if (waiting > 0) {
assert(_waiting >= waiting);
_waiting -= waiting;
}
if (_waiting == 0) {
guard.wait(100000000);
guard.wait(Interval);
}
}
_stop = 2;
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief get a logfile descriptor (it caches the descriptor for performance)
////////////////////////////////////////////////////////////////////////////////
int SynchroniserThread::getLogfileDescriptor (SyncRegion const& region) {
if (region.logfileId != _logfileCache.id ||
_logfileCache.id == 0) {
_logfileCache.id = region.logfileId;
_logfileCache.fd = _logfileManager->getLogfileDescriptor(region.logfileId);
}
return _logfileCache.fd;
}
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"

View File

@ -32,6 +32,7 @@
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
#include "Wal/Logfile.h"
#include "Wal/SyncRegion.h"
namespace triagens {
namespace wal {
@ -106,6 +107,12 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief get a logfile descriptor (it caches the descriptor for performance)
////////////////////////////////////////////////////////////////////////////////
int getLogfileDescriptor (SyncRegion const&);
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
@ -136,6 +143,22 @@ namespace triagens {
volatile sig_atomic_t _stop;
////////////////////////////////////////////////////////////////////////////////
/// @brief logfile descriptor cache
////////////////////////////////////////////////////////////////////////////////
struct {
Logfile::IdType id;
int fd;
}
_logfileCache;
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the synchroniser thread when idle
////////////////////////////////////////////////////////////////////////////////
static const uint64_t Interval;
};
}