1
0
Fork 0
arangodb/arangod/RocksDBEngine/RocksDBEventListener.cpp

335 lines
11 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBEventListener.h"
#include <algorithm>
#include "Basics/ConditionLocker.h"
#include "Basics/MutexLocker.h"
#include "Basics/files.h"
#include "Logger/Logger.h"
#include "RestServer/DatabasePathFeature.h"
namespace arangodb {
/// @brief Destructor. The thread has to be taken down via shutdown() before
///        the object goes away, so that call is made here.
RocksDBEventListenerThread::~RocksDBEventListenerThread() {
  shutdown();
}
/// @brief Main loop of the background thread.
///
/// Every round
///   1. drains `_pendingQueue`, running shaCalcFile() or deleteFile() for
///      each queued entry,
///   2. rescans the RocksDB directory with checkMissingShaFiles(), only
///      considering .sst files untouched for 5 * 60 seconds,
///   3. sleeps on `_loopingCondvar` for up to five minutes, or until
///      somebody signals it (see signalLoop()).
/// The loop ends as soon as isStopping() returns true.
///
/// Exceptions raised by steps 1 and 2 are logged and swallowed. The sleep in
/// step 3 is executed on the error path as well; otherwise a failure that
/// repeats on every round (e.g. a directory scan that keeps throwing) would
/// turn this loop into a busy loop that floods the log.
void RocksDBEventListenerThread::run() {
  while (!isStopping()) {
    try {
      {
        MUTEX_LOCKER(mutexLock, _pendingMutex);
        while (!_pendingQueue.empty()) {
          actionNeeded_t next(_pendingQueue.front());
          _pendingQueue.pop();

          // do not hold the queue mutex during file I/O, so the queue*()
          // methods can keep adding work in the meantime
          mutexLock.unlock();
          switch (next._action) {
            case actionNeeded_t::CALC_SHA:
              shaCalcFile(next._path);
              break;
            case actionNeeded_t::DELETE_ACTION:
              deleteFile(next._path);
              break;
            default:
              LOG_TOPIC("7c75c", ERR, arangodb::Logger::ENGINES)
                  << "RocksDBEventListenerThread::run encountered unknown _action";
              TRI_ASSERT(false);
              break;
          }  // switch
          // re-acquire before looking at the queue again
          mutexLock.lock();
        }  // while
      }

      // we could find files that subsequently post to _pendingQueue ... no worries.
      // files must not have been written to in the past 5 minutes!
      checkMissingShaFiles(getRocksDBPath(), 5 * 60);
    } catch (std::exception const& ex) {
      LOG_TOPIC("a27a1", ERR, arangodb::Logger::ENGINES)
          << "RocksDBEventListenerThread::run caught exception: " << ex.what();
    } catch (...) {
      LOG_TOPIC("66a10", ERR, arangodb::Logger::ENGINES)
          << "RocksDBEventListenerThread::run caught an exception";
    }  // catch

    // no need for fast retry, hotbackups do not happen often.
    // This wait is intentionally placed after the catch handlers so that it
    // also throttles the loop after a failed round.
    {
      CONDITION_LOCKER(condLock, _loopingCondvar);
      if (!isStopping()) {
        condLock.wait(std::chrono::minutes(5));
      }  // if
    }
  }  // while
}  // RocksDBEventListenerThread::run()
void RocksDBEventListenerThread::queueShaCalcFile(std::string const& pathName) {
{
MUTEX_LOCKER(lock, _pendingMutex);
_pendingQueue.emplace(actionNeeded_t{actionNeeded_t::CALC_SHA, pathName});
}
{
CONDITION_LOCKER(lock, _loopingCondvar);
lock.signal();
}
} // RocksDBEventListenerThread::queueShaCalcFile
void RocksDBEventListenerThread::queueDeleteFile(std::string const & pathName) {
{
MUTEX_LOCKER(lock, _pendingMutex);
_pendingQueue.emplace(actionNeeded_t{actionNeeded_t::DELETE_ACTION, pathName});
}
signalLoop();
} // RocksDBEventListenerThread::queueDeleteFile
void RocksDBEventListenerThread::signalLoop() {
{
CONDITION_LOCKER(lock, _loopingCondvar);
lock.signal();
}
} // RocksDBEventListenerThread::signalLoop
/// @brief Compute the SHA256 of an .sst file and record it as an empty
///        marker file named `<name without .sst>.sha.<digest>.hash`.
///
/// @param filename  path of the file to hash. Anything that does not end in
///                  ".sst" (or consists of nothing but that suffix) is
///                  ignored.
/// @return true if the file was hashed and the marker file was written,
///         false in every other case
bool RocksDBEventListenerThread::shaCalcFile(std::string const& filename) {
  // only names of the form "<something>.sst" are of interest
  if (filename.size() <= 4 ||
      0 != filename.compare(filename.size() - 4, 4, ".sst")) {
    return false;
  }

  TRI_SHA256Functor sha;
  LOG_TOPIC("af088", DEBUG, arangodb::Logger::ENGINES) << "shaCalcFile: computing "
                                                       << filename;

  bool const processed = TRI_ProcessFile(filename.c_str(), std::ref(sha));
  if (!processed) {
    LOG_TOPIC("7f3fd", DEBUG, arangodb::Logger::ENGINES)
        << "shaCalcFile: TRI_ProcessFile failed for " << filename.c_str();
    return false;
  }

  // the digest is carried in the name of the marker file, its content is empty
  std::string newfile(filename, 0, filename.size() - 4);
  newfile += ".sha.";
  newfile += sha.final();
  newfile += ".hash";
  LOG_TOPIC("80257", DEBUG, arangodb::Logger::ENGINES) << "shaCalcFile: done "
                                                       << filename << " result: " << newfile;

  int ret_val = TRI_WriteFile(newfile.c_str(), "", 0);
  if (TRI_ERROR_NO_ERROR == ret_val) {
    return true;
  }

  LOG_TOPIC("8f7ef", DEBUG, arangodb::Logger::ENGINES)
      << "shaCalcFile: TRI_WriteFile failed with " << ret_val
      << " for " << newfile.c_str();
  return false;
}  // RocksDBEventListenerThread::shaCalcFile
bool RocksDBEventListenerThread::deleteFile(std::string const& filename) {
bool good = false, found = false;
// need filename without ".sst" for matching to ".sha." file
std::string basename = TRI_Basename(filename.c_str());
if (4 < basename.size()
&& 0 == basename.substr(basename.size() - 4).compare(".sst")) {
basename = basename.substr(0, basename.size() - 4);
} else {
// abort looking
found = true;
} // else
if (!found) {
std::string dirname = TRI_Dirname(filename.c_str());
std::vector<std::string> filelist = TRI_FilesDirectory(dirname.c_str());
// future thought: are there faster ways to find matching .sha. file?
for (auto iter = filelist.begin(); !found && filelist.end() != iter; ++iter) {
// sha256 is 64 characters long. ".sha." is added 5. So 69 characters is minimum length
if (69 < iter->size() && 0 == iter->substr(0, basename.size()).compare(basename)
&& 0 == iter->substr(basename.size(), 5).compare(".sha.")) {
found = true;
std::string deletefile = dirname;
deletefile += TRI_DIR_SEPARATOR_CHAR;
deletefile += *iter;
int ret_val = TRI_UnlinkFile(deletefile.c_str());
good = (TRI_ERROR_NO_ERROR == ret_val);
if (!good) {
LOG_TOPIC("acb34", DEBUG, arangodb::Logger::ENGINES)
<< "deleteCalcFile: TRI_UnlinkFile failed with " << ret_val
<< " for " << deletefile.c_str();
} else {
LOG_TOPIC("e0a0d", DEBUG, arangodb::Logger::ENGINES)
<< "deleteCalcFile: TRI_UnlinkFile succeeded for "
<< deletefile.c_str();
}// if
} // if
} // for
} // if
return good;
} // RocksDBEventListenerThread::shaCalcFile
//
// @brief Wrapper for getFeature<DatabasePathFeature> to simplify
// unit testing
//
std::string RocksDBEventListenerThread::getRocksDBPath() {
// get base path from DatabaseServerFeature
auto databasePathFeature =
application_features::ApplicationServer::getFeature<DatabasePathFeature>(
"DatabasePath");
std::string rockspath = databasePathFeature->directory();
rockspath += TRI_DIR_SEPARATOR_CHAR;
rockspath += "engine-rocksdb";
return rockspath;
} // RocksDBEventListenerThread::getRocksDBPath
///
/// @brief Double check the active directory to see that all .sst files
///        have a matching .sha. file (and delete any unmatched .sha. files).
///        Will only consider .sst files which have not been written to for
///        `requireAge` seconds.
///
/// @param pathname    directory to scan
/// @param requireAge  minimum number of seconds since the last modification
///                    of an .sst file before its checksum is computed here
///
/// Directory entries that are neither .sha. files nor .sst files are
/// skipped, whatever their length.
///
void RocksDBEventListenerThread::checkMissingShaFiles(std::string const& pathname,
                                                      int64_t requireAge) {
  std::string temppath, tempname;
  std::vector<std::string> filelist = TRI_FilesDirectory(pathname.c_str());

  // sorting will put xxxxxx.sha.yyy just before xxxxxx.sst
  std::sort(filelist.begin(), filelist.end());

  for (auto iter = filelist.begin(); filelist.end() != iter; ++iter) {
    std::string::size_type const shaIdx = iter->find(".sha.");

    if (std::string::npos != shaIdx) {
      // two cases: 1. its .sst follows so skip both, 2. no matching .sst, so delete
      bool match = false;
      auto next = iter + 1;
      if (filelist.end() != next) {
        tempname = iter->substr(0, shaIdx);
        tempname += ".sst";
        if (0 == next->compare(tempname)) {
          match = true;
          // step onto the .sst entry, the loop increment then moves past it
          iter = next;
        }  // if
      }    // if

      if (!match) {
        temppath = pathname;
        temppath += TRI_DIR_SEPARATOR_CHAR;
        temppath += *iter;
        LOG_TOPIC("4eac9", DEBUG, arangodb::Logger::ENGINES) << "checkMissingShaFiles:"
            " Deleting file " << temppath;
        TRI_UnlinkFile(temppath.c_str());
      }  // if
    } else if (iter->size() >= 4 &&
               0 == iter->compare(iter->size() - 4, 4, ".sst")) {
      // The length check is required: for an entry shorter than four
      // characters `size() - 4` wraps around and an unguarded substr() would
      // throw std::out_of_range, aborting the scan of the whole directory.
      //
      // reaching this point means no .sha. preceded, now check the
      // modification time, if younger than `requireAge`, just leave it,
      // otherwise, we create a sha file. This is to ensure that sha files
      // are eventually generated if somebody switches on backup after the
      // fact. However, normally, the shas should only be computed when the
      // sst file has been fully written, which can only be guaranteed if we
      // got a creation event.
      temppath = pathname;
      temppath += TRI_DIR_SEPARATOR_CHAR;
      temppath += *iter;

      int64_t now = ::time(nullptr);
      int64_t modTime = 0;
      int r = TRI_MTimeFile(temppath.c_str(), &modTime);
      if (r == 0 && (now - modTime) >= requireAge) {
        LOG_TOPIC("d6c86", DEBUG, arangodb::Logger::ENGINES) << "checkMissingShaFiles:"
            " Computing checksum for " << temppath;
        shaCalcFile(temppath);
      } else {
        // also taken when the modification time could not be determined
        LOG_TOPIC("7f70f", DEBUG, arangodb::Logger::ENGINES) << "checkMissingShaFiles:"
            " Not computing checksum for " << temppath << " since it is too young";
      }
    }  // else
  }    // for
}  // RocksDBEventListenerThread::checkMissingShaFiles
//
// Set up the listener: name the worker thread "Sha256Thread" and start it
// right away, handing it the address of _threadDone.
//
RocksDBEventListener::RocksDBEventListener() : _shaThread("Sha256Thread") {
  // NOTE(review): _threadDone is presumably signalled when the thread
  // finishes (the destructor waits on it) -- confirm against Thread::start
  _shaThread.start(&_threadDone);
}  // RocksDBEventListener::RocksDBEventListener
//
// Wake the background thread and, if it is still running, block until
// _threadDone is signalled.
//
// NOTE(review): nothing in this destructor asks the thread to stop; run()
// only leaves its loop once isStopping() is true. Presumably the stop request
// is issued elsewhere before destruction -- confirm, otherwise the wait below
// may never return.
//
RocksDBEventListener::~RocksDBEventListener() {
  // get the worker out of its (up to five minute long) sleep
  _shaThread.signalLoop();

  CONDITION_LOCKER(locker, _threadDone);
  if (!_shaThread.isRunning()) {
    // nothing to wait for
    return;
  }
  _threadDone.wait();
}  // RocksDBEventListener::~RocksDBEventListener
///
/// @brief RocksDB callback for a completed flush: queue the file named in
///        the job info for checksum calculation on the background thread.
///
/// @param db              not used
/// @param flush_job_info  job description, only `file_path` is read
///
void RocksDBEventListener::OnFlushCompleted(rocksdb::DB* db,
                                            const rocksdb::FlushJobInfo& flush_job_info) {
  auto const& flushedFile = flush_job_info.file_path;
  _shaThread.queueShaCalcFile(flushedFile);
}  // RocksDBEventListener::OnFlushCompleted
/// @brief RocksDB callback for a deleted table file: queue the removal of
///        the checksum file that belongs to `info.file_path`.
void RocksDBEventListener::OnTableFileDeleted(const rocksdb::TableFileDeletionInfo& info) {
  auto const& removedFile = info.file_path;
  _shaThread.queueDeleteFile(removedFile);
}  // RocksDBEventListener::OnTableFileDeleted
/// @brief RocksDB callback for a completed compaction: queue every output
///        file of the job for checksum calculation on the background thread.
///
/// @param db  not used
/// @param ci  job description, only `output_files` is read
void RocksDBEventListener::OnCompactionCompleted(rocksdb::DB* db,
                                                 const rocksdb::CompactionJobInfo& ci) {
  // bind by const reference: queueShaCalcFile() makes its own copy, so a
  // by-value loop variable would copy every path a second time
  for (auto const& filename : ci.output_files) {
    _shaThread.queueShaCalcFile(filename);
  }  // for
}  // RocksDBEventListener::OnCompactionCompleted
} // namespace arangodb