mirror of https://gitee.com/bigwinds/arangodb
make collection more exception-aware
This commit is contained in:
parent
c251b3f586
commit
f730fc7ddc
|
@ -0,0 +1,132 @@
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief collection usage guard
|
||||||
|
///
|
||||||
|
/// @file
|
||||||
|
///
|
||||||
|
/// DISCLAIMER
|
||||||
|
///
|
||||||
|
/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany
|
||||||
|
///
|
||||||
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
/// you may not use this file except in compliance with the License.
|
||||||
|
/// You may obtain a copy of the License at
|
||||||
|
///
|
||||||
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
///
|
||||||
|
/// Unless required by applicable law or agreed to in writing, software
|
||||||
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
/// See the License for the specific language governing permissions and
|
||||||
|
/// limitations under the License.
|
||||||
|
///
|
||||||
|
/// Copyright holder is triAGENS GmbH, Cologne, Germany
|
||||||
|
///
|
||||||
|
/// @author Jan Steemann
|
||||||
|
/// @author Copyright 2012-2013, triAGENS GmbH, Cologne, Germany
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#ifndef TRIAGENS_UTILS_COLLECTION_GUARD_H
|
||||||
|
#define TRIAGENS_UTILS_COLLECTION_GUARD_H 1
|
||||||
|
|
||||||
|
#include "Basics/Common.h"
|
||||||
|
#include "Utils/Exception.h"
|
||||||
|
#include "VocBase/vocbase.h"
|
||||||
|
|
||||||
|
namespace triagens {
|
||||||
|
namespace arango {
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// --SECTION-- class CollectionGuard
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class CollectionGuard {
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// --SECTION-- constructors / destructors
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
public:
|
||||||
|
|
||||||
|
CollectionGuard (CollectionGuard const&) = delete;
|
||||||
|
CollectionGuard& operator= (CollectionGuard const&) = delete;
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief create the guard, using a collection id
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
CollectionGuard (TRI_vocbase_t* vocbase,
|
||||||
|
TRI_voc_cid_t id)
|
||||||
|
: _vocbase(vocbase),
|
||||||
|
_collection(nullptr) {
|
||||||
|
|
||||||
|
_collection = TRI_UseCollectionByIdVocBase(_vocbase, id);
|
||||||
|
|
||||||
|
if (_collection == nullptr) {
|
||||||
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief create the guard, using a collection name
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
CollectionGuard (TRI_vocbase_t* vocbase,
|
||||||
|
char const* name)
|
||||||
|
: _vocbase(vocbase),
|
||||||
|
_collection(nullptr) {
|
||||||
|
|
||||||
|
_collection = TRI_UseCollectionByNameVocBase(_vocbase, name);
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief destroy the guard
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
~CollectionGuard () {
|
||||||
|
if (_collection != nullptr) {
|
||||||
|
TRI_ReleaseCollectionVocBase(_vocbase, _collection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// --SECTION-- public functions
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
public:
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief return the collection pointer
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
inline TRI_vocbase_col_t* collection () const {
|
||||||
|
return _collection;
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// --SECTION-- private variables
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief pointer to vocbase
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
TRI_vocbase_t* _vocbase;
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief pointer to collection
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
TRI_vocbase_col_t* _collection;
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// Local Variables:
|
||||||
|
// mode: outline-minor
|
||||||
|
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}"
|
||||||
|
// End:
|
|
@ -29,6 +29,7 @@
|
||||||
#define TRIAGENS_UTILS_DATABASE_GUARD_H 1
|
#define TRIAGENS_UTILS_DATABASE_GUARD_H 1
|
||||||
|
|
||||||
#include "Basics/Common.h"
|
#include "Basics/Common.h"
|
||||||
|
#include "Utils/Exception.h"
|
||||||
#include "VocBase/server.h"
|
#include "VocBase/server.h"
|
||||||
|
|
||||||
struct TRI_vocbase_s;
|
struct TRI_vocbase_s;
|
||||||
|
@ -61,6 +62,10 @@ namespace triagens {
|
||||||
_database(nullptr) {
|
_database(nullptr) {
|
||||||
|
|
||||||
_database = TRI_UseDatabaseByIdServer(server, id);
|
_database = TRI_UseDatabaseByIdServer(server, id);
|
||||||
|
|
||||||
|
if (_database == nullptr) {
|
||||||
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -73,6 +78,10 @@ namespace triagens {
|
||||||
_database(nullptr) {
|
_database(nullptr) {
|
||||||
|
|
||||||
_database = TRI_UseDatabaseServer(server, name);
|
_database = TRI_UseDatabaseServer(server, name);
|
||||||
|
|
||||||
|
if (_database == nullptr) {
|
||||||
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -80,7 +89,9 @@ namespace triagens {
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
~DatabaseGuard () {
|
~DatabaseGuard () {
|
||||||
release();
|
if (_database != nullptr) {
|
||||||
|
TRI_ReleaseDatabaseServer(_server, _database);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
|
@ -97,17 +108,6 @@ namespace triagens {
|
||||||
return _database;
|
return _database;
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief release
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
inline void release () {
|
|
||||||
if (_database != nullptr) {
|
|
||||||
TRI_ReleaseDatabaseServer(_server, _database);
|
|
||||||
_database = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
// --SECTION-- private variables
|
// --SECTION-- private variables
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
#include "AllocatorThread.h"
|
#include "AllocatorThread.h"
|
||||||
#include "BasicsC/logging.h"
|
#include "BasicsC/logging.h"
|
||||||
#include "Basics/ConditionLocker.h"
|
#include "Basics/ConditionLocker.h"
|
||||||
|
#include "Utils/Exception.h"
|
||||||
#include "Wal/LogfileManager.h"
|
#include "Wal/LogfileManager.h"
|
||||||
|
|
||||||
using namespace triagens::wal;
|
using namespace triagens::wal;
|
||||||
|
@ -84,7 +85,7 @@ void AllocatorThread::stop () {
|
||||||
_condition.signal();
|
_condition.signal();
|
||||||
|
|
||||||
while (_stop != 2) {
|
while (_stop != 2) {
|
||||||
usleep(1000);
|
usleep(10000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,19 +133,28 @@ void AllocatorThread::run () {
|
||||||
_requestedSize = 0;
|
_requestedSize = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (requestedSize == 0 && ! _logfileManager->hasReserveLogfiles()) {
|
try {
|
||||||
if (createReserveLogfile(0)) {
|
if (requestedSize == 0 && ! _logfileManager->hasReserveLogfiles()) {
|
||||||
continue;
|
if (createReserveLogfile(0)) {
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
LOG_ERROR("unable to create new wal reserve logfile");
|
LOG_ERROR("unable to create new wal reserve logfile");
|
||||||
}
|
|
||||||
else if (requestedSize > 0 && _logfileManager->logfileCreationAllowed(requestedSize)) {
|
|
||||||
if (createReserveLogfile(requestedSize)) {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
else if (requestedSize > 0 && _logfileManager->logfileCreationAllowed(requestedSize)) {
|
||||||
|
if (createReserveLogfile(requestedSize)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
LOG_ERROR("unable to create new wal reserve logfile");
|
LOG_ERROR("unable to create new wal reserve logfile");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (triagens::arango::Exception const& ex) {
|
||||||
|
int res = ex.code();
|
||||||
|
LOG_ERROR("got unexpected error in allocatorThread: %s", TRI_errno_string(res));
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
LOG_ERROR("got unspecific error in allocatorThread");
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
|
@ -30,7 +30,9 @@
|
||||||
#include "BasicsC/hashes.h"
|
#include "BasicsC/hashes.h"
|
||||||
#include "BasicsC/logging.h"
|
#include "BasicsC/logging.h"
|
||||||
#include "Basics/ConditionLocker.h"
|
#include "Basics/ConditionLocker.h"
|
||||||
|
#include "Utils/CollectionGuard.h"
|
||||||
#include "Utils/DatabaseGuard.h"
|
#include "Utils/DatabaseGuard.h"
|
||||||
|
#include "Utils/Exception.h"
|
||||||
#include "VocBase/document-collection.h"
|
#include "VocBase/document-collection.h"
|
||||||
#include "VocBase/server.h"
|
#include "VocBase/server.h"
|
||||||
#include "VocBase/voc-shaper.h"
|
#include "VocBase/voc-shaper.h"
|
||||||
|
@ -322,17 +324,26 @@ void CollectorThread::run () {
|
||||||
int stop = (int) _stop;
|
int stop = (int) _stop;
|
||||||
bool worked = false;
|
bool worked = false;
|
||||||
|
|
||||||
// step 1: collect a logfile if any qualifies
|
try {
|
||||||
if (stop == 0) {
|
// step 1: collect a logfile if any qualifies
|
||||||
// don't collect additional logfiles in case we want to shut down
|
if (stop == 0) {
|
||||||
worked |= this->collectLogfiles();
|
// don't collect additional logfiles in case we want to shut down
|
||||||
|
worked |= this->collectLogfiles();
|
||||||
|
}
|
||||||
|
|
||||||
|
// step 2: update master pointers
|
||||||
|
worked |= this->processQueuedOperations();
|
||||||
|
|
||||||
|
// step 3: delete a logfile if any qualifies
|
||||||
|
worked |= this->removeLogfiles();
|
||||||
|
}
|
||||||
|
catch (triagens::arango::Exception const& ex) {
|
||||||
|
int res = ex.code();
|
||||||
|
LOG_ERROR("got unexpected error in collectorThread: %s", TRI_errno_string(res));
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
LOG_ERROR("got unspecific error in collectorThread");
|
||||||
}
|
}
|
||||||
|
|
||||||
// step 2: update master pointers
|
|
||||||
worked |= this->processQueuedOperations();
|
|
||||||
|
|
||||||
// step 3: delete a logfile if any qualifies
|
|
||||||
worked |= this->removeLogfiles();
|
|
||||||
|
|
||||||
if (stop == 0 && ! worked) {
|
if (stop == 0 && ! worked) {
|
||||||
// sleep only if there was nothing to do
|
// sleep only if there was nothing to do
|
||||||
|
@ -368,17 +379,11 @@ bool CollectorThread::collectLogfiles () {
|
||||||
|
|
||||||
_logfileManager->setCollectionRequested(logfile);
|
_logfileManager->setCollectionRequested(logfile);
|
||||||
|
|
||||||
try {
|
int res = collect(logfile);
|
||||||
int res = collect(logfile);
|
|
||||||
|
|
||||||
if (res == TRI_ERROR_NO_ERROR) {
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
_logfileManager->setCollectionDone(logfile);
|
_logfileManager->setCollectionDone(logfile);
|
||||||
return true;
|
return true;
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (...) {
|
|
||||||
// collection failed
|
|
||||||
LOG_ERROR("logfile collection failed");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
@ -404,14 +409,41 @@ bool CollectorThread::processQueuedOperations () {
|
||||||
for (auto it2 = operations.begin(); it2 != operations.end(); /* no hoisting */ ) {
|
for (auto it2 = operations.begin(); it2 != operations.end(); /* no hoisting */ ) {
|
||||||
Logfile* logfile = (*it2)->logfile;
|
Logfile* logfile = (*it2)->logfile;
|
||||||
|
|
||||||
if (processCollectionOperations((*it2)) != TRI_ERROR_NO_ERROR) {
|
int res = TRI_ERROR_INTERNAL;
|
||||||
break;
|
|
||||||
|
try {
|
||||||
|
res = processCollectionOperations((*it2));
|
||||||
}
|
}
|
||||||
|
catch (triagens::arango::Exception const& ex) {
|
||||||
|
res = ex.code();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (res == TRI_ERROR_LOCK_TIMEOUT) {
|
||||||
|
// could not acquire write-lock for collection in time
|
||||||
|
// do not delete the operations
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete the object
|
||||||
|
delete (*it2);
|
||||||
|
|
||||||
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
|
LOG_TRACE("queued operations applied successfully");
|
||||||
|
}
|
||||||
|
else if (res == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND ||
|
||||||
|
res == TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND) {
|
||||||
|
LOG_TRACE("removing queued operations for already deleted collection");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
LOG_WARNING("got unexpected error code while applying queued operations: %s", TRI_errno_string(res));
|
||||||
|
}
|
||||||
|
|
||||||
// delete the element from the vector while iterating over the vector
|
// delete the element from the vector while iterating over the vector
|
||||||
it2 = operations.erase(it2);
|
it2 = operations.erase(it2);
|
||||||
|
|
||||||
_logfileManager->decreaseCollectQueueSize(logfile);
|
_logfileManager->decreaseCollectQueueSize(logfile);
|
||||||
}
|
}
|
||||||
|
|
||||||
// next collection
|
// next collection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -425,7 +457,6 @@ bool CollectorThread::processQueuedOperations () {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: report an error?
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -444,26 +475,19 @@ bool CollectorThread::hasQueuedOperations () {
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
int CollectorThread::processCollectionOperations (CollectorCache* cache) {
|
int CollectorThread::processCollectionOperations (CollectorCache* cache) {
|
||||||
triagens::arango::DatabaseGuard guard(_server, cache->databaseId);
|
triagens::arango::DatabaseGuard dbGuard(_server, cache->databaseId);
|
||||||
|
TRI_vocbase_t* vocbase = dbGuard.database();
|
||||||
TRI_vocbase_t* vocbase = guard.database();
|
TRI_ASSERT(vocbase != nullptr);
|
||||||
|
|
||||||
if (vocbase == nullptr) {
|
|
||||||
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
|
||||||
}
|
|
||||||
|
|
||||||
TRI_vocbase_col_t* collection = TRI_UseCollectionByIdVocBase(vocbase, cache->collectionId);
|
triagens::arango::CollectionGuard collectionGuard(vocbase, cache->collectionId);
|
||||||
|
TRI_vocbase_col_t* collection = collectionGuard.collection();
|
||||||
|
TRI_ASSERT(collection != nullptr);
|
||||||
|
|
||||||
if (collection == nullptr) {
|
|
||||||
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
|
|
||||||
}
|
|
||||||
|
|
||||||
TRI_document_collection_t* document = collection->_collection;
|
TRI_document_collection_t* document = collection->_collection;
|
||||||
|
|
||||||
// try to acquire the write lock on the collection
|
// try to acquire the write lock on the collection
|
||||||
if (! TRI_TRY_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document)) {
|
if (! TRI_TRY_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document)) {
|
||||||
TRI_ReleaseCollectionVocBase(vocbase, collection);
|
|
||||||
|
|
||||||
LOG_TRACE("wal collector couldn't acquire write lock for collection '%llu'", (unsigned long long) document->base._info._cid);
|
LOG_TRACE("wal collector couldn't acquire write lock for collection '%llu'", (unsigned long long) document->base._info._cid);
|
||||||
|
|
||||||
return TRI_ERROR_LOCK_TIMEOUT;
|
return TRI_ERROR_LOCK_TIMEOUT;
|
||||||
|
@ -548,14 +572,10 @@ int CollectorThread::processCollectionOperations (CollectorCache* cache) {
|
||||||
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
|
TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document);
|
||||||
|
|
||||||
LOG_TRACE("wal collector successfully processed operations for collection '%s'", document->base._info._name);
|
LOG_TRACE("wal collector successfully processed operations for collection '%s'", document->base._info._name);
|
||||||
TRI_ReleaseCollectionVocBase(vocbase, collection);
|
|
||||||
|
|
||||||
delete cache;
|
|
||||||
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief step 3: perform removal of a logfile (if any)
|
/// @brief step 3: perform removal of a logfile (if any)
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -640,8 +660,20 @@ int CollectorThread::collect (Logfile* logfile) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (! sortedOperations.empty()) {
|
if (! sortedOperations.empty()) {
|
||||||
// TODO: handle errors indicated by transferMarkers!
|
int res = TRI_ERROR_INTERNAL;
|
||||||
transferMarkers(logfile, cid, state.collections[cid], state.operationsCount[cid], sortedOperations);
|
|
||||||
|
try {
|
||||||
|
res = transferMarkers(logfile, cid, state.collections[cid], state.operationsCount[cid], sortedOperations);
|
||||||
|
}
|
||||||
|
catch (triagens::arango::Exception const& ex) {
|
||||||
|
res = ex.code();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR &&
|
||||||
|
res != TRI_ERROR_ARANGO_DATABASE_NOT_FOUND &&
|
||||||
|
res != TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND) {
|
||||||
|
LOG_WARNING("got unexpected error in collect: %s", TRI_errno_string(res));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -665,23 +697,18 @@ int CollectorThread::transferMarkers (Logfile* logfile,
|
||||||
int64_t totalOperationsCount,
|
int64_t totalOperationsCount,
|
||||||
OperationsType const& operations) {
|
OperationsType const& operations) {
|
||||||
|
|
||||||
// GENERAL TODO: remove TRI_SetLastCollectedDocumentOperation or remove the lock for the collection
|
|
||||||
TRI_ASSERT(! operations.empty());
|
TRI_ASSERT(! operations.empty());
|
||||||
|
|
||||||
triagens::arango::DatabaseGuard guard(_server, databaseId);
|
// prepare database and collection
|
||||||
|
triagens::arango::DatabaseGuard dbGuard(_server, databaseId);
|
||||||
|
TRI_vocbase_t* vocbase = dbGuard.database();
|
||||||
|
TRI_ASSERT(vocbase != nullptr);
|
||||||
|
|
||||||
TRI_vocbase_t* vocbase = guard.database();
|
triagens::arango::CollectionGuard collectionGuard(vocbase, collectionId);
|
||||||
|
TRI_vocbase_col_t* collection =collectionGuard.collection();
|
||||||
if (vocbase == nullptr) {
|
TRI_ASSERT(collection != nullptr);
|
||||||
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
|
|
||||||
}
|
|
||||||
|
|
||||||
TRI_vocbase_col_t* collection = TRI_UseCollectionByIdVocBase(vocbase, collectionId);
|
|
||||||
|
|
||||||
if (collection == nullptr) {
|
|
||||||
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
|
|
||||||
}
|
|
||||||
|
|
||||||
CollectorCache* cache = new CollectorCache(collectionId,
|
CollectorCache* cache = new CollectorCache(collectionId,
|
||||||
databaseId,
|
databaseId,
|
||||||
logfile,
|
logfile,
|
||||||
|
@ -692,7 +719,42 @@ int CollectorThread::transferMarkers (Logfile* logfile,
|
||||||
TRI_document_collection_t* document = collection->_collection;
|
TRI_document_collection_t* document = collection->_collection;
|
||||||
TRI_ASSERT(document != nullptr);
|
TRI_ASSERT(document != nullptr);
|
||||||
|
|
||||||
TRI_voc_tick_t minTransferTick = document->base._tickMax;
|
int res = TRI_ERROR_INTERNAL;
|
||||||
|
|
||||||
|
try {
|
||||||
|
res = executeTransferMarkers(document, cache, operations);
|
||||||
|
|
||||||
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
|
// now sync the datafile
|
||||||
|
res = syncDatafileCollection(document);
|
||||||
|
|
||||||
|
// note: cache is passed by reference and can be modified by queueOperations
|
||||||
|
queueOperations(logfile, cache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (triagens::arango::Exception const& ex) {
|
||||||
|
res = ex.code();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cache != nullptr) {
|
||||||
|
// prevent memleak
|
||||||
|
delete cache;
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief transfer markers into a collection, actual work
|
||||||
|
/// the collection must have been prepared to call this function
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int CollectorThread::executeTransferMarkers (TRI_document_collection_t* document,
|
||||||
|
CollectorCache* cache,
|
||||||
|
OperationsType const& operations) {
|
||||||
|
|
||||||
|
TRI_voc_tick_t const minTransferTick = document->base._tickMax;
|
||||||
|
|
||||||
for (auto it2 = operations.begin(); it2 != operations.end(); ++it2) {
|
for (auto it2 = operations.begin(); it2 != operations.end(); ++it2) {
|
||||||
TRI_df_marker_t const* source = (*it2);
|
TRI_df_marker_t const* source = (*it2);
|
||||||
|
|
||||||
|
@ -876,14 +938,7 @@ int CollectorThread::transferMarkers (Logfile* logfile,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// now sync the datafile
|
return TRI_ERROR_NO_ERROR;
|
||||||
int res = syncDatafileCollection(document);
|
|
||||||
|
|
||||||
queueOperations(logfile, cache);
|
|
||||||
|
|
||||||
TRI_ReleaseCollectionVocBase(vocbase, collection);
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -891,7 +946,7 @@ int CollectorThread::transferMarkers (Logfile* logfile,
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
int CollectorThread::queueOperations (triagens::wal::Logfile* logfile,
|
int CollectorThread::queueOperations (triagens::wal::Logfile* logfile,
|
||||||
CollectorCache* cache) {
|
CollectorCache*& cache) {
|
||||||
TRI_voc_cid_t cid = cache->collectionId;
|
TRI_voc_cid_t cid = cache->collectionId;
|
||||||
|
|
||||||
MUTEX_LOCKER(_operationsQueueLock);
|
MUTEX_LOCKER(_operationsQueueLock);
|
||||||
|
@ -901,15 +956,17 @@ int CollectorThread::queueOperations (triagens::wal::Logfile* logfile,
|
||||||
std::vector<CollectorCache*> ops;
|
std::vector<CollectorCache*> ops;
|
||||||
ops.push_back(cache);
|
ops.push_back(cache);
|
||||||
_operationsQueue.insert(it, std::make_pair(cid, ops));
|
_operationsQueue.insert(it, std::make_pair(cid, ops));
|
||||||
|
|
||||||
_logfileManager->increaseCollectQueueSize(logfile);
|
_logfileManager->increaseCollectQueueSize(logfile);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
(*it).second.push_back(cache);
|
(*it).second.push_back(cache);
|
||||||
|
|
||||||
_logfileManager->increaseCollectQueueSize(logfile);
|
_logfileManager->increaseCollectQueueSize(logfile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we have put the object into the queue successfully
|
||||||
|
// now set the original pointer to null so it isn't double-freed
|
||||||
|
cache = nullptr;
|
||||||
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -266,12 +266,20 @@ namespace triagens {
|
||||||
int64_t,
|
int64_t,
|
||||||
OperationsType const&);
|
OperationsType const&);
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief transfer markers into a collection
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int executeTransferMarkers (TRI_document_collection_t*,
|
||||||
|
CollectorCache*,
|
||||||
|
OperationsType const&);
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief insert the collect operations into a per-collection queue
|
/// @brief insert the collect operations into a per-collection queue
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
int queueOperations (triagens::wal::Logfile*,
|
int queueOperations (triagens::wal::Logfile*,
|
||||||
CollectorCache*);
|
CollectorCache*&);
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief update a collection's datafile information
|
/// @brief update a collection's datafile information
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
#include "SynchroniserThread.h"
|
#include "SynchroniserThread.h"
|
||||||
#include "BasicsC/logging.h"
|
#include "BasicsC/logging.h"
|
||||||
#include "Basics/ConditionLocker.h"
|
#include "Basics/ConditionLocker.h"
|
||||||
|
#include "Utils/Exception.h"
|
||||||
#include "VocBase/server.h"
|
#include "VocBase/server.h"
|
||||||
#include "Wal/LogfileManager.h"
|
#include "Wal/LogfileManager.h"
|
||||||
#include "Wal/Slots.h"
|
#include "Wal/Slots.h"
|
||||||
|
@ -88,7 +89,7 @@ void SynchroniserThread::stop () {
|
||||||
_condition.signal();
|
_condition.signal();
|
||||||
|
|
||||||
while (_stop != 2) {
|
while (_stop != 2) {
|
||||||
usleep(1000);
|
usleep(10000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,45 +124,15 @@ void SynchroniserThread::run () {
|
||||||
// go on without the lock
|
// go on without the lock
|
||||||
|
|
||||||
if (waiting > 0) {
|
if (waiting > 0) {
|
||||||
// get region to sync
|
try {
|
||||||
SyncRegion region = _logfileManager->slots()->getSyncRegion();
|
doSync();
|
||||||
Logfile::IdType const id = region.logfileId;
|
}
|
||||||
|
catch (triagens::arango::Exception const& ex) {
|
||||||
if (id != 0) {
|
int res = ex.code();
|
||||||
// now perform the actual syncing
|
LOG_ERROR("got unexpected error in synchroniserThread: %s", TRI_errno_string(res));
|
||||||
Logfile::StatusType status = _logfileManager->getLogfileStatus(id);
|
}
|
||||||
TRI_ASSERT(status == Logfile::StatusType::OPEN || status == Logfile::StatusType::SEAL_REQUESTED);
|
catch (...) {
|
||||||
|
LOG_ERROR("got unspecific error in synchroniserThread");
|
||||||
// get the logfile's file descriptor
|
|
||||||
int fd = getLogfileDescriptor(region.logfileId);
|
|
||||||
|
|
||||||
if (fd < 0) {
|
|
||||||
// invalid file descriptor
|
|
||||||
LOG_FATAL_AND_EXIT("invalid wal logfile file descriptor");
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
void** mmHandle = NULL;
|
|
||||||
bool res = TRI_MSync(fd, mmHandle, region.mem, region.mem + region.size);
|
|
||||||
|
|
||||||
LOG_TRACE("syncing logfile %llu, region %p - %p, length: %lu, wfs: %s",
|
|
||||||
(unsigned long long) id,
|
|
||||||
region.mem,
|
|
||||||
region.mem + region.size,
|
|
||||||
(unsigned long) region.size,
|
|
||||||
region.waitForSync ? "true" : "false");
|
|
||||||
|
|
||||||
if (! res) {
|
|
||||||
LOG_ERROR("unable to sync wal logfile region");
|
|
||||||
// TODO: how to recover from this state?
|
|
||||||
}
|
|
||||||
|
|
||||||
if (status == Logfile::StatusType::SEAL_REQUESTED) {
|
|
||||||
// additionally seal the logfile
|
|
||||||
_logfileManager->setLogfileSealed(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_logfileManager->slots()->returnSyncRegion(region);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -193,6 +164,52 @@ void SynchroniserThread::run () {
|
||||||
// --SECTION-- private methods
|
// --SECTION-- private methods
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief synchronise an unsynchronized region
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int SynchroniserThread::doSync () {
|
||||||
|
// get region to sync
|
||||||
|
SyncRegion region = _logfileManager->slots()->getSyncRegion();
|
||||||
|
Logfile::IdType const id = region.logfileId;
|
||||||
|
|
||||||
|
if (id != 0) {
|
||||||
|
// now perform the actual syncing
|
||||||
|
Logfile::StatusType status = _logfileManager->getLogfileStatus(id);
|
||||||
|
TRI_ASSERT(status == Logfile::StatusType::OPEN || status == Logfile::StatusType::SEAL_REQUESTED);
|
||||||
|
|
||||||
|
// get the logfile's file descriptor
|
||||||
|
int fd = getLogfileDescriptor(region.logfileId);
|
||||||
|
TRI_ASSERT(fd >= 0);
|
||||||
|
void** mmHandle = NULL;
|
||||||
|
bool result = TRI_MSync(fd, mmHandle, region.mem, region.mem + region.size);
|
||||||
|
|
||||||
|
LOG_TRACE("syncing logfile %llu, region %p - %p, length: %lu, wfs: %s",
|
||||||
|
(unsigned long long) id,
|
||||||
|
region.mem,
|
||||||
|
region.mem + region.size,
|
||||||
|
(unsigned long) region.size,
|
||||||
|
region.waitForSync ? "true" : "false");
|
||||||
|
|
||||||
|
if (! result) {
|
||||||
|
LOG_ERROR("unable to sync wal logfile region");
|
||||||
|
|
||||||
|
return TRI_ERROR_ARANGO_MSYNC_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// all ok
|
||||||
|
|
||||||
|
if (status == Logfile::StatusType::SEAL_REQUESTED) {
|
||||||
|
// additionally seal the logfile
|
||||||
|
_logfileManager->setLogfileSealed(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
_logfileManager->slots()->returnSyncRegion(region);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief get a logfile descriptor (it caches the descriptor for performance)
|
/// @brief get a logfile descriptor (it caches the descriptor for performance)
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -107,6 +107,12 @@ namespace triagens {
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief synchronise an unsynchronized region
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int doSync ();
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief get a logfile descriptor (it caches the descriptor for performance)
|
/// @brief get a logfile descriptor (it caches the descriptor for performance)
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
Loading…
Reference in New Issue