1
0
Fork 0

Merge branch 'mjmh' of ssh://github.com/triAGENS/ArangoDB into mjmh

This commit is contained in:
Max Neunhoeffer 2014-06-06 17:00:26 +02:00
commit df37bcad23
7 changed files with 363 additions and 133 deletions

View File

@ -0,0 +1,132 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief collection usage guard
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2012-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef TRIAGENS_UTILS_COLLECTION_GUARD_H
#define TRIAGENS_UTILS_COLLECTION_GUARD_H 1
#include "Basics/Common.h"
#include "Utils/Exception.h"
#include "VocBase/vocbase.h"
namespace triagens {
namespace arango {
// -----------------------------------------------------------------------------
// --SECTION-- class CollectionGuard
// -----------------------------------------------------------------------------
class CollectionGuard {
// -----------------------------------------------------------------------------
// --SECTION-- constructors / destructors
// -----------------------------------------------------------------------------
public:
CollectionGuard (CollectionGuard const&) = delete;
CollectionGuard& operator= (CollectionGuard const&) = delete;
////////////////////////////////////////////////////////////////////////////////
/// @brief create the guard, using a collection id
////////////////////////////////////////////////////////////////////////////////
CollectionGuard (TRI_vocbase_t* vocbase,
TRI_voc_cid_t id)
: _vocbase(vocbase),
_collection(nullptr) {
_collection = TRI_UseCollectionByIdVocBase(_vocbase, id);
if (_collection == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create the guard, using a collection name
////////////////////////////////////////////////////////////////////////////////
CollectionGuard (TRI_vocbase_t* vocbase,
char const* name)
: _vocbase(vocbase),
_collection(nullptr) {
_collection = TRI_UseCollectionByNameVocBase(_vocbase, name);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the guard
////////////////////////////////////////////////////////////////////////////////
~CollectionGuard () {
if (_collection != nullptr) {
TRI_ReleaseCollectionVocBase(_vocbase, _collection);
}
}
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief return the collection pointer
////////////////////////////////////////////////////////////////////////////////
inline TRI_vocbase_col_t* collection () const {
return _collection;
}
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief pointer to vocbase
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_t* _vocbase;
////////////////////////////////////////////////////////////////////////////////
/// @brief pointer to collection
////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_col_t* _collection;
};
}
}
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -29,6 +29,7 @@
#define TRIAGENS_UTILS_DATABASE_GUARD_H 1
#include "Basics/Common.h"
#include "Utils/Exception.h"
#include "VocBase/server.h"
struct TRI_vocbase_s;
@ -61,6 +62,10 @@ namespace triagens {
_database(nullptr) {
_database = TRI_UseDatabaseByIdServer(server, id);
if (_database == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
}
////////////////////////////////////////////////////////////////////////////////
@ -73,6 +78,10 @@ namespace triagens {
_database(nullptr) {
_database = TRI_UseDatabaseServer(server, name);
if (_database == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
}
////////////////////////////////////////////////////////////////////////////////
@ -80,7 +89,9 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
~DatabaseGuard () {
release();
if (_database != nullptr) {
TRI_ReleaseDatabaseServer(_server, _database);
}
}
// -----------------------------------------------------------------------------
@ -97,17 +108,6 @@ namespace triagens {
return _database;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief release
////////////////////////////////////////////////////////////////////////////////
inline void release () {
if (_database != nullptr) {
TRI_ReleaseDatabaseServer(_server, _database);
_database = nullptr;
}
}
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------

View File

@ -28,6 +28,7 @@
#include "AllocatorThread.h"
#include "BasicsC/logging.h"
#include "Basics/ConditionLocker.h"
#include "Utils/Exception.h"
#include "Wal/LogfileManager.h"
using namespace triagens::wal;
@ -84,7 +85,7 @@ void AllocatorThread::stop () {
_condition.signal();
while (_stop != 2) {
usleep(1000);
usleep(10000);
}
}
@ -132,19 +133,28 @@ void AllocatorThread::run () {
_requestedSize = 0;
}
if (requestedSize == 0 && ! _logfileManager->hasReserveLogfiles()) {
if (createReserveLogfile(0)) {
continue;
}
try {
if (requestedSize == 0 && ! _logfileManager->hasReserveLogfiles()) {
if (createReserveLogfile(0)) {
continue;
}
LOG_ERROR("unable to create new wal reserve logfile");
}
else if (requestedSize > 0 && _logfileManager->logfileCreationAllowed(requestedSize)) {
if (createReserveLogfile(requestedSize)) {
continue;
LOG_ERROR("unable to create new wal reserve logfile");
}
else if (requestedSize > 0 && _logfileManager->logfileCreationAllowed(requestedSize)) {
if (createReserveLogfile(requestedSize)) {
continue;
}
LOG_ERROR("unable to create new wal reserve logfile");
LOG_ERROR("unable to create new wal reserve logfile");
}
}
catch (triagens::arango::Exception const& ex) {
int res = ex.code();
LOG_ERROR("got unexpected error in allocatorThread: %s", TRI_errno_string(res));
}
catch (...) {
LOG_ERROR("got unspecific error in allocatorThread");
}
{

View File

@ -30,7 +30,9 @@
#include "BasicsC/hashes.h"
#include "BasicsC/logging.h"
#include "Basics/ConditionLocker.h"
#include "Utils/CollectionGuard.h"
#include "Utils/DatabaseGuard.h"
#include "Utils/Exception.h"
#include "VocBase/document-collection.h"
#include "VocBase/server.h"
#include "VocBase/voc-shaper.h"
@ -322,17 +324,26 @@ void CollectorThread::run () {
int stop = (int) _stop;
bool worked = false;
// step 1: collect a logfile if any qualifies
if (stop == 0) {
// don't collect additional logfiles in case we want to shut down
worked |= this->collectLogfiles();
try {
// step 1: collect a logfile if any qualifies
if (stop == 0) {
// don't collect additional logfiles in case we want to shut down
worked |= this->collectLogfiles();
}
// step 2: update master pointers
worked |= this->processQueuedOperations();
// step 3: delete a logfile if any qualifies
worked |= this->removeLogfiles();
}
catch (triagens::arango::Exception const& ex) {
int res = ex.code();
LOG_ERROR("got unexpected error in collectorThread: %s", TRI_errno_string(res));
}
catch (...) {
LOG_ERROR("got unspecific error in collectorThread");
}
// step 2: update master pointers
worked |= this->processQueuedOperations();
// step 3: delete a logfile if any qualifies
worked |= this->removeLogfiles();
if (stop == 0 && ! worked) {
// sleep only if there was nothing to do
@ -368,17 +379,11 @@ bool CollectorThread::collectLogfiles () {
_logfileManager->setCollectionRequested(logfile);
try {
int res = collect(logfile);
int res = collect(logfile);
if (res == TRI_ERROR_NO_ERROR) {
_logfileManager->setCollectionDone(logfile);
return true;
}
}
catch (...) {
// collection failed
LOG_ERROR("logfile collection failed");
if (res == TRI_ERROR_NO_ERROR) {
_logfileManager->setCollectionDone(logfile);
return true;
}
return false;
@ -404,14 +409,41 @@ bool CollectorThread::processQueuedOperations () {
for (auto it2 = operations.begin(); it2 != operations.end(); /* no hoisting */ ) {
Logfile* logfile = (*it2)->logfile;
if (processCollectionOperations((*it2)) != TRI_ERROR_NO_ERROR) {
break;
int res = TRI_ERROR_INTERNAL;
try {
res = processCollectionOperations((*it2));
}
catch (triagens::arango::Exception const& ex) {
res = ex.code();
}
if (res == TRI_ERROR_LOCK_TIMEOUT) {
// could not acquire write-lock for collection in time
// do not delete the operations
continue;
}
// delete the object
delete (*it2);
if (res == TRI_ERROR_NO_ERROR) {
LOG_TRACE("queued operations applied successfully");
}
else if (res == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND ||
res == TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND) {
LOG_TRACE("removing queued operations for already deleted collection");
}
else {
LOG_WARNING("got unexpected error code while applying queued operations: %s", TRI_errno_string(res));
}
// delete the element from the vector while iterating over the vector
it2 = operations.erase(it2);
_logfileManager->decreaseCollectQueueSize(logfile);
}
// next collection
}
@ -425,7 +457,6 @@ bool CollectorThread::processQueuedOperations () {
}
}
// TODO: report an error?
return true;
}
@ -444,26 +475,19 @@ bool CollectorThread::hasQueuedOperations () {
////////////////////////////////////////////////////////////////////////////////
int CollectorThread::processCollectionOperations (CollectorCache* cache) {
triagens::arango::DatabaseGuard guard(_server, cache->databaseId);
TRI_vocbase_t* vocbase = guard.database();
if (vocbase == nullptr) {
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
triagens::arango::DatabaseGuard dbGuard(_server, cache->databaseId);
TRI_vocbase_t* vocbase = dbGuard.database();
TRI_ASSERT(vocbase != nullptr);
TRI_vocbase_col_t* collection = TRI_UseCollectionByIdVocBase(vocbase, cache->collectionId);
triagens::arango::CollectionGuard collectionGuard(vocbase, cache->collectionId);
TRI_vocbase_col_t* collection = collectionGuard.collection();
TRI_ASSERT(collection != nullptr);
if (collection == nullptr) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
TRI_document_collection_t* document = collection->_collection;
// try to acquire the write lock on the collection
if (! TRI_TRY_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document)) {
TRI_ReleaseCollectionVocBase(vocbase, collection);
LOG_TRACE("wal collector couldn't acquire write lock for collection '%llu'", (unsigned long long) document->base._info._cid);
return TRI_ERROR_LOCK_TIMEOUT;
@ -548,14 +572,10 @@ int CollectorThread::processCollectionOperations (CollectorCache* cache) {
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
LOG_TRACE("wal collector successfully processed operations for collection '%s'", document->base._info._name);
TRI_ReleaseCollectionVocBase(vocbase, collection);
delete cache;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief step 3: perform removal of a logfile (if any)
////////////////////////////////////////////////////////////////////////////////
@ -640,8 +660,20 @@ int CollectorThread::collect (Logfile* logfile) {
}
if (! sortedOperations.empty()) {
// TODO: handle errors indicated by transferMarkers!
transferMarkers(logfile, cid, state.collections[cid], state.operationsCount[cid], sortedOperations);
int res = TRI_ERROR_INTERNAL;
try {
res = transferMarkers(logfile, cid, state.collections[cid], state.operationsCount[cid], sortedOperations);
}
catch (triagens::arango::Exception const& ex) {
res = ex.code();
}
if (res != TRI_ERROR_NO_ERROR &&
res != TRI_ERROR_ARANGO_DATABASE_NOT_FOUND &&
res != TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND) {
LOG_WARNING("got unexpected error in collect: %s", TRI_errno_string(res));
}
}
}
@ -665,23 +697,18 @@ int CollectorThread::transferMarkers (Logfile* logfile,
int64_t totalOperationsCount,
OperationsType const& operations) {
// GENERAL TODO: remove TRI_SetLastCollectedDocumentOperation or remove the lock for the collection
TRI_ASSERT(! operations.empty());
triagens::arango::DatabaseGuard guard(_server, databaseId);
// prepare database and collection
triagens::arango::DatabaseGuard dbGuard(_server, databaseId);
TRI_vocbase_t* vocbase = dbGuard.database();
TRI_ASSERT(vocbase != nullptr);
TRI_vocbase_t* vocbase = guard.database();
if (vocbase == nullptr) {
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
TRI_vocbase_col_t* collection = TRI_UseCollectionByIdVocBase(vocbase, collectionId);
if (collection == nullptr) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
triagens::arango::CollectionGuard collectionGuard(vocbase, collectionId);
TRI_vocbase_col_t* collection =collectionGuard.collection();
TRI_ASSERT(collection != nullptr);
CollectorCache* cache = new CollectorCache(collectionId,
databaseId,
logfile,
@ -692,7 +719,42 @@ int CollectorThread::transferMarkers (Logfile* logfile,
TRI_document_collection_t* document = collection->_collection;
TRI_ASSERT(document != nullptr);
TRI_voc_tick_t minTransferTick = document->base._tickMax;
int res = TRI_ERROR_INTERNAL;
try {
res = executeTransferMarkers(document, cache, operations);
if (res == TRI_ERROR_NO_ERROR) {
// now sync the datafile
res = syncDatafileCollection(document);
// note: cache is passed by reference and can be modified by queueOperations
queueOperations(logfile, cache);
}
}
catch (triagens::arango::Exception const& ex) {
res = ex.code();
}
if (cache != nullptr) {
// prevent memleak
delete cache;
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief transfer markers into a collection, actual work
/// the collection must have been prepared to call this function
////////////////////////////////////////////////////////////////////////////////
int CollectorThread::executeTransferMarkers (TRI_document_collection_t* document,
CollectorCache* cache,
OperationsType const& operations) {
TRI_voc_tick_t const minTransferTick = document->base._tickMax;
for (auto it2 = operations.begin(); it2 != operations.end(); ++it2) {
TRI_df_marker_t const* source = (*it2);
@ -876,14 +938,7 @@ int CollectorThread::transferMarkers (Logfile* logfile,
}
}
// now sync the datafile
int res = syncDatafileCollection(document);
queueOperations(logfile, cache);
TRI_ReleaseCollectionVocBase(vocbase, collection);
return res;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
@ -891,7 +946,7 @@ int CollectorThread::transferMarkers (Logfile* logfile,
////////////////////////////////////////////////////////////////////////////////
int CollectorThread::queueOperations (triagens::wal::Logfile* logfile,
CollectorCache* cache) {
CollectorCache*& cache) {
TRI_voc_cid_t cid = cache->collectionId;
MUTEX_LOCKER(_operationsQueueLock);
@ -901,15 +956,17 @@ int CollectorThread::queueOperations (triagens::wal::Logfile* logfile,
std::vector<CollectorCache*> ops;
ops.push_back(cache);
_operationsQueue.insert(it, std::make_pair(cid, ops));
_logfileManager->increaseCollectQueueSize(logfile);
}
else {
(*it).second.push_back(cache);
_logfileManager->increaseCollectQueueSize(logfile);
}
// we have put the object into the queue successfully
// now set the original pointer to null so it isn't double-freed
cache = nullptr;
return TRI_ERROR_NO_ERROR;
}

View File

@ -266,12 +266,20 @@ namespace triagens {
int64_t,
OperationsType const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief transfer markers into a collection
////////////////////////////////////////////////////////////////////////////////
int executeTransferMarkers (TRI_document_collection_t*,
CollectorCache*,
OperationsType const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief insert the collect operations into a per-collection queue
////////////////////////////////////////////////////////////////////////////////
int queueOperations (triagens::wal::Logfile*,
CollectorCache*);
CollectorCache*&);
////////////////////////////////////////////////////////////////////////////////
/// @brief update a collection's datafile information

View File

@ -28,6 +28,7 @@
#include "SynchroniserThread.h"
#include "BasicsC/logging.h"
#include "Basics/ConditionLocker.h"
#include "Utils/Exception.h"
#include "VocBase/server.h"
#include "Wal/LogfileManager.h"
#include "Wal/Slots.h"
@ -88,7 +89,7 @@ void SynchroniserThread::stop () {
_condition.signal();
while (_stop != 2) {
usleep(1000);
usleep(10000);
}
}
@ -123,45 +124,15 @@ void SynchroniserThread::run () {
// go on without the lock
if (waiting > 0) {
// get region to sync
SyncRegion region = _logfileManager->slots()->getSyncRegion();
Logfile::IdType const id = region.logfileId;
if (id != 0) {
// now perform the actual syncing
Logfile::StatusType status = _logfileManager->getLogfileStatus(id);
TRI_ASSERT(status == Logfile::StatusType::OPEN || status == Logfile::StatusType::SEAL_REQUESTED);
// get the logfile's file descriptor
int fd = getLogfileDescriptor(region.logfileId);
if (fd < 0) {
// invalid file descriptor
LOG_FATAL_AND_EXIT("invalid wal logfile file descriptor");
}
else {
void** mmHandle = NULL;
bool res = TRI_MSync(fd, mmHandle, region.mem, region.mem + region.size);
LOG_TRACE("syncing logfile %llu, region %p - %p, length: %lu, wfs: %s",
(unsigned long long) id,
region.mem,
region.mem + region.size,
(unsigned long) region.size,
region.waitForSync ? "true" : "false");
if (! res) {
LOG_ERROR("unable to sync wal logfile region");
// TODO: how to recover from this state?
}
if (status == Logfile::StatusType::SEAL_REQUESTED) {
// additionally seal the logfile
_logfileManager->setLogfileSealed(id);
}
}
_logfileManager->slots()->returnSyncRegion(region);
try {
doSync();
}
catch (triagens::arango::Exception const& ex) {
int res = ex.code();
LOG_ERROR("got unexpected error in synchroniserThread: %s", TRI_errno_string(res));
}
catch (...) {
LOG_ERROR("got unspecific error in synchroniserThread");
}
}
@ -193,6 +164,52 @@ void SynchroniserThread::run () {
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief synchronise an unsynchronized region
////////////////////////////////////////////////////////////////////////////////
int SynchroniserThread::doSync () {
// get region to sync
SyncRegion region = _logfileManager->slots()->getSyncRegion();
Logfile::IdType const id = region.logfileId;
if (id != 0) {
// now perform the actual syncing
Logfile::StatusType status = _logfileManager->getLogfileStatus(id);
TRI_ASSERT(status == Logfile::StatusType::OPEN || status == Logfile::StatusType::SEAL_REQUESTED);
// get the logfile's file descriptor
int fd = getLogfileDescriptor(region.logfileId);
TRI_ASSERT(fd >= 0);
void** mmHandle = NULL;
bool result = TRI_MSync(fd, mmHandle, region.mem, region.mem + region.size);
LOG_TRACE("syncing logfile %llu, region %p - %p, length: %lu, wfs: %s",
(unsigned long long) id,
region.mem,
region.mem + region.size,
(unsigned long) region.size,
region.waitForSync ? "true" : "false");
if (! result) {
LOG_ERROR("unable to sync wal logfile region");
return TRI_ERROR_ARANGO_MSYNC_FAILED;
}
// all ok
if (status == Logfile::StatusType::SEAL_REQUESTED) {
// additionally seal the logfile
_logfileManager->setLogfileSealed(id);
}
_logfileManager->slots()->returnSyncRegion(region);
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get a logfile descriptor (it caches the descriptor for performance)
////////////////////////////////////////////////////////////////////////////////

View File

@ -107,6 +107,12 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief synchronise an unsynchronized region
////////////////////////////////////////////////////////////////////////////////
int doSync ();
////////////////////////////////////////////////////////////////////////////////
/// @brief get a logfile descriptor (it caches the descriptor for performance)
////////////////////////////////////////////////////////////////////////////////