mirror of https://gitee.com/bigwinds/arangodb
use TRI_DEFER
This commit is contained in:
parent
2a5a62a5ef
commit
1e253171e9
|
@ -1452,6 +1452,9 @@ void TRI_CompactorVocBase(void* data) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// read-unlock the compaction lock later
|
||||
TRI_DEFER(TRI_WriteUnlockReadWriteLock(&document->_compactionLock));
|
||||
|
||||
try {
|
||||
if (document->_lastCompaction + COMPACTOR_COLLECTION_INTERVAL <=
|
||||
now) {
|
||||
|
@ -1485,9 +1488,6 @@ void TRI_CompactorVocBase(void* data) {
|
|||
// in case an error occurs, we must still relase the lock
|
||||
LOG_ERROR("an unknown exception occurred during compaction");
|
||||
}
|
||||
|
||||
// read-unlock the compaction lock
|
||||
TRI_WriteUnlockReadWriteLock(&document->_compactionLock);
|
||||
}
|
||||
|
||||
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
|
||||
|
|
|
@ -697,6 +697,124 @@ size_t CollectorThread::numQueuedOperations() {
|
|||
return _operationsQueue.size();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief process a single marker in collector step 2
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void CollectorThread::processCollectionMarker(arangodb::arango::SingleCollectionWriteTransaction<UINT64_MAX>& trx,
|
||||
TRI_document_collection_t* document,
|
||||
CollectorCache* cache,
|
||||
CollectorOperation const& operation) {
|
||||
TRI_df_marker_t const* walMarker =
|
||||
reinterpret_cast<TRI_df_marker_t const*>(operation.walPosition);
|
||||
TRI_df_marker_t const* marker =
|
||||
reinterpret_cast<TRI_df_marker_t const*>(operation.datafilePosition);
|
||||
TRI_voc_size_t const datafileMarkerSize = operation.datafileMarkerSize;
|
||||
TRI_voc_fid_t const fid = operation.datafileId;
|
||||
|
||||
TRI_ASSERT(walMarker != nullptr);
|
||||
TRI_ASSERT(marker != nullptr);
|
||||
|
||||
if (walMarker->_type == TRI_WAL_MARKER_DOCUMENT) {
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
|
||||
wal::document_marker_t const* m =
|
||||
reinterpret_cast<wal::document_marker_t const*>(walMarker);
|
||||
char const* key = reinterpret_cast<char const*>(m) + m->_offsetKey;
|
||||
|
||||
auto found = document->primaryIndex()->lookupKey(&trx, key);
|
||||
|
||||
if (found == nullptr || found->_rid != m->_revisionId ||
|
||||
found->getDataPtr() != walMarker) {
|
||||
// somebody inserted a new revision of the document or the revision
|
||||
// was already moved by the compactor
|
||||
dfi.numberDead++;
|
||||
dfi.sizeDead += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
} else {
|
||||
// update cap constraint info
|
||||
document->_headersPtr->adjustTotalSize(
|
||||
TRI_DF_ALIGN_BLOCK(walMarker->_size),
|
||||
TRI_DF_ALIGN_BLOCK(datafileMarkerSize));
|
||||
|
||||
// we can safely update the master pointer's dataptr value
|
||||
found->setDataPtr(static_cast<void*>(
|
||||
const_cast<char*>(operation.datafilePosition)));
|
||||
found->_fid = fid;
|
||||
|
||||
dfi.numberAlive++;
|
||||
dfi.sizeAlive += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
}
|
||||
} else if (walMarker->_type == TRI_WAL_MARKER_EDGE) {
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
|
||||
wal::edge_marker_t const* m =
|
||||
reinterpret_cast<wal::edge_marker_t const*>(walMarker);
|
||||
char const* key = reinterpret_cast<char const*>(m) + m->_offsetKey;
|
||||
|
||||
auto found = document->primaryIndex()->lookupKey(&trx, key);
|
||||
|
||||
if (found == nullptr || found->_rid != m->_revisionId ||
|
||||
found->getDataPtr() != walMarker) {
|
||||
// somebody inserted a new revision of the document or the revision
|
||||
// was already moved by the compactor
|
||||
dfi.numberDead++;
|
||||
dfi.sizeDead += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
} else {
|
||||
// update cap constraint info
|
||||
document->_headersPtr->adjustTotalSize(
|
||||
TRI_DF_ALIGN_BLOCK(walMarker->_size),
|
||||
TRI_DF_ALIGN_BLOCK(datafileMarkerSize));
|
||||
|
||||
// we can safely update the master pointer's dataptr value
|
||||
found->setDataPtr(static_cast<void*>(
|
||||
const_cast<char*>(operation.datafilePosition)));
|
||||
found->_fid = fid;
|
||||
|
||||
dfi.numberAlive++;
|
||||
dfi.sizeAlive += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
}
|
||||
} else if (walMarker->_type == TRI_WAL_MARKER_REMOVE) {
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
dfi.numberDeletions++;
|
||||
|
||||
wal::remove_marker_t const* m =
|
||||
reinterpret_cast<wal::remove_marker_t const*>(walMarker);
|
||||
char const* key =
|
||||
reinterpret_cast<char const*>(m) + sizeof(wal::remove_marker_t);
|
||||
|
||||
auto found = document->primaryIndex()->lookupKey(&trx, key);
|
||||
|
||||
if (found != nullptr && found->_rid > m->_revisionId) {
|
||||
// somebody re-created the document with a newer revision
|
||||
dfi.numberDead++;
|
||||
dfi.sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
}
|
||||
} else if (walMarker->_type == TRI_WAL_MARKER_ATTRIBUTE) {
|
||||
// move the pointer to the attribute from WAL to the datafile
|
||||
document->getShaper()->moveMarker(
|
||||
const_cast<TRI_df_marker_t*>(marker),
|
||||
(void*)walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION
|
||||
// LOCK and fake trx here
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
dfi.numberAttributes++;
|
||||
dfi.sizeAttributes += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
} else if (walMarker->_type == TRI_WAL_MARKER_SHAPE) {
|
||||
// move the pointer to the shape from WAL to the datafile
|
||||
document->getShaper()->moveMarker(
|
||||
const_cast<TRI_df_marker_t*>(marker),
|
||||
(void*)walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION
|
||||
// LOCK and fake trx here
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
dfi.numberShapes++;
|
||||
dfi.sizeShapes += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief process all operations for a single collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -750,120 +868,8 @@ int CollectorThread::processCollectionOperations(CollectorCache* cache) {
|
|||
|
||||
TRI_ASSERT(!cache->operations->empty());
|
||||
|
||||
auto primaryIndex = document->primaryIndex();
|
||||
|
||||
for (auto it = cache->operations->begin(); it != cache->operations->end();
|
||||
++it) {
|
||||
auto operation = (*it);
|
||||
|
||||
TRI_df_marker_t const* walMarker =
|
||||
reinterpret_cast<TRI_df_marker_t const*>(operation.walPosition);
|
||||
TRI_df_marker_t const* marker =
|
||||
reinterpret_cast<TRI_df_marker_t const*>(operation.datafilePosition);
|
||||
TRI_voc_size_t const datafileMarkerSize = operation.datafileMarkerSize;
|
||||
TRI_voc_fid_t const fid = operation.datafileId;
|
||||
|
||||
TRI_ASSERT(walMarker != nullptr);
|
||||
TRI_ASSERT(marker != nullptr);
|
||||
|
||||
if (walMarker->_type == TRI_WAL_MARKER_DOCUMENT) {
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
|
||||
wal::document_marker_t const* m =
|
||||
reinterpret_cast<wal::document_marker_t const*>(walMarker);
|
||||
char const* key = reinterpret_cast<char const*>(m) + m->_offsetKey;
|
||||
|
||||
auto found = primaryIndex->lookupKey(&trx, key);
|
||||
|
||||
if (found == nullptr || found->_rid != m->_revisionId ||
|
||||
found->getDataPtr() != walMarker) {
|
||||
// somebody inserted a new revision of the document or the revision
|
||||
// was already moved by the compactor
|
||||
dfi.numberDead++;
|
||||
dfi.sizeDead += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
} else {
|
||||
// update cap constraint info
|
||||
document->_headersPtr->adjustTotalSize(
|
||||
TRI_DF_ALIGN_BLOCK(walMarker->_size),
|
||||
TRI_DF_ALIGN_BLOCK(datafileMarkerSize));
|
||||
|
||||
// we can safely update the master pointer's dataptr value
|
||||
found->setDataPtr(static_cast<void*>(
|
||||
const_cast<char*>(operation.datafilePosition)));
|
||||
found->_fid = fid;
|
||||
|
||||
dfi.numberAlive++;
|
||||
dfi.sizeAlive += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
}
|
||||
} else if (walMarker->_type == TRI_WAL_MARKER_EDGE) {
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
|
||||
wal::edge_marker_t const* m =
|
||||
reinterpret_cast<wal::edge_marker_t const*>(walMarker);
|
||||
char const* key = reinterpret_cast<char const*>(m) + m->_offsetKey;
|
||||
|
||||
auto found = primaryIndex->lookupKey(&trx, key);
|
||||
|
||||
if (found == nullptr || found->_rid != m->_revisionId ||
|
||||
found->getDataPtr() != walMarker) {
|
||||
// somebody inserted a new revision of the document or the revision
|
||||
// was already moved by the compactor
|
||||
dfi.numberDead++;
|
||||
dfi.sizeDead += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
} else {
|
||||
// update cap constraint info
|
||||
document->_headersPtr->adjustTotalSize(
|
||||
TRI_DF_ALIGN_BLOCK(walMarker->_size),
|
||||
TRI_DF_ALIGN_BLOCK(datafileMarkerSize));
|
||||
|
||||
// we can safely update the master pointer's dataptr value
|
||||
found->setDataPtr(static_cast<void*>(
|
||||
const_cast<char*>(operation.datafilePosition)));
|
||||
found->_fid = fid;
|
||||
|
||||
dfi.numberAlive++;
|
||||
dfi.sizeAlive += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
}
|
||||
} else if (walMarker->_type == TRI_WAL_MARKER_REMOVE) {
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
dfi.numberDeletions++;
|
||||
|
||||
wal::remove_marker_t const* m =
|
||||
reinterpret_cast<wal::remove_marker_t const*>(walMarker);
|
||||
char const* key =
|
||||
reinterpret_cast<char const*>(m) + sizeof(wal::remove_marker_t);
|
||||
|
||||
auto found = primaryIndex->lookupKey(&trx, key);
|
||||
|
||||
if (found != nullptr && found->_rid > m->_revisionId) {
|
||||
// somebody re-created the document with a newer revision
|
||||
dfi.numberDead++;
|
||||
dfi.sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
}
|
||||
} else if (walMarker->_type == TRI_WAL_MARKER_ATTRIBUTE) {
|
||||
// move the pointer to the attribute from WAL to the datafile
|
||||
document->getShaper()->moveMarker(
|
||||
const_cast<TRI_df_marker_t*>(marker),
|
||||
(void*)walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION
|
||||
// LOCK and fake trx here
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
dfi.numberAttributes++;
|
||||
dfi.sizeAttributes += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
} else if (walMarker->_type == TRI_WAL_MARKER_SHAPE) {
|
||||
// move the pointer to the shape from WAL to the datafile
|
||||
document->getShaper()->moveMarker(
|
||||
const_cast<TRI_df_marker_t*>(marker),
|
||||
(void*)walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION
|
||||
// LOCK and fake trx here
|
||||
auto& dfi = createDfi(cache, fid);
|
||||
dfi.numberUncollected--;
|
||||
dfi.numberShapes++;
|
||||
dfi.sizeShapes += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
|
||||
}
|
||||
for (auto const& it : *(cache->operations)) {
|
||||
processCollectionMarker(trx, document, cache, it);
|
||||
}
|
||||
|
||||
// finally update all datafile statistics
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "Basics/ConditionVariable.h"
|
||||
#include "Basics/Mutex.h"
|
||||
#include "Basics/Thread.h"
|
||||
#include "Utils/transactions.h"
|
||||
#include "VocBase/datafile.h"
|
||||
#include "VocBase/DatafileStatistics.h"
|
||||
#include "VocBase/Ditch.h"
|
||||
|
@ -252,6 +253,16 @@ class CollectorThread : public basics::Thread {
|
|||
|
||||
|
||||
private:
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief process a single marker in collector step 2
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void processCollectionMarker(arangodb::arango::SingleCollectionWriteTransaction<UINT64_MAX>&,
|
||||
TRI_document_collection_t*,
|
||||
CollectorCache*,
|
||||
CollectorOperation const&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return the number of queued operations
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue