
fixed dumping

jsteemann 2016-08-24 16:32:20 +02:00
parent 0ceea8e4cd
commit ff3776ca01
7 changed files with 153 additions and 149 deletions

View File

@@ -640,33 +640,131 @@ void MMFilesCollection::figures(std::shared_ptr<arangodb::velocypack::Builder>&
 std::vector<DatafileDescription> MMFilesCollection::datafilesInRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax) {
   std::vector<DatafileDescription> result;

-  READ_LOCKER(readLocker, _filesLock);
-
-  for (auto& it : _datafiles) {
-    DatafileDescription entry = {it, it->_dataMin, it->_dataMax, it->_tickMax, false};
-    LOG(TRACE) << "checking datafile " << it->_fid << " with data range " << it->_dataMin << " - " << it->_dataMax << ", tick max: " << it->_tickMax;
-
-    if (it->_dataMin == 0 || it->_dataMax == 0) {
+  auto apply = [&dataMin, &dataMax, &result](TRI_datafile_t const* datafile, bool isJournal) {
+    DatafileDescription entry = {datafile, datafile->_dataMin, datafile->_dataMax, datafile->_tickMax, isJournal};
+    LOG(TRACE) << "checking datafile " << datafile->_fid << " with data range " << datafile->_dataMin << " - " << datafile->_dataMax << ", tick max: " << datafile->_tickMax;
+
+    if (datafile->_dataMin == 0 || datafile->_dataMax == 0) {
       // datafile doesn't have any data
-      continue;
+      return;
     }

-    TRI_ASSERT(it->_tickMin <= it->_tickMax);
-    TRI_ASSERT(it->_dataMin <= it->_dataMax);
+    TRI_ASSERT(datafile->_tickMin <= datafile->_tickMax);
+    TRI_ASSERT(datafile->_dataMin <= datafile->_dataMax);

-    if (dataMax < it->_dataMin) {
+    if (dataMax < datafile->_dataMin) {
       // datafile is newer than requested range
-      continue;
+      return;
     }

-    if (dataMin > it->_dataMax) {
+    if (dataMin > datafile->_dataMax) {
       // datafile is older than requested range
-      continue;
+      return;
     }

     result.emplace_back(entry);
-  }
+  };
+
+  READ_LOCKER(readLocker, _filesLock);
+
+  for (auto& it : _datafiles) {
+    apply(it, false);
+  }
+  for (auto& it : _journals) {
+    apply(it, true);
+  }

   return result;
 }
+
+int MMFilesCollection::applyForTickRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
+    std::function<bool(TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker)> const& callback) {
+  LOG(TRACE) << "getting datafiles in data range " << dataMin << " - " << dataMax;
+
+  std::vector<DatafileDescription> datafiles = datafilesInRange(dataMin, dataMax);
+  // now we have a list of datafiles...
+
+  size_t const n = datafiles.size();
+
+  for (size_t i = 0; i < n; ++i) {
+    auto const& e = datafiles[i];
+    TRI_datafile_t const* datafile = e._data;
+
+    // we are reading from a journal that might be modified in parallel
+    // so we must read-lock it
+    CONDITIONAL_READ_LOCKER(readLocker, _filesLock, e._isJournal);
+
+    if (!e._isJournal) {
+      TRI_ASSERT(datafile->_isSealed);
+    }
+
+    char const* ptr = datafile->_data;
+    char const* end = ptr + datafile->_currentSize;
+
+    while (ptr < end) {
+      auto const* marker = reinterpret_cast<TRI_df_marker_t const*>(ptr);
+
+      if (marker->getSize() == 0) {
+        // end of datafile
+        break;
+      }
+
+      TRI_df_marker_type_t type = marker->getType();
+
+      if (type <= TRI_DF_MARKER_MIN) {
+        break;
+      }
+
+      ptr += DatafileHelper::AlignedMarkerSize<size_t>(marker);
+
+      if (type == TRI_DF_MARKER_BLANK) {
+        // fully ignore these marker types. they don't need to be replicated,
+        // but we also cannot stop iteration if we find one of these
+        continue;
+      }
+
+      // get the marker's tick and check whether we should include it
+      TRI_voc_tick_t foundTick = marker->getTick();
+
+      if (foundTick <= dataMin) {
+        // marker too old
+        continue;
+      }
+
+      if (foundTick > dataMax) {
+        // marker too new
+        return false; // hasMore = false
+      }
+
+      if (type != TRI_DF_MARKER_VPACK_DOCUMENT &&
+          type != TRI_DF_MARKER_VPACK_REMOVE) {
+        // found a non-data marker...
+
+        // check if we can abort searching
+        if (foundTick >= dataMax || (foundTick > e._tickMax && i == (n - 1))) {
+          // fetched the last available marker
+          return false; // hasMore = false
+        }
+        continue;
+      }
+
+      // note the last tick we processed
+      bool doAbort = false;
+      if (!callback(foundTick, marker)) {
+        doAbort = true;
+      }
+
+      if (foundTick >= dataMax || (foundTick >= e._tickMax && i == (n - 1))) {
+        // fetched the last available marker
+        return false; // hasMore = false
+      }
+
+      if (doAbort) {
+        return true; // hasMore = true
+      }
+    } // next marker in datafile
+  } // next datafile
+
+  return false; // hasMore = false
+}
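The hunk above does two things: datafilesInRange now applies one lambda to both _datafiles and _journals (tagging each entry with isJournal), and the per-marker scan that used to live in the replication dumper moves into the new applyForTickRange, which hands every document and remove marker in the half-open tick window (dataMin, dataMax] to a caller-supplied callback. The callback returns false to stop early; the function's own return value is the "hasMore" flag. Below is a minimal standalone sketch of that iteration contract, using simplified stand-in types (Marker, Datafile) rather than the real ArangoDB structs:

#include <cstdint>
#include <functional>
#include <vector>

// Simplified stand-ins for TRI_df_marker_t / TRI_datafile_t.
struct Marker { uint64_t tick; bool isData; };
struct Datafile { std::vector<Marker> markers; uint64_t tickMax; };

// Walks all data markers with tick in (dataMin, dataMax]; the callback
// returns false to abort early (e.g. output buffer full). The return
// value is "hasMore": false once the last marker in range was delivered,
// true when iteration was aborted and more data may still follow.
bool applyForTickRange(std::vector<Datafile> const& files,
                       uint64_t dataMin, uint64_t dataMax,
                       std::function<bool(uint64_t, Marker const&)> const& cb) {
  size_t const n = files.size();
  for (size_t i = 0; i < n; ++i) {
    for (Marker const& m : files[i].markers) {
      if (m.tick <= dataMin) {
        continue;  // marker too old
      }
      if (m.tick > dataMax) {
        return false;  // marker too new: nothing more in range
      }
      if (!m.isData) {
        // non-data marker: only check whether we can stop searching
        if (m.tick >= dataMax || (m.tick > files[i].tickMax && i == n - 1)) {
          return false;  // fetched the last available marker
        }
        continue;
      }
      bool const doAbort = !cb(m.tick, m);
      if (m.tick >= dataMax || (m.tick >= files[i].tickMax && i == n - 1)) {
        return false;  // fetched the last available marker
      }
      if (doAbort) {
        return true;  // stopped early, more data may follow
      }
    }
  }
  return false;
}

A caller that stops because its output buffer filled up gets hasMore = true back and knows to resume a later request from the last tick its callback recorded.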

View File

@@ -52,6 +52,8 @@ class MMFilesCollection final : public PhysicalCollection {
   void figures(std::shared_ptr<arangodb::velocypack::Builder>&) override;

   // datafile management
+  int applyForTickRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
+                        std::function<bool(TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker)> const& callback) override;

   /// @brief closes an open collection
   int close() override;

View File

@@ -204,8 +204,9 @@ class LogicalCollection {
     return getPhysical()->closeDatafiles(files);
   }

-  std::vector<DatafileDescription> datafilesInRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax) {
-    return getPhysical()->datafilesInRange(dataMin, dataMax);
+  int applyForTickRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
+                        std::function<bool(TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker)> const& callback) {
+    return getPhysical()->applyForTickRange(dataMin, dataMax, callback);
   }
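LogicalCollection keeps no logic of its own here; it forwards straight to the physical collection. A hypothetical caller might look like the following sketch (collection, dataMin, and dataMax are assumed to be in scope; the types come from the headers in this commit):

// Sketch of a caller tracking its progress through the tick range.
TRI_voc_tick_t lastTick = 0;
bool hasMore = collection->applyForTickRange(
    dataMin, dataMax,
    [&lastTick](TRI_voc_tick_t foundTick, TRI_df_marker_t const* /*marker*/) {
      lastTick = foundTick;  // remember how far we got
      return true;           // keep iterating
    });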

View File

@@ -54,6 +54,9 @@ class PhysicalCollection {
   virtual int close() = 0;

+  virtual int applyForTickRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
+                                std::function<bool(TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker)> const& callback) = 0;
+
   /// @brief rotate the active journal - will do nothing if there is no journal
   virtual int rotateActiveJournal() = 0;

View File

@@ -394,132 +394,34 @@ static int DumpCollection(TRI_replication_dump_t* dump,
                           bool withTicks) {
   LOG(TRACE) << "dumping collection " << collection->cid() << ", tick range " << dataMin << " - " << dataMax;

-  TRI_collection_t* document = collection->_collection;
-  TRI_string_buffer_t* buffer = dump->_buffer;
-
-  std::vector<DatafileDescription> datafiles;
-
-  try {
-    LOG(TRACE) << "getting datafiles in data range " << dataMin << " - " << dataMax;
-    datafiles = collection->datafilesInRange(dataMin, dataMax);
-  } catch (...) {
-    return TRI_ERROR_OUT_OF_MEMORY;
-  }
-
-  bool const isEdgeCollection = (document->_info.type() == TRI_COL_TYPE_EDGE);
+  bool const isEdgeCollection = (collection->_collection->_info.type() == TRI_COL_TYPE_EDGE);

   // setup some iteration state
   TRI_voc_tick_t lastFoundTick = 0;
-  int res = TRI_ERROR_NO_ERROR;
-  bool hasMore = true;
   bool bufferFull = false;

-  size_t const n = datafiles.size();
-
-  for (size_t i = 0; i < n; ++i) {
-    auto const& e = datafiles[i];
-    TRI_datafile_t const* datafile = e._data;
-
-    // we are reading from a journal that might be modified in parallel
-    // so we must read-lock it
-#warning FIXME
-    // CONDITIONAL_READ_LOCKER(readLocker, document->_filesLock, e._isJournal);
-
-    if (!e._isJournal) {
-      TRI_ASSERT(datafile->_isSealed);
-    }
-
-    char const* ptr = datafile->_data;
-    char const* end;
-
-    if (res == TRI_ERROR_NO_ERROR) {
-      // no error so far. start iterating
-      end = ptr + datafile->_currentSize;
-    } else {
-      // some error occurred. don't iterate
-      end = ptr;
-    }
-
-    while (ptr < end) {
-      auto const* marker = reinterpret_cast<TRI_df_marker_t const*>(ptr);
-
-      if (marker->getSize() == 0) {
-        // end of datafile
-        break;
-      }
-
-      TRI_df_marker_type_t type = marker->getType();
-
-      if (type <= TRI_DF_MARKER_MIN) {
-        break;
-      }
-
-      ptr += DatafileHelper::AlignedMarkerSize<size_t>(marker);
-
-      if (type == TRI_DF_MARKER_BLANK) {
-        // fully ignore these marker types. they don't need to be replicated,
-        // but we also cannot stop iteration if we find one of these
-        continue;
-      }
-
-      // get the marker's tick and check whether we should include it
-      TRI_voc_tick_t foundTick = marker->getTick();
-
-      if (foundTick <= dataMin) {
-        // marker too old
-        continue;
-      }
-
-      if (foundTick > dataMax) {
-        // marker too new
-        hasMore = false;
-        goto NEXT_DF;
-      }
-
-      if (type != TRI_DF_MARKER_VPACK_DOCUMENT &&
-          type != TRI_DF_MARKER_VPACK_REMOVE) {
-        // found a non-data marker...
-
-        // check if we can abort searching
-        if (foundTick >= dataMax || (foundTick > e._tickMax && i == (n - 1))) {
-          // fetched the last available marker
-          hasMore = false;
-          goto NEXT_DF;
-        }
-        continue;
-      }
-
-      // note the last tick we processed
-      lastFoundTick = foundTick;
-
-      res = StringifyMarker(dump, databaseId, collectionId, marker, true, withTicks, isEdgeCollection);
-
-      if (res != TRI_ERROR_NO_ERROR) {
-        break; // will go to NEXT_DF
-      }
-
-      if (foundTick >= dataMax || (foundTick >= e._tickMax && i == (n - 1))) {
-        // fetched the last available marker
-        hasMore = false;
-        goto NEXT_DF;
-      }
-
-      if (static_cast<uint64_t>(TRI_LengthStringBuffer(buffer)) > dump->_chunkSize) {
-        // abort the iteration
-        bufferFull = true;
-        goto NEXT_DF;
-      }
-    } // next marker in datafile
-
-  NEXT_DF:
-    if (res != TRI_ERROR_NO_ERROR || !hasMore || bufferFull) {
-      break;
-    }
-  }
-
-  if (res == TRI_ERROR_NO_ERROR) {
+  auto callback = [&dump, &lastFoundTick, &databaseId, &collectionId, &withTicks, &isEdgeCollection, &bufferFull](TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker) {
+    // note the last tick we processed
+    lastFoundTick = foundTick;
+
+    int res = StringifyMarker(dump, databaseId, collectionId, marker, true, withTicks, isEdgeCollection);
+
+    if (res != TRI_ERROR_NO_ERROR) {
+      THROW_ARANGO_EXCEPTION(res);
+    }
+
+    if (static_cast<uint64_t>(TRI_LengthStringBuffer(dump->_buffer)) > dump->_chunkSize) {
+      // abort the iteration
+      bufferFull = true;
+      return false; // stop iterating
+    }
+
+    return true; // continue iterating
+  };
+
+  try {
+    bool hasMore = collection->applyForTickRange(dataMin, dataMax, callback);
+
     if (lastFoundTick > 0) {
       // data available for requested range
       dump->_lastFoundTick = lastFoundTick;
@@ -531,9 +433,13 @@ static int DumpCollection(TRI_replication_dump_t* dump,
       dump->_hasMore = false;
       dump->_bufferFull = false;
     }
-  }
-
-  return res;
+
+    return TRI_ERROR_NO_ERROR;
+  } catch (basics::Exception const& ex) {
+    return ex.code();
+  } catch (...) {
+    return TRI_ERROR_INTERNAL;
+  }
 }

 ////////////////////////////////////////////////////////////////////////////////
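With the marker loop gone, DumpCollection shrinks to a callback plus a try/catch boundary: a failing StringifyMarker is reported by throwing (THROW_ARANGO_EXCEPTION), and the function translates exceptions back into the integer error codes its C-style callers expect. Below is a standalone sketch of that boundary pattern; the error codes, exception type, and stringify step are simplified stand-ins, not the real ArangoDB definitions:

#include <stdexcept>
#include <string>

// Simplified stand-ins for TRI_ERROR_* and arangodb::basics::Exception.
constexpr int ERROR_NO_ERROR = 0;
constexpr int ERROR_INTERNAL = 4;

struct ArangoException : std::runtime_error {
  int errorCode;
  explicit ArangoException(int code)
      : std::runtime_error("arango error"), errorCode(code) {}
  int code() const { return errorCode; }
};

// A fallible per-item step, standing in for StringifyMarker().
int stringify(std::string& out, int value) {
  if (value < 0) {
    return ERROR_INTERNAL;  // pretend negative input is an error
  }
  out += std::to_string(value) + ' ';
  return ERROR_NO_ERROR;
}

// The boundary function: inner code throws on failure instead of
// threading `res` through gotos; the boundary catches and maps the
// exception back to an integer error code.
int dumpValues(std::string& out, int const* values, int n) {
  try {
    for (int i = 0; i < n; ++i) {
      int res = stringify(out, values[i]);
      if (res != ERROR_NO_ERROR) {
        throw ArangoException(res);  // like THROW_ARANGO_EXCEPTION(res)
      }
    }
    return ERROR_NO_ERROR;
  } catch (ArangoException const& ex) {
    return ex.code();       // known error: surface its code
  } catch (...) {
    return ERROR_INTERNAL;  // anything else: generic internal error
  }
}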

View File

@@ -869,14 +869,14 @@ int CollectorThread::transferMarkers(Logfile* logfile,
       << document->_info.name()
       << "', totalOperationsCount: " << totalOperationsCount;

-  CollectorCache* cache =
-      new CollectorCache(collectionId, databaseId, logfile,
-                         totalOperationsCount, operations.size());
+  std::unique_ptr<CollectorCache> cache(
+      new CollectorCache(collectionId, databaseId, logfile,
+                         totalOperationsCount, operations.size()));

   int res = TRI_ERROR_INTERNAL;

   try {
-    res = executeTransferMarkers(collection, cache, operations);
+    res = executeTransferMarkers(collection, cache.get(), operations);

     TRI_IF_FAILURE("transferMarkersCrash") {
       // intentionally kill the server
@@ -902,11 +902,6 @@ int CollectorThread::transferMarkers(Logfile* logfile,
     res = TRI_ERROR_INTERNAL;
   }

-  if (cache != nullptr) {
-    // prevent memleak
-    delete cache;
-  }
-
   return res;
 }
@@ -969,8 +964,11 @@ int CollectorThread::executeTransferMarkers(LogicalCollection* collection,
 /// @brief insert the collect operations into a per-collection queue
 int CollectorThread::queueOperations(arangodb::wal::Logfile* logfile,
-                                     CollectorCache*& cache) {
+                                     std::unique_ptr<CollectorCache>& cache) {
+  TRI_ASSERT(cache != nullptr);
+
   TRI_voc_cid_t cid = cache->collectionId;
+  uint64_t numOperations = cache->operations->size();
   uint64_t maxNumPendingOperations = _logfileManager->throttleWhenPending();

   TRI_ASSERT(!cache->operations->empty());
@@ -983,12 +981,14 @@ int CollectorThread::queueOperations(arangodb::wal::Logfile* logfile,
       // it is only safe to access the queue if this flag is not set
       auto it = _operationsQueue.find(cid);
       if (it == _operationsQueue.end()) {
-        _operationsQueue.emplace(cid, std::vector<CollectorCache*>({cache}));
+        _operationsQueue.emplace(cid, std::vector<CollectorCache*>({cache.get()}));
         _logfileManager->increaseCollectQueueSize(logfile);
       } else {
-        (*it).second.push_back(cache);
+        (*it).second.push_back(cache.get());
         _logfileManager->increaseCollectQueueSize(logfile);
       }

+      // now _operationsQueue is responsible for managing the cache entry
+      cache.release();
       // exit the loop
       break;
@@ -999,8 +999,6 @@ int CollectorThread::queueOperations(arangodb::wal::Logfile* logfile,
     usleep(10000);
   }

-  uint64_t numOperations = cache->operations->size();
-
   if (maxNumPendingOperations > 0 &&
       _numPendingOperations < maxNumPendingOperations &&
       (_numPendingOperations + numOperations) >= maxNumPendingOperations) {
@@ -1013,10 +1011,6 @@ int CollectorThread::queueOperations(arangodb::wal::Logfile* logfile,
   _numPendingOperations += numOperations;

-  // we have put the object into the queue successfully
-  // now set the original pointer to null so it isn't double-freed
-  cache = nullptr;
-
   return TRI_ERROR_NO_ERROR;
 }
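The CollectorThread changes swap the raw CollectorCache* (with its manual delete and cache = nullptr bookkeeping) for a std::unique_ptr that is release()d only after the queue has accepted the raw pointer, so the cache can neither leak on the error paths nor be double-freed on success. A standalone sketch of that hand-off, with Cache and the queue as simplified stand-ins:

#include <map>
#include <memory>
#include <vector>

struct Cache { int collectionId; };

// The queue stores raw pointers, mirroring _operationsQueue above.
std::map<int, std::vector<Cache*>> operationsQueue;

void queueOperations(std::unique_ptr<Cache>& cache) {
  int cid = cache->collectionId;
  // insert the raw pointer; the unique_ptr still owns the object, so an
  // exception thrown up to here cannot leak it
  operationsQueue[cid].push_back(cache.get());
  // insertion succeeded: from now on the queue is responsible for deletion
  (void)cache.release();
}

int main() {
  std::unique_ptr<Cache> cache(new Cache{42});
  queueOperations(cache);
  // cache is now nullptr; the queue owns the object and frees it later
  for (auto& entry : operationsQueue) {
    for (Cache* c : entry.second) {
      delete c;
    }
  }
  return 0;
}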

View File

@@ -203,7 +203,7 @@ class CollectorThread : public Thread {
                     OperationsType const&);

   /// @brief insert the collect operations into a per-collection queue
-  int queueOperations(arangodb::wal::Logfile*, CollectorCache*&);
+  int queueOperations(arangodb::wal::Logfile*, std::unique_ptr<CollectorCache>&);

   /// @brief update a collection's datafile information
   int updateDatafileStatistics(TRI_collection_t*, CollectorCache*);