1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Kaveh Vahedipour 2017-05-09 12:49:25 +02:00
commit a80f6099dc
36 changed files with 700 additions and 277 deletions

View File

@ -94,7 +94,7 @@ These will be resolved in the following releases:
* index selectivity estimates are missing. All indexes will report their selectivity
estimate as `0.2`. This may lead to non-optimal indexes being used in a query.
* geo and fulltext indexes are not yet implemented
* the geo index is not yet implemented
* the number of documents reported for collections (`db.<collection>.count()`) may be
slightly wrong during transactions
@ -112,14 +112,18 @@ These will be resolved in the following releases:
The existing indexes in the RocksDB engine are all persistent. The following indexes are
supported there:
* primary: automatically created, indexing `_id` / `_key`
* primary: this type of index is automatically created. It indexes `_id` / `_key`
* edge: automatically created for edge collections, indexing `_from` and `_to`
* edge: this index is automatically created for edge collections. It indexes
`_from` and `_to`
* hash, skiplist, persistent: user-defined index, technically it is neither a hash
nor a skiplist index. All these index types map to the same RocksDB-based
sorted index implementation. The names "hash", "skiplist" and "persistent" are
only used for compatibility with the MMFiles engine.
* hash, skiplist, persistent: these are user-defined indexes, Despite their names, they are
neither hash nor skiplist indexes. These index types map to the same RocksDB-based
sorted index implementation. The same is true for the "persistent" index. The names
"hash", "skiplist" and "persistent" are only used for compatibility with the MMFiles
engine where these indexes existed in previous and the current version of ArangoDB.
* fulltext: user-defined sorted reverted index on words occurring in documents
Memory management

View File

@ -653,7 +653,7 @@ fi
PARTIAL_STATE=$?
set -e
if test "${isCygwin}" == 1 -a "${PARTIAL_STATE}" == 0; then
if test "${isCygwin}" == 0 -a "${PARTIAL_STATE}" == 0; then
# windows fails to partialy re-configure - so do a complete configure run.
if test -f CMakeFiles/generate.stamp -a CMakeFiles/generate.stamp -ot "${SOURCE_DIR}/CMakeList.txt"; then
echo "CMakeList older - Forcing complete configure run!"

View File

@ -61,15 +61,13 @@ GeneralCommTask::GeneralCommTask(EventLoop loop, GeneralServer* server,
_server(server) {}
GeneralCommTask::~GeneralCommTask() {
for (auto&& statistics : _statisticsMap) {
for (auto& statistics : _statisticsMap) {
auto stat = statistics.second;
if (stat != nullptr) {
stat->release();
}
}
_statisticsMap.clear();
}
// -----------------------------------------------------------------------------

View File

@ -26,6 +26,7 @@
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/HttpCommTask.h"
#include "GeneralServer/VppCommTask.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"

View File

@ -27,13 +27,9 @@
#include "Basics/ConditionLocker.h"
#include "Basics/MutexLocker.h"
#include "Endpoint/EndpointList.h"
#include "GeneralServer/AsyncJobManager.h"
#include "GeneralServer/GeneralDefinitions.h"
#include "GeneralServer/GeneralListenTask.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Rest/CommonDefines.h"
#include "Rest/GeneralResponse.h"
#include "Scheduler/ListenTask.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Scheduler/Task.h"
@ -47,9 +43,7 @@ using namespace arangodb::rest;
// -----------------------------------------------------------------------------
GeneralServer::~GeneralServer() {
for (auto& task : _listenTasks) {
delete task;
}
_listenTasks.clear();
}
// -----------------------------------------------------------------------------
@ -108,13 +102,10 @@ bool GeneralServer::openEndpoint(Endpoint* endpoint) {
std::unique_ptr<ListenTask> task(new GeneralListenTask(
SchedulerFeature::SCHEDULER->eventLoop(), this, endpoint, protocolType));
task->start();
if (!task->isBound()) {
if (!task->start()) {
return false;
}
_listenTasks.emplace_back(task.get());
task.release();
_listenTasks.emplace_back(std::move(task));
return true;
}

View File

@ -28,11 +28,8 @@
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Endpoint/ConnectionInfo.h"
#include "GeneralServer/GeneralDefinitions.h"
#include "GeneralServer/HttpCommTask.h"
#include "GeneralServer/RestHandler.h"
#include "Scheduler/ListenTask.h"
namespace arangodb {
class EndpointList;
@ -48,7 +45,7 @@ class GeneralServer {
virtual ~GeneralServer();
public:
void setEndpointList(const EndpointList* list);
void setEndpointList(EndpointList const* list);
void startListening();
void stopListening();
@ -56,7 +53,7 @@ class GeneralServer {
bool openEndpoint(Endpoint* endpoint);
private:
std::vector<ListenTask*> _listenTasks;
std::vector<std::unique_ptr<ListenTask>> _listenTasks;
EndpointList const* _endpointList = nullptr;
};
}

View File

@ -22,13 +22,13 @@
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "MMFilesEngine.h"
#include "Basics/FileUtils.h"
#include "Basics/MutexLocker.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/build.h"
#include "Basics/encoding.h"
#include "Basics/files.h"
#include "MMFiles/MMFilesAqlFunctions.h"
@ -37,6 +37,7 @@
#include "MMFiles/MMFilesCompactorThread.h"
#include "MMFiles/MMFilesDatafile.h"
#include "MMFiles/MMFilesDatafileHelper.h"
#include "MMFiles/MMFilesEngine.h"
#include "MMFiles/MMFilesIndexFactory.h"
#include "MMFiles/MMFilesInitialSync.h"
#include "MMFiles/MMFilesLogfileManager.h"
@ -51,9 +52,11 @@
#include "MMFiles/MMFilesV8Functions.h"
#include "MMFiles/MMFilesView.h"
#include "MMFiles/MMFilesWalRecoveryFeature.h"
#include "MMFiles/mmfiles-replication-dump.h"
#include "Random/RandomGenerator.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/ServerIdFeature.h"
#include "RestServer/ViewTypesFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "VocBase/LogicalCollection.h"
@ -3356,3 +3359,95 @@ int MMFilesEngine::handleSyncKeys(arangodb::InitialSyncer& syncer,
std::string& errorMsg) {
return handleSyncKeysMMFiles(syncer, col, keysId, cid, collectionName,maxTick, errorMsg);
}
Result MMFilesEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder){
MMFilesLogfileManagerState const s = MMFilesLogfileManager::instance()->state();
builder.openObject(); // Base
// "state" part
builder.add("state", VPackValue(VPackValueType::Object)); // open
builder.add("running", VPackValue(true));
builder.add("lastLogTick", VPackValue(std::to_string(s.lastCommittedTick)));
builder.add("lastUncommittedLogTick", VPackValue(std::to_string(s.lastAssignedTick)));
builder.add("totalEvents", VPackValue(static_cast<double>(s.numEvents + s.numEventsSync))); // s.numEvents + s.numEventsSync
builder.add("time", VPackValue(s.timeString));
builder.close();
// "server" part
builder.add("server", VPackValue(VPackValueType::Object)); // open
builder.add("version", VPackValue(ARANGODB_VERSION));
builder.add("serverId", VPackValue(std::to_string(ServerIdFeature::getId())));
builder.close();
// "clients" part
builder.add("clients", VPackValue(VPackValueType::Array)); // open
if (vocbase != nullptr) { // add clients
auto allClients = vocbase->getReplicationClients();
for (auto& it : allClients) {
// One client
builder.add(VPackValue(VPackValueType::Object));
builder.add("serverId", VPackValue(std::to_string(std::get<0>(it))));
char buffer[21];
TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer));
builder.add("time", VPackValue(buffer));
builder.add("lastServedTick",
VPackValue(std::to_string(std::get<2>(it))));
builder.close();
}
}
builder.close(); // clients
builder.close(); // base
return Result();
}
Result MMFilesEngine::createTickRanges(VPackBuilder& builder){
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
builder.openArray();
for (auto& it : ranges) {
builder.openObject();
//filename and state are already of type string
builder.add("datafile", VPackValue(it.filename));
builder.add("state", VPackValue(it.state));
builder.add("tickMin", VPackValue(std::to_string(it.tickMin)));
builder.add("tickMax", VPackValue(std::to_string(it.tickMax)));
builder.close();
}
builder.close();
return Result{};
}
Result MMFilesEngine::firstTick(uint64_t& tick){
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
for (auto& it : ranges) {
if (it.tickMin == 0) {
continue;
}
if (it.tickMin < tick) {
tick = it.tickMin;
}
}
return Result{};
};
Result MMFilesEngine::lastLogger(TRI_vocbase_t* /*vocbase*/, std::shared_ptr<transaction::Context> transactionContext, uint64_t tickStart, uint64_t tickEnd, std::shared_ptr<VPackBuilder>& builderSPtr) {
Result res{};
std::shared_ptr<transaction::StandaloneContext> scontext =
std::dynamic_pointer_cast<transaction::StandaloneContext>(transactionContext);
TRI_ASSERT(scontext);
MMFilesReplicationDumpContext dump(scontext, 0, true, 0);
int r = MMFilesDumpLogReplication(&dump, std::unordered_set<TRI_voc_tid_t>(),
0, tickStart, tickEnd, true);
if (r != TRI_ERROR_NO_ERROR) {
res.reset(r);
return res;
}
// parsing JSON
VPackParser parser;
parser.parse(dump._buffer->_buffer);
builderSPtr = parser.steal();
return res;
}

View File

@ -99,6 +99,11 @@ class MMFilesEngine final : public StorageEngine {
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg) override;
Result createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder) override;
Result createTickRanges(VPackBuilder& builder) override;
Result firstTick(uint64_t& tick) override;
Result lastLogger(TRI_vocbase_t* vocbase, std::shared_ptr<transaction::Context>, uint64_t tickStart, uint64_t tickEnd, std::shared_ptr<VPackBuilder>& builderSPtr) override;
TransactionManager* createTransactionManager() override;
transaction::ContextData* createTransactionContextData() override;
TransactionState* createTransactionState(TRI_vocbase_t*) override;

View File

@ -1280,7 +1280,7 @@ bool TRI_InsertWordsMMFilesFulltextIndex(TRI_fts_index_t* const ftx,
// LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "checking word " << wordlist->_words[w];
if (w > 0) {
std::string tmp = wordlist[w];
std::string const& tmp = wordlist[w];
// check if current word has a shared/common prefix with the previous word
// inserted
// in case this is true, we can use an optimisation and do not need to

View File

@ -19,6 +19,7 @@ set(ROCKSDB_SOURCES
RocksDBEngine/RocksDBKey.cpp
RocksDBEngine/RocksDBKeyBounds.cpp
RocksDBEngine/RocksDBLogValue.cpp
RocksDBEngine/RocksDBPrefixExtractor.cpp
RocksDBEngine/RocksDBPrimaryIndex.cpp
RocksDBEngine/RocksDBReplicationCommon.cpp
RocksDBEngine/RocksDBReplicationContext.cpp

View File

@ -25,6 +25,7 @@
#include "RocksDBEngine/RocksDBComparator.h"
#include "Basics/VelocyPackHelper.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBPrefixExtractor.h"
#include "RocksDBEngine/RocksDBTypes.h"
using namespace arangodb;
@ -77,9 +78,6 @@ int RocksDBComparator::compareLexicographic(rocksdb::Slice const& lhs,
int RocksDBComparator::compareIndexValues(rocksdb::Slice const& lhs,
rocksdb::Slice const& rhs) const {
TRI_ASSERT(lhs.size() > sizeof(char) + sizeof(uint64_t));
TRI_ASSERT(rhs.size() > sizeof(char) + sizeof(uint64_t));
size_t offset = sizeof(char);
int result =
memcmp((lhs.data() + offset), (rhs.data() + offset), sizeof(uint64_t));
@ -87,6 +85,19 @@ int RocksDBComparator::compareIndexValues(rocksdb::Slice const& lhs,
return result;
}
size_t prefixLength = RocksDBPrefixExtractor::getPrefixLength(
static_cast<RocksDBEntryType>(lhs[0]));
if (lhs.size() == prefixLength || rhs.size() == prefixLength) {
if (lhs.size() == rhs.size()) {
return 0;
}
return ((lhs.size() < rhs.size()) ? -1 : 1);
}
TRI_ASSERT(lhs.size() > sizeof(char) + sizeof(uint64_t));
TRI_ASSERT(rhs.size() > sizeof(char) + sizeof(uint64_t));
VPackSlice const lSlice = RocksDBKey::indexedVPack(lhs);
VPackSlice const rSlice = RocksDBKey::indexedVPack(rhs);
@ -127,7 +138,9 @@ int RocksDBComparator::compareIndexedValues(VPackSlice const& lhs,
size_t const rLength = rhs.length();
size_t const n = lLength < rLength ? rLength : lLength;
// LOG_TOPIC(ERR, Logger::FIXME) << "COMPARING INDEX VALUES: " << lhs.toJson() << "; " << rhs.toJson() << "; LLENGTH: " << lLength << ", RLENGTH: " << rLength << ", N: " << n;
// LOG_TOPIC(ERR, Logger::FIXME) << "COMPARING INDEX VALUES: " << lhs.toJson()
// << "; " << rhs.toJson() << "; LLENGTH: " << lLength << ", RLENGTH: " <<
// rLength << ", N: " << n;
for (size_t i = 0; i < n; ++i) {
int res = arangodb::basics::VelocyPackHelper::compare(

View File

@ -27,6 +27,7 @@
#include "Basics/Exceptions.h"
#include "Basics/FileUtils.h"
#include "Basics/Result.h"
#include "Basics/RocksDBLogger.h"
#include "Basics/StaticStrings.h"
#include "Basics/Thread.h"
#include "Basics/VelocyPackHelper.h"
@ -50,7 +51,9 @@
#include "RocksDBEngine/RocksDBInitialSync.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBPrefixExtractor.h"
#include "RocksDBEngine/RocksDBReplicationManager.h"
#include "RocksDBEngine/RocksDBReplicationTailing.h"
#include "RocksDBEngine/RocksDBRestHandlers.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "RocksDBEngine/RocksDBTransactionContextData.h"
@ -172,7 +175,7 @@ void RocksDBEngine::start() {
// transactionOptions.num_stripes = TRI_numberProcessors();
// options imported set by RocksDBOptionFeature
auto* opts = ApplicationServer::getFeature<arangodb::RocksDBOptionFeature>(
auto const* opts = ApplicationServer::getFeature<arangodb::RocksDBOptionFeature>(
"RocksDBOption");
_options.write_buffer_size = static_cast<size_t>(opts->_writeBufferSize);
@ -216,6 +219,26 @@ void RocksDBEngine::start() {
_options.env->SetBackgroundThreads(opts->_numThreadsLow,
rocksdb::Env::Priority::LOW);
_options.info_log_level = rocksdb::InfoLogLevel::ERROR_LEVEL;
// intentionally do not start the logger (yet)
// as it will produce a lot of log spam
// _options.info_log = std::make_shared<RocksDBLogger>(_options.info_log_level);
// _options.statistics = rocksdb::CreateDBStatistics();
// _options.stats_dump_period_sec = 1;
rocksdb::BlockBasedTableOptions table_options;
if (opts->_blockCacheSize > 0) {
auto cache =
rocksdb::NewLRUCache(opts->_blockCacheSize, opts->_blockCacheShardBits);
table_options.block_cache = cache;
} else {
table_options.no_block_cache = true;
}
table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true));
_options.table_factory.reset(
rocksdb::NewBlockBasedTableFactory(table_options));
_options.create_if_missing = true;
_options.max_open_files = -1;
_options.comparator = _cmp.get();
@ -226,7 +249,11 @@ void RocksDBEngine::start() {
// garbage collect them
_options.WAL_size_limit_MB = 0;
double counter_sync_seconds = 2.5;
// TODO: prefix_extractior + memtable_insert_with_hint_prefix
_options.prefix_extractor.reset(new RocksDBPrefixExtractor());
_options.memtable_prefix_bloom_size_ratio = 0.1; // TODO: pick better value?
// TODO: enable memtable_insert_with_hint_prefix_extractor?
_options.bloom_locality = 1;
rocksdb::Status status =
rocksdb::TransactionDB::Open(_options, transactionOptions, _path, &_db);
@ -995,7 +1022,7 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase,
VPackBuilder& builder) {
syncWal();
builder.add(VPackValue(VPackValueType::Object)); // Base
builder.openObject(); // Base
rocksdb::SequenceNumber lastTick = _db->GetLatestSequenceNumber();
// "state" part
@ -1037,7 +1064,7 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase,
builder.close(); // base
return Result();
return Result{};
}
void RocksDBEngine::determinePrunableWalFiles(TRI_voc_tick_t minTickToKeep) {
@ -1335,4 +1362,77 @@ int RocksDBEngine::handleSyncKeys(arangodb::InitialSyncer& syncer,
return handleSyncKeysRocksDB(syncer, col, keysId, cid, collectionName,
maxTick, errorMsg);
}
Result RocksDBEngine::createTickRanges(VPackBuilder& builder) {
rocksdb::TransactionDB* tdb = rocksutils::globalRocksDB();
rocksdb::VectorLogPtr walFiles;
rocksdb::Status s = tdb->GetSortedWalFiles(walFiles);
Result res = rocksutils::convertStatus(s);
if (res.fail()) {
return res;
}
builder.openArray();
for (auto lfile = walFiles.begin(); lfile != walFiles.end(); ++lfile) {
auto& logfile = *lfile;
builder.openObject();
// filename and state are already of type string
builder.add("datafile", VPackValue(logfile->PathName()));
if (logfile->Type() == rocksdb::WalFileType::kAliveLogFile) {
builder.add("state", VPackValue("open"));
} else if (logfile->Type() == rocksdb::WalFileType::kArchivedLogFile) {
builder.add("state", VPackValue("collected"));
}
rocksdb::SequenceNumber min = logfile->StartSequence();
builder.add("tickMin", VPackValue(std::to_string(min)));
rocksdb::SequenceNumber max;
if (std::next(lfile) != walFiles.end()) {
max = (*std::next(lfile))->StartSequence();
} else {
max = tdb->GetLatestSequenceNumber();
}
builder.add("tickMax", VPackValue(std::to_string(max)));
builder.close();
}
builder.close();
return Result{};
}
Result RocksDBEngine::firstTick(uint64_t& tick) {
Result res{};
rocksdb::TransactionDB* tdb = rocksutils::globalRocksDB();
rocksdb::VectorLogPtr walFiles;
rocksdb::Status s = tdb->GetSortedWalFiles(walFiles);
if (!s.ok()) {
res = rocksutils::convertStatus(s);
return res;
}
// read minium possible tick
if (!walFiles.empty()) {
tick = walFiles[0]->StartSequence();
}
return res;
}
Result RocksDBEngine::lastLogger(
TRI_vocbase_t* vocbase,
std::shared_ptr<transaction::Context> transactionContext,
uint64_t tickStart, uint64_t tickEnd,
std::shared_ptr<VPackBuilder>& builderSPtr) {
bool includeSystem = true;
size_t chunkSize = 32 * 1024 * 1024; // TODO: determine good default value?
// construct vocbase with proper handler
auto builder =
std::make_unique<VPackBuilder>(transactionContext->getVPackOptions());
builder->openArray();
RocksDBReplicationResult rep = rocksutils::tailWal(
vocbase, tickStart, tickEnd, chunkSize, includeSystem, 0, *builder);
builder->close();
builderSPtr = std::move(builder);
return rep;
}
} // namespace arangodb

View File

@ -125,6 +125,11 @@ class RocksDBEngine final : public StorageEngine {
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg) override;
Result createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder) override;
Result createTickRanges(VPackBuilder& builder) override;
Result firstTick(uint64_t& tick) override;
Result lastLogger(TRI_vocbase_t* vocbase, std::shared_ptr<transaction::Context>
,uint64_t tickStart, uint64_t tickEnd, std::shared_ptr<VPackBuilder>& builderSPtr) override;
// database, collection and index management
// -----------------------------------------
@ -259,8 +264,6 @@ class RocksDBEngine final : public StorageEngine {
void addCollectionMapping(uint64_t, TRI_voc_tick_t, TRI_voc_cid_t);
std::pair<TRI_voc_tick_t, TRI_voc_cid_t> mapObjectToCollection(uint64_t);
Result createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder);
void determinePrunableWalFiles(TRI_voc_tick_t minTickToKeep);
void pruneWalFiles();

View File

@ -0,0 +1,78 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Daniel H. Larkin
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBEngine/RocksDBPrefixExtractor.h"
#include "Basics/VelocyPackHelper.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBTypes.h"
using namespace arangodb;
using namespace arangodb::velocypack;
RocksDBPrefixExtractor::RocksDBPrefixExtractor()
: _name("ArangoRocksDBPrefixExtractor") {}
RocksDBPrefixExtractor::~RocksDBPrefixExtractor() {}
const char* RocksDBPrefixExtractor::Name() const { return _name.data(); };
rocksdb::Slice RocksDBPrefixExtractor::Transform(
rocksdb::Slice const& key) const {
size_t length = _prefixLength[static_cast<uint8_t>(key[0])];
return rocksdb::Slice(key.data(), length);
}
bool RocksDBPrefixExtractor::InDomain(rocksdb::Slice const& key) const {
return ((key.size() > 0) &&
(_prefixLength[static_cast<uint8_t>(key[0])] > 0) &&
(_prefixLength[static_cast<uint8_t>(key[0])] <= key.size()));
}
bool RocksDBPrefixExtractor::InRange(rocksdb::Slice const& dst) const {
return ((dst.size() > 0) &&
(dst.size() == _prefixLength[static_cast<uint8_t>(dst[0])]));
}
size_t RocksDBPrefixExtractor::getPrefixLength(RocksDBEntryType type) {
return _prefixLength[static_cast<uint8_t>(type)];
}
const size_t RocksDBPrefixExtractor::_prefixLength[] = {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x00 - 0x0f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x10 - 0x1f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x20 - 0x2f
1, 1, 1, 9, 9, 9, 9, 9, 1, 1, 1, 1, 0, 0, 0, 0, // 0x30 - 0x3f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x40 - 0x4f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x50 - 0x5f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x60 - 0x6f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x70 - 0x7f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x80 - 0x8f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0x90 - 0x9f
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0xa0 - 0xaf
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0xb0 - 0xbf
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0xc0 - 0xcf
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0xd0 - 0xdf
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 0xe0 - 0xef
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 // 0xf0 - 0xff
};

View File

@ -0,0 +1,58 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Daniel H. Larkin
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGO_ROCKSDB_ROCKSDB_PREFIX_EXTRACTOR_H
#define ARANGO_ROCKSDB_ROCKSDB_PREFIX_EXTRACTOR_H 1
#include "Basics/Common.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include <rocksdb/slice.h>
#include <rocksdb/slice_transform.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
class RocksDBPrefixExtractor final : public rocksdb::SliceTransform {
public:
RocksDBPrefixExtractor();
~RocksDBPrefixExtractor();
const char* Name() const;
rocksdb::Slice Transform(rocksdb::Slice const& key) const;
bool InDomain(rocksdb::Slice const& key) const;
bool InRange(rocksdb::Slice const& dst) const;
static size_t getPrefixLength(RocksDBEntryType type);
private:
const std::string _name;
static const size_t _prefixLength[];
};
} // namespace arangodb
#endif

View File

@ -203,7 +203,13 @@ Result RocksDBTransactionState::commitTransaction(
// TODO wait for response on github issue to see how we can use the
// sequence number
double t1 = TRI_microtime();
result = rocksutils::convertStatus(_rocksTransaction->Commit());
double t2 = TRI_microtime();
if (t2 - t1 > 0.25) {
LOG_TOPIC(ERR, Logger::FIXME) << "COMMIT TOOK: " << (t2 - t1) << " S. NUMINSERTS: " << _numInserts << ", NUMUPDATES: " << _numUpdates << ", NUMREMOVES: " << _numRemoves << ", TRANSACTIONSIZE: " << _transactionSize;
}
rocksdb::SequenceNumber latestSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
if (!result.ok()) {

View File

@ -31,6 +31,10 @@
namespace arangodb {
////////////////////////////////////////////////////////////////////////////////
/// If these values change, make sure to reflect the changes in
/// RocksDBPrefixExtractor as well.
////////////////////////////////////////////////////////////////////////////////
enum class RocksDBEntryType : char {
Database = '0',
Collection = '1',
@ -67,6 +71,6 @@ enum class RocksDBLogType : char {
};
rocksdb::Slice const& rocksDBSlice(RocksDBEntryType const& type);
}
} // namespace arangodb
#endif

View File

@ -24,10 +24,10 @@
#include "ListenTask.h"
#include "Basics/MutexLocker.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "Logger/Logger.h"
#include "Scheduler/Acceptor.h"
#include "Ssl/SslServerFeature.h"
using namespace arangodb;
using namespace arangodb::rest;
@ -40,38 +40,39 @@ ListenTask::ListenTask(EventLoop loop, Endpoint* endpoint)
: Task(loop, "ListenTask"),
_endpoint(endpoint),
_bound(false),
_ioService(loop._ioService),
_acceptor(Acceptor::factory(*loop._ioService, endpoint)) {}
ListenTask::~ListenTask() {}
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
void ListenTask::start() {
bool ListenTask::start() {
MUTEX_LOCKER(mutex, _shutdownMutex);
try {
_acceptor->open();
_bound = true;
} catch (boost::system::system_error const& err) {
LOG_TOPIC(WARN, arangodb::Logger::COMMUNICATION) << "failed to open endpoint '" << _endpoint->specification()
<< "' with error: " << err.what();
return;
return false;
} catch (std::exception const& err) {
LOG_TOPIC(WARN, arangodb::Logger::COMMUNICATION) << "failed to open endpoint '" << _endpoint->specification()
<< "' with error: " << err.what();
return true;
}
_handler = [this](boost::system::error_code const& ec) {
// copy the shared_ptr so nobody can delete the Acceptor while the
// callback is running
std::shared_ptr<Acceptor> acceptorCopy(_acceptor);
MUTEX_LOCKER(mutex, _shutdownMutex);
if (acceptorCopy == nullptr) {
// ListenTask already stopped
if (!_bound) {
_handler = nullptr;
return;
}
// now it is safe to use acceptorCopy
TRI_ASSERT(acceptorCopy != nullptr);
TRI_ASSERT(_handler != nullptr);
TRI_ASSERT(_acceptor != nullptr);
if (ec) {
if (ec == boost::asio::error::operation_aborted) {
@ -90,7 +91,7 @@ void ListenTask::start() {
ConnectionInfo info;
auto peer = acceptorCopy->movePeer();
auto peer = _acceptor->movePeer();
// set the endpoint
info.endpoint = _endpoint->specification();
@ -103,20 +104,24 @@ void ListenTask::start() {
handleConnected(std::move(peer), std::move(info));
if (_bound) {
acceptorCopy->asyncAccept(_handler);
}
_acceptor->asyncAccept(_handler);
};
_bound = true;
_acceptor->asyncAccept(_handler);
return true;
}
void ListenTask::stop() {
MUTEX_LOCKER(mutex, _shutdownMutex);
if (!_bound) {
return;
}
_bound = false;
_handler = nullptr;
_acceptor->close();
_acceptor.reset();
}

View File

@ -27,6 +27,7 @@
#include "Scheduler/Task.h"
#include "Basics/Mutex.h"
#include "Endpoint/ConnectionInfo.h"
#include "Endpoint/Endpoint.h"
#include "Scheduler/Acceptor.h"
@ -39,15 +40,15 @@ class ListenTask : virtual public rest::Task {
public:
ListenTask(EventLoop, Endpoint*);
~ListenTask();
public:
virtual void handleConnected(std::unique_ptr<Socket>, ConnectionInfo&&) = 0;
public:
bool isBound() const { return _bound.load(); }
Endpoint* endpoint() const { return _endpoint; }
void start();
bool start();
void stop();
private:
@ -56,12 +57,11 @@ class ListenTask : virtual public rest::Task {
private:
Endpoint* _endpoint;
size_t _acceptFailures = 0;
std::atomic<bool> _bound;
boost::asio::io_service* _ioService;
std::shared_ptr<Acceptor> _acceptor;
Mutex _shutdownMutex;
bool _bound;
std::unique_ptr<Acceptor> _acceptor;
std::function<void(boost::system::error_code const&)> _handler;
};
}

View File

@ -34,6 +34,7 @@
#include "Basics/MutexLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/Thread.h"
#include "Basics/WorkMonitor.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Rest/GeneralResponse.h"
@ -440,6 +441,18 @@ void Scheduler::shutdown() {
deleteOldThreads();
// remove all queued work descriptions in the work monitor first
// before freeing the io service a few lines later
// this is required because the work descriptions may have captured
// HttpCommTasks etc. which have references to the io service and
// access it in their destructors
// so the proper shutdown order is:
// - stop accepting further requests (already done by GeneralServerFeature::stop)
// - cancel all running scheduler tasks
// - free all work descriptions in work monitor
// - delete io service
WorkMonitor::clearWorkDescriptions();
_managerService.reset();
_ioService.reset();
}

View File

@ -56,10 +56,12 @@ class RestHandlerFactory;
}
namespace transaction {
class Context;
class ContextData;
}
class StorageEngine : public application_features::ApplicationFeature {
public:
// create the storage engine
@ -418,7 +420,14 @@ class StorageEngine : public application_features::ApplicationFeature {
std::string const& collectionName,
TRI_voc_tick_t maxTick,
std::string& errorMsg) = 0;
virtual Result createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder) = 0;
virtual Result createTickRanges(VPackBuilder& builder) = 0;
virtual Result firstTick(uint64_t& tick) = 0;
virtual Result lastLogger(TRI_vocbase_t* vocbase
,std::shared_ptr<transaction::Context>
,uint64_t tickStart, uint64_t tickEnd
,std::shared_ptr<VPackBuilder>& builderSPtr) = 0;
void getCapabilities(VPackBuilder& builder) const {
builder.openObject();
builder.add("name", VPackValue(typeName()));

View File

@ -34,7 +34,6 @@
#include "Rest/HttpRequest.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Statistics/RequestStatistics.h"
#include "VocBase/vocbase.h"
using namespace arangodb;
@ -43,6 +42,24 @@ using namespace arangodb::rest;
// -----------------------------------------------------------------------------
// --SECTION-- WorkMonitor
// -----------------------------------------------------------------------------
bool WorkMonitor::clearWorkDescriptions() {
bool found = false;
WorkDescription* desc;
// handle freeable work descriptions
while (_freeableWorkDescription.pop(desc)) {
found = true;
if (desc != nullptr) {
deleteWorkDescription(desc, false);
found = true;
desc = nullptr;
}
}
return found;
}
void WorkMonitor::run() {
CONDITION_LOCKER(guard, _waiter);
@ -54,17 +71,7 @@ void WorkMonitor::run() {
// clean old entries and create summary if requested
while (!isStopping()) {
try {
bool found = false;
WorkDescription* desc;
// handle freeable work descriptions
while (_freeableWorkDescription.pop(desc)) {
found = true;
if (desc != nullptr) {
deleteWorkDescription(desc, false);
}
}
bool found = clearWorkDescriptions();
if (found) {
s = minSleep;
@ -129,14 +136,9 @@ void WorkMonitor::run() {
_stopped.store(true);
// cleanup old entries
clearWorkDescriptions();
WorkDescription* desc;
while (_freeableWorkDescription.pop(desc)) {
if (desc != nullptr) {
deleteWorkDescription(desc, false);
}
}
while (_emptyWorkDescription.pop(desc)) {
if (desc != nullptr) {
delete desc;

View File

@ -24,22 +24,17 @@
#include "Basics/ReadLocker.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterFeature.h"
// FIXME to be removed (should be storage engine independent - get it working now)
#include "MMFiles/MMFilesLogfileManager.h"
#include "MMFiles/mmfiles-replication-dump.h"
#include "Replication/InitialSyncer.h"
#include "Rest/Version.h"
#include "RestServer/ServerIdFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "V8/v8-conv.h"
#include "V8/v8-globals.h"
#include "V8/v8-utils.h"
#include "V8/v8-vpack.h"
#include "V8Server/v8-vocbaseprivate.h"
#include "v8-replication.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBReplicationTailing.h"
#include <velocypack/Builder.h>
#include <velocypack/Parser.h>
@ -62,42 +57,16 @@ static void JS_StateLoggerReplication(
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
StorageEngine* engine = EngineSelectorFeature::ENGINE;
v8::Handle<v8::Object> result = v8::Object::New(isolate);
if(engineName == "mmfiles"){
v8::Handle<v8::Object> state = v8::Object::New(isolate);
MMFilesLogfileManagerState const s = MMFilesLogfileManager::instance()->state();
state->Set(TRI_V8_ASCII_STRING("running"), v8::True(isolate));
state->Set(TRI_V8_ASCII_STRING("lastLogTick"),
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastCommittedTick));
state->Set(TRI_V8_ASCII_STRING("lastUncommittedLogTick"),
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastAssignedTick));
state->Set(TRI_V8_ASCII_STRING("totalEvents"),
v8::Number::New(isolate, static_cast<double>(s.numEvents + s.numEventsSync)));
state->Set(TRI_V8_ASCII_STRING("time"), TRI_V8_STD_STRING(s.timeString));
result->Set(TRI_V8_ASCII_STRING("state"), state);
v8::Handle<v8::Object> server = v8::Object::New(isolate);
server->Set(TRI_V8_ASCII_STRING("version"),
TRI_V8_ASCII_STRING(ARANGODB_VERSION));
server->Set(TRI_V8_ASCII_STRING("serverId"),
TRI_V8_STD_STRING(StringUtils::itoa(ServerIdFeature::getId())));
result->Set(TRI_V8_ASCII_STRING("server"), server);
v8::Handle<v8::Object> clients = v8::Object::New(isolate);
result->Set(TRI_V8_ASCII_STRING("clients"), clients);
} else if (engineName == "rocksdb") {
VPackBuilder builder;
auto res = rocksutils::globalRocksEngine()->createLoggerState(nullptr,builder);
if(res.fail()){
TRI_V8_THROW_EXCEPTION(res);
}
v8::Handle<v8::Value>resultValue = TRI_VPackToV8(isolate, builder.slice());
result = v8::Handle<v8::Object>::Cast(resultValue);
} else {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
VPackBuilder builder;
auto res = engine->createLoggerState(nullptr,builder);
if(res.fail()){
TRI_V8_THROW_EXCEPTION(res);
}
v8::Handle<v8::Value>resultValue = TRI_VPackToV8(isolate, builder.slice());
result = v8::Handle<v8::Object>::Cast(resultValue);
TRI_V8_RETURN(result);
TRI_V8_TRY_CATCH_END
@ -112,63 +81,16 @@ static void JS_TickRangesLoggerReplication(
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
v8::Handle<v8::Array> result;
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
if (engineName == "mmfiles") {
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
result = v8::Array::New(isolate, (int)ranges.size());
uint32_t i = 0;
for (auto& it : ranges) {
v8::Handle<v8::Object> df = v8::Object::New(isolate);
df->ForceSet(TRI_V8_ASCII_STRING("datafile"), TRI_V8_STD_STRING(it.filename));
df->ForceSet(TRI_V8_ASCII_STRING("state"), TRI_V8_STD_STRING(it.state));
df->ForceSet(TRI_V8_ASCII_STRING("tickMin"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, it.tickMin));
df->ForceSet(TRI_V8_ASCII_STRING("tickMax"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, it.tickMax));
result->Set(i++, df);
}
} else if (engineName == "rocksdb") {
rocksdb::TransactionDB* tdb = rocksutils::globalRocksDB();
rocksdb::VectorLogPtr walFiles;
rocksdb::Status s = tdb->GetSortedWalFiles(walFiles);
if (!s.ok()) {
Result r = rocksutils::convertStatus(s);
TRI_V8_THROW_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
}
result = v8::Array::New(isolate, (int)walFiles.size());
for(uint32_t i = 0; i < walFiles.size(); i++) {
std::unique_ptr<rocksdb::LogFile>& logfile = walFiles[i];
v8::Handle<v8::Object> df = v8::Object::New(isolate);
df->ForceSet(TRI_V8_ASCII_STRING("datafile"), TRI_V8_STD_STRING(logfile->PathName()));
// setting state of each file
if (logfile->Type() == rocksdb::WalFileType::kAliveLogFile) {
df->ForceSet(TRI_V8_ASCII_STRING("state"), TRI_V8_STRING("open"));
} else if (logfile->Type() == rocksdb::WalFileType::kArchivedLogFile) {
df->ForceSet(TRI_V8_ASCII_STRING("state"), TRI_V8_STRING("collected"));
}
rocksdb::SequenceNumber min = logfile->StartSequence();
df->ForceSet(TRI_V8_ASCII_STRING("tickMin"),
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, min));
rocksdb::SequenceNumber max;
if (i+1 < walFiles.size()) {
max = walFiles[i+1]->StartSequence();
} else {
max = tdb->GetLatestSequenceNumber();
}
df->ForceSet(TRI_V8_ASCII_STRING("tickMax"),
TRI_V8UInt64String<rocksdb::SequenceNumber>(isolate, max));
result->Set(i, df);
}
} else {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
}
VPackBuilder builder;
Result res = EngineSelectorFeature::ENGINE->createTickRanges(builder);
if(res.fail()){
TRI_V8_THROW_EXCEPTION(res);
}
v8::Handle<v8::Value>resultValue = TRI_VPackToV8(isolate, builder.slice());
result = v8::Handle<v8::Array>::Cast(resultValue);
TRI_V8_RETURN(result);
TRI_V8_TRY_CATCH_END
}
@ -183,35 +105,11 @@ static void JS_FirstTickLoggerReplication(
v8::HandleScope scope(isolate);
TRI_voc_tick_t tick = UINT64_MAX;
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
if (engineName == "mmfiles") {
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
for (auto& it : ranges) {
if (it.tickMin == 0) {
continue;
}
if (it.tickMin < tick) {
tick = it.tickMin;
}
}
} else if (engineName == "rocksdb") {
rocksdb::TransactionDB *tdb = rocksutils::globalRocksDB();
rocksdb::VectorLogPtr walFiles;
rocksdb::Status s = tdb->GetSortedWalFiles(walFiles);
if (!s.ok()) {
Result r = rocksutils::convertStatus(s);
TRI_V8_THROW_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
}
// read minium possible tick
if (!walFiles.empty()) {
tick = walFiles[0]->StartSequence();
}
} else {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
Result res = EngineSelectorFeature::ENGINE->firstTick(tick);
if(res.fail()){
TRI_V8_THROW_EXCEPTION(res);
}
if (tick == UINT64_MAX) {
TRI_V8_RETURN(v8::Null(isolate));
}
@ -224,73 +122,40 @@ static void JS_FirstTickLoggerReplication(
/// @brief get the last WAL entries
////////////////////////////////////////////////////////////////////////////////
static void JS_LastLoggerReplication(
v8::FunctionCallbackInfo<v8::Value> const& args) {
static void JS_LastLoggerReplication( v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
TRI_vocbase_t* vocbase = GetContextVocBase(isolate);
if (vocbase == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (args.Length() != 2) {
TRI_V8_THROW_EXCEPTION_USAGE(
"REPLICATION_LOGGER_LAST(<fromTick>, <toTick>)");
TRI_V8_THROW_EXCEPTION_USAGE("REPLICATION_LOGGER_LAST(<fromTick>, <toTick>)");
}
TRI_voc_tick_t tickStart = TRI_ObjectToUInt64(args[0], true);
TRI_voc_tick_t tickEnd = TRI_ObjectToUInt64(args[1], true);
if (tickEnd <= tickStart) {
TRI_V8_THROW_EXCEPTION_USAGE(
"tickStart < tickEnd");
TRI_V8_THROW_EXCEPTION_USAGE("tickStart < tickEnd");
}
v8::Handle<v8::Value> result;
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
if(engineName == "mmfiles"){
auto transactionContext = std::make_shared<transaction::StandaloneContext>(vocbase);
MMFilesReplicationDumpContext dump(transactionContext, 0, true, 0);
int res = MMFilesDumpLogReplication(&dump, std::unordered_set<TRI_voc_tid_t>(),
0, tickStart, tickEnd, true);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
// parsing JSON
VPackParser parser;
parser.parse(dump._buffer->_buffer);
result = TRI_VPackToV8(isolate, VPackSlice(parser.start()));
} else if (engineName == "rocksdb") {
bool includeSystem = true;
size_t chunkSize = 32 * 1024 * 1024; // TODO: determine good default value?
// construct vocbase with proper handler
std::shared_ptr<transaction::Context> transactionContext =
transaction::StandaloneContext::Create(vocbase);
VPackBuilder builder(transactionContext->getVPackOptions());
builder.openArray();
RocksDBReplicationResult rep = rocksutils::tailWal(vocbase, tickStart,
tickEnd, chunkSize,
includeSystem, 0, builder);
builder.close();
if (rep.ok()) {
result = TRI_VPackToV8(isolate, builder.slice(),
transactionContext->getVPackOptions());
} else {
result = v8::Null(isolate);
}
} else {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
auto transactionContext = transaction::StandaloneContext::Create(vocbase);
auto builderSPtr = std::make_shared<VPackBuilder>();
Result res = EngineSelectorFeature::ENGINE->lastLogger(
vocbase, transactionContext, tickStart, tickEnd, builderSPtr);
v8::Handle<v8::Value> result;
if(res.fail()){
result = v8::Null(isolate);
TRI_V8_THROW_EXCEPTION(res);
}
result = TRI_VPackToV8(isolate, builderSPtr->slice(),
transactionContext->getVPackOptions());
TRI_V8_RETURN(result);
TRI_V8_TRY_CATCH_END
}

View File

@ -203,7 +203,7 @@ install(FILES ${ICU_DT}
DESTINATION "${INSTALL_ICU_DT_DEST}"
RENAME ${ICU_DT_DEST})
if (MSVC)
if (MSVC AND NOT(SKIP_PACKAGING))
# so we don't need to ship dll's twice, make it one directory:
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/InstallMacros.cmake)
set(CMAKE_INSTALL_FULL_SBINDIR "${CMAKE_INSTALL_FULL_BINDIR}")

View File

@ -31,9 +31,10 @@ if(SNAPCRAFT_FOUND)
COPY "${SNAPCRAFT_TEMPLATE_DIR}/arangodb.png"
DESTINATION "${SNAPCRAFT_SOURCE_DIR}/"
)
add_custom_target(snap
COMMENT "create snap-package"
COMMAND ${SNAP_EXE} clean ${CPACK_PACKAGE_NAME}
COMMAND ${SNAP_EXE} snap
COMMAND ${CMAKE_COMMAND} -E copy ${SNAPCRAFT_SOURCE_DIR}/${CPACK_PACKAGE_NAME}_${CPACK_PACKAGE_VERSION}*_${ARANGODB_PACKAGE_ARCHITECTURE}.snap ${PROJECT_BINARY_DIR}
DEPENDS TGZ_package

View File

@ -928,7 +928,7 @@ function ReplicationLoggerSuite () {
}
var entry = getLogEntries(tick, 2300);
assertEqual(100, entry.length);
assertEqual(100, entry.length, JSON.stringify(entry));
},
////////////////////////////////////////////////////////////////////////////////
@ -1562,7 +1562,7 @@ function ReplicationLoggerSuite () {
});
var entry = getLogEntries(tick, [ 2200, 2201, 2202, 2300 ]);
assertEqual(4, entry.length);
assertEqual(4, entry.length, JSON.stringify(entry));
assertEqual(2200, entry[0].type);
assertEqual(2300, entry[1].type);

View File

@ -486,7 +486,10 @@ exports.historian = function () {
}
}
} catch (err) {
require('console').warn('catch error in historian: %s', err.stack);
// errors on shutdown are expected. do not log them in case they occur
if (err.errorNum !== internal.errors.ERROR_SHUTTING_DOWN.code) {
require('console').warn('catch error in historian: %s', err.stack);
}
}
};

View File

@ -55,8 +55,8 @@ RocksDBOptionFeature::RocksDBOptionFeature(
_baseBackgroundCompactions(rocksDBDefaults.base_background_compactions),
_maxBackgroundCompactions(rocksDBDefaults.max_background_compactions),
_maxFlushes(rocksDBDefaults.max_background_flushes),
_numThreadsHigh(1),
_numThreadsLow(1),
_numThreadsHigh(rocksDBDefaults.max_background_flushes),
_numThreadsLow(rocksDBDefaults.max_background_compactions),
_blockCacheSize(8 * 1024 * 1024),
_blockCacheShardBits(4),
_maxLogFileSize(rocksDBDefaults.max_log_file_size),
@ -113,11 +113,13 @@ void RocksDBOptionFeature::collectOptions(
new UInt64Parameter(&_numLevels));
options->addHiddenOption("--rocksdb.max-bytes-for-level-base",
"control maximum total data size for a level",
"control maximum total data size for level-1",
new UInt64Parameter(&_maxBytesForLevelBase));
options->addOption("--rocksdb.max-bytes-for-level-multiplier",
"control maximum total data size for a level",
"maximum number of bytes for level L can be calculated as "
"max-bytes-for-level-base * "
"(max-bytes-for-level-multiplier ^ (L-1))",
new DoubleParameter(&_maxBytesForLevelMultiplier));
options->addHiddenOption(
@ -261,3 +263,27 @@ void RocksDBOptionFeature::validateOptions(
FATAL_ERROR_EXIT();
}
}
void RocksDBOptionFeature::start() {
LOG_TOPIC(TRACE, Logger::FIXME) << "using RocksDB options:"
<< " write_buffer_size: " << _writeBufferSize
<< " max_write_buffer_number: " << _maxWriteBufferNumber
<< " delayed_write_rate: " << _delayedWriteRate
<< " min_write_buffer_number_to_merge: " << _minWriteBufferNumberToMerge
<< " num_levels: " << _numLevels
<< " max_bytes_for_level_base: " << _maxBytesForLevelBase
<< " max_bytes_for_level_multiplier: " << _maxBytesForLevelMultiplier
<< " base_background_compactions: " << _baseBackgroundCompactions
<< " max_background_compactions: " << _maxBackgroundCompactions
<< " max_flushes: " << _maxFlushes
<< " num_threads_high: " << _numThreadsHigh
<< " num_threads_low: " << _numThreadsLow
<< " block_cache_size: " << _blockCacheSize
<< " block_cache_shard_bits: " << _blockCacheShardBits
<< " compaction_read_ahead_size: " << _compactionReadaheadSize
<< " verify_checksums_in_compaction: " << std::boolalpha << _verifyChecksumsInCompaction
<< " optimize_filters_for_hits: " << std::boolalpha << _optimizeFiltersForHits
<< " use_direct_reads: " << std::boolalpha << _useDirectReads
<< " use_direct_writes: " << std::boolalpha << _useDirectWrites
<< " use_fsync: " << std::boolalpha << _useFSync;
}

View File

@ -45,9 +45,7 @@ class RocksDBOptionFeature final
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final{};
void start() override final {}
void unprepare() override final {}
void start() override final;
uint64_t _writeBufferSize;
uint64_t _maxWriteBufferNumber;

View File

@ -25,7 +25,7 @@
#define ARANGODB_BASICS_COMMON_H 1
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN 1
// debug malloc for Windows (only used when DEBUG is set)
#define _CRTDBG_MAP_ALLOC
#include <stdlib.h>

View File

@ -0,0 +1,87 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBLogger.h"
#include "Basics/StringRef.h"
#include "Logger/Logger.h"
using namespace arangodb;
RocksDBLogger::RocksDBLogger(rocksdb::InfoLogLevel level) : rocksdb::Logger(level) {}
RocksDBLogger::~RocksDBLogger() {}
void RocksDBLogger::Logv(const rocksdb::InfoLogLevel logLevel, char const* format, va_list ap) {
if (logLevel < GetInfoLogLevel()) {
return;
}
static constexpr size_t prefixSize = 9; // strlen("rocksdb: ");
char buffer[2048];
memcpy(&buffer[0], "rocksdb: \0", prefixSize); // add trailing \0 byte already for safety
va_list backup;
va_copy(backup, ap);
int length = vsnprintf(&buffer[0] + prefixSize, sizeof(buffer) - prefixSize - 1, format, backup);
va_end(backup);
buffer[sizeof(buffer) - 1] = '\0'; // Windows
if (length == 0) {
return;
}
size_t l = static_cast<size_t>(length) + prefixSize;
if (l >= sizeof(buffer)) {
// truncation!
l = sizeof(buffer) - 1;
}
TRI_ASSERT(l > 0 && l < sizeof(buffer));
if (buffer[l - 1] == '\n' || buffer[l - 1] == '\0') {
// strip tailing \n or \0 in log message
--l;
}
switch (logLevel) {
case rocksdb::InfoLogLevel::DEBUG_LEVEL:
LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << StringRef(buffer, l);
break;
case rocksdb::InfoLogLevel::INFO_LEVEL:
LOG_TOPIC(INFO, arangodb::Logger::FIXME) << StringRef(buffer, l);
break;
case rocksdb::InfoLogLevel::WARN_LEVEL:
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << StringRef(buffer, l);
break;
case rocksdb::InfoLogLevel::ERROR_LEVEL:
case rocksdb::InfoLogLevel::FATAL_LEVEL:
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << StringRef(buffer, l);
break;
default: {
// ignore other levels
}
}
}
void RocksDBLogger::Logv(char const* format, va_list ap) {
// forward to the level-aware method
Logv(rocksdb::InfoLogLevel::INFO_LEVEL, format, ap);
}

View File

@ -0,0 +1,51 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_BASICS_ROCKSDB_LOGGER_H
#define ARANGODB_BASICS_ROCKSDB_LOGGER_H 1
#include "Basics/Common.h"
#include <rocksdb/env.h>
namespace arangodb {
class RocksDBLogger final : public rocksdb::Logger {
public:
explicit RocksDBLogger(rocksdb::InfoLogLevel level);
~RocksDBLogger();
// intentionally do not log header information here
// as this does not seem to honor the loglevel correctly
void LogHeader(const char* format, va_list ap) override {}
void Logv(char const* format, va_list ap) override;
void Logv(const rocksdb::InfoLogLevel, char const* format, va_list ap) override;
// nothing to do here, as ArangoDB logger infrastructure takes care of flushing itself
void Flush() override {}
};
} // namespace arangodb
#endif

View File

@ -244,7 +244,9 @@ void Thread::shutdown() {
if (_state.load() == ThreadState::STARTED) {
beginShutdown();
if (!isSilent() && _state.load() != ThreadState::STOPPING) {
if (!isSilent() &&
_state.load() != ThreadState::STOPPING &&
_state.load() != ThreadState::STOPPED) {
LOG_TOPIC(WARN, Logger::THREADS) << "forcefully shutting down thread '"
<< _name << "' in state "
<< stringify(_state.load());

View File

@ -67,6 +67,7 @@ class WorkMonitor : public Thread {
static void initialize();
static void shutdown();
static void clearHandlers();
static bool clearWorkDescriptions();
private:
static WorkDescription* createWorkDescription(WorkType);

View File

@ -48,3 +48,8 @@ void WorkMonitor::addWorkOverview(std::shared_ptr<rest::RestHandler>,
void WorkMonitor::clearAllHandlers() {
TRI_ASSERT(false);
}
bool WorkMonitor::clearWorkDescriptions() {
TRI_ASSERT(false);
return false;
}

View File

@ -135,6 +135,7 @@ add_library(${LIB_ARANGO} STATIC
Basics/Nonce.cpp
Basics/OpenFilesTracker.cpp
Basics/ReadWriteLockCPP11.cpp
Basics/RocksDBLogger.cpp
Basics/StaticStrings.cpp
Basics/StringBuffer.cpp
Basics/StringHeap.cpp