1
0
Fork 0
Jan 2018-10-19 15:26:23 +02:00 committed by GitHub
parent c0455e9c60
commit 1002928b5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 203 additions and 122 deletions

View File

@ -95,7 +95,7 @@ ClusterCollection::ClusterCollection(
TRI_ERROR_BAD_PARAMETER,
"volatile collections are unsupported in the RocksDB engine");
}
} else {
} else if (_engineType != ClusterEngineType::MockEngine) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
@ -144,7 +144,6 @@ Result ClusterCollection::updateProperties(VPackSlice const& slice,
// duplicate all the error handling of the storage engines
if (_engineType == ClusterEngineType::MMFilesEngine) { // duplicate the error validation
// validation
uint32_t tmp = Helper::getNumericValue<uint32_t>(
slice, "indexBuckets",
@ -195,12 +194,22 @@ Result ClusterCollection::updateProperties(VPackSlice const& slice,
merge.add("cacheEnabled",
VPackValue(Helper::readBooleanValue(slice, "cacheEnabled", def)));
} else {
} else if (_engineType != ClusterEngineType::MockEngine) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
merge.close();
_info = VPackCollection::merge(_info.slice(), merge.slice(), true);
TRI_ASSERT(merge.slice().isObject());
TRI_ASSERT(merge.isClosed());
TRI_ASSERT(_info.slice().isObject());
TRI_ASSERT(_info.isClosed());
VPackBuilder tmp = VPackCollection::merge(_info.slice(), merge.slice(), true);
_info = std::move(tmp);
TRI_ASSERT(_info.slice().isObject());
TRI_ASSERT(_info.isClosed());
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index>& idx : _indexes) {
@ -242,7 +251,7 @@ void ClusterCollection::getPropertiesVPack(velocypack::Builder& result) const {
result.add("cacheEnabled", VPackValue(Helper::readBooleanValue(
_info.slice(), "cacheEnabled", false)));
} else {
} else if (_engineType != ClusterEngineType::MockEngine) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}

View File

@ -64,6 +64,9 @@ using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::options;
// fall back to the using the mock storage engine
bool ClusterEngine::Mocking = false;
// create the storage engine
ClusterEngine::ClusterEngine(application_features::ApplicationServer& server)
: StorageEngine(
@ -86,13 +89,19 @@ bool ClusterEngine::isMMFiles() const {
return _actualEngine && _actualEngine->name() == MMFilesEngine::FeatureName;
}
bool ClusterEngine::isMock() const {
return ClusterEngine::Mocking || (_actualEngine && _actualEngine->name() == "Mock");
}
ClusterEngineType ClusterEngine::engineType() const {
TRI_ASSERT(_actualEngine != nullptr);
if (_actualEngine->name() == MMFilesEngine::FeatureName) {
if (isMMFiles()) {
return ClusterEngineType::MMFilesEngine;
} else if (_actualEngine->name() == RocksDBEngine::FeatureName) {
} else if (isRocksDB()) {
return ClusterEngineType::RocksDBEngine;
} else if (isMock()) {
return ClusterEngineType::MockEngine;
}
TRI_ASSERT(false);
@ -105,13 +114,6 @@ ClusterEngineType ClusterEngine::engineType() const {
// preparation phase for storage engine. can be used for internal setup.
// the storage engine must not start any threads here or write any files
void ClusterEngine::prepare() {
// get base path from DatabasePathFeature
auto databasePathFeature =
application_features::ApplicationServer::getFeature<DatabasePathFeature>(
"DatabasePath");
_basePath = databasePathFeature->directory();
TRI_ASSERT(!_basePath.empty());
if (!ServerState::instance()->isCoordinator()) {
setEnabled(false);
}
@ -216,10 +218,6 @@ int ClusterEngine::getViews(
return TRI_ERROR_NO_ERROR;
}
std::string ClusterEngine::versionFilename(TRI_voc_tick_t id) const {
return _basePath + TRI_DIR_SEPARATOR_CHAR + "VERSION-" + std::to_string(id);
}
VPackBuilder ClusterEngine::getReplicationApplierConfiguration(
TRI_vocbase_t& vocbase,
int& status
@ -391,7 +389,7 @@ void ClusterEngine::addOptimizerRules() {
MMFilesOptimizerRules::registerResources();
} else if (engineType() == ClusterEngineType::RocksDBEngine) {
RocksDBOptimizerRules::registerResources();
} else {
} else if (engineType() != ClusterEngineType::MockEngine) {
TRI_ASSERT(false);
}
}

View File

@ -58,6 +58,7 @@ class ClusterEngine final : public StorageEngine {
StorageEngine* actualEngine() const { return _actualEngine; }
bool isRocksDB() const;
bool isMMFiles() const;
bool isMock() const;
ClusterEngineType engineType() const;
// storage engine overrides
@ -127,9 +128,13 @@ class ClusterEngine final : public StorageEngine {
arangodb::velocypack::Builder& result
) override;
std::string versionFilename(TRI_voc_tick_t id) const override;
std::string versionFilename(TRI_voc_tick_t id) const override {
// the cluster engine does not have any versioning information
return std::string();
}
std::string databasePath(TRI_vocbase_t const* vocbase) const override {
return _basePath;
// the cluster engine does not have any database path
return std::string();
}
std::string collectionPath(
TRI_vocbase_t const& vocbase,
@ -342,6 +347,9 @@ class ClusterEngine final : public StorageEngine {
static std::string const EngineName;
static std::string const FeatureName;
// mock mode
static bool Mocking;
private:
/// path to arangodb data dir
std::string _basePath;

View File

@ -63,6 +63,7 @@ ClusterIndex::ClusterIndex(
_info(info),
_clusterSelectivity(/* default */0.1) {
TRI_ASSERT(_info.slice().isObject());
TRI_ASSERT(_info.isClosed());
}
ClusterIndex::~ClusterIndex() {}
@ -109,6 +110,8 @@ bool ClusterIndex::hasSelectivityEstimate() const {
_indexType == Index::TRI_IDX_TYPE_HASH_INDEX ||
_indexType == Index::TRI_IDX_TYPE_SKIPLIST_INDEX ||
_indexType == Index::TRI_IDX_TYPE_PERSISTENT_INDEX;
} else if (_engineType == ClusterEngineType::MockEngine) {
return false;
}
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -137,6 +140,8 @@ bool ClusterIndex::isPersistent() const {
return _indexType == Index::TRI_IDX_TYPE_PERSISTENT_INDEX;
} else if (_engineType == ClusterEngineType::RocksDBEngine) {
return true;
} else if (_engineType == ClusterEngineType::MockEngine) {
return false;
}
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -153,6 +158,8 @@ bool ClusterIndex::isSorted() const {
_indexType == Index::TRI_IDX_TYPE_SKIPLIST_INDEX ||
_indexType == Index::TRI_IDX_TYPE_PERSISTENT_INDEX ||
_indexType == Index::TRI_IDX_TYPE_FULLTEXT_INDEX;
} else if (_engineType == ClusterEngineType::MockEngine) {
return false;
}
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -176,7 +183,12 @@ void ClusterIndex::updateProperties(velocypack::Slice const& slice) {
}
merge.close();
_info = VPackCollection::merge(_info.slice(), merge.slice(), true);
TRI_ASSERT(merge.slice().isObject());
TRI_ASSERT(_info.slice().isObject());
VPackBuilder tmp = VPackCollection::merge(_info.slice(), merge.slice(), true);
_info = std::move(tmp);
TRI_ASSERT(_info.slice().isObject());
TRI_ASSERT(_info.isClosed());
}
bool ClusterIndex::hasCoveringIterator() const {
@ -190,7 +202,6 @@ bool ClusterIndex::hasCoveringIterator() const {
return false;
}
bool ClusterIndex::matchesDefinition(VPackSlice const& info) const {
// TODO implement faster version of this
return Index::Compare(_info.slice(), info);
@ -227,7 +238,7 @@ bool ClusterIndex::supportsFilterCondition(
} else if (_engineType == ClusterEngineType::RocksDBEngine) {
return PersistentIndexAttributeMatcher::supportsFilterCondition(allIndexes, this, node, reference, itemsInIndex,
estimatedItems, estimatedCost);
}
}
break;
}
case TRI_IDX_TYPE_EDGE_INDEX: {
@ -256,6 +267,10 @@ bool ClusterIndex::supportsFilterCondition(
case TRI_IDX_TYPE_UNKNOWN:
break;
}
if (_engineType == ClusterEngineType::MockEngine) {
return false;
}
TRI_ASSERT(false);
return false;
}
@ -312,6 +327,10 @@ bool ClusterIndex::supportsSortCondition(
case TRI_IDX_TYPE_UNKNOWN:
break;
}
if (_engineType == ClusterEngineType::MockEngine) {
return false;
}
TRI_ASSERT(false);
return false;
}
@ -365,6 +384,10 @@ aql::AstNode* ClusterIndex::specializeCondition(
case TRI_IDX_TYPE_UNKNOWN:
break;
}
if (_engineType == ClusterEngineType::MockEngine) {
return node;
}
TRI_ASSERT(false);
return node;
}

View File

@ -27,7 +27,8 @@ namespace arangodb {
enum class ClusterEngineType {
MMFilesEngine,
RocksDBEngine
RocksDBEngine,
MockEngine
};
} // namespace arangodb

View File

@ -27,11 +27,13 @@
#include "IResearchFeature.h"
#include "IResearchViewCoordinator.h"
#include "VelocyPackHelper.h"
#include "ClusterEngine/ClusterEngine.h"
#include "Cluster/ClusterInfo.h"
#include "Basics/StringUtils.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBColumnFamily.h"
#include "RocksDBEngine/RocksDBIndex.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "VocBase/LogicalCollection.h"
#include "velocypack/Builder.h"
#include "velocypack/Slice.h"
@ -63,7 +65,7 @@ namespace iresearch {
IResearchLinkCoordinator::IResearchLinkCoordinator(
TRI_idx_iid_t id,
LogicalCollection& collection
): arangodb::Index(id, collection, IResearchLinkHelper::emptyIndexSlice()) {
): arangodb::ClusterIndex(id, collection, static_cast<arangodb::ClusterEngine*>(arangodb::EngineSelectorFeature::ENGINE)->engineType(), arangodb::Index::TRI_IDX_TYPE_IRESEARCH_LINK, IResearchLinkHelper::emptyIndexSlice()) {
TRI_ASSERT(ServerState::instance()->isCoordinator());
_unique = false; // cannot be unique since multiple fields are indexed
_sparse = true; // always sparse

View File

@ -25,7 +25,8 @@
#define ARANGODB_IRESEARCH__IRESEARCH_LINK_COORDINATOR_H 1
#include "Indexes/Index.h"
#include "IResearchLinkMeta.h"
#include "ClusterEngine/ClusterIndex.h"
#include "IResearch/IResearchLinkMeta.h"
namespace arangodb {
namespace iresearch {
@ -36,7 +37,7 @@ class IResearchViewCoordinator;
/// @brief common base class for functionality required to link an ArangoDB
/// LogicalCollection with an IResearchView on a coordinator in cluster
////////////////////////////////////////////////////////////////////////////////
class IResearchLinkCoordinator final: public arangodb::Index {
class IResearchLinkCoordinator final: public arangodb::ClusterIndex {
public:
DECLARE_SHARED_PTR(Index);

View File

@ -874,6 +874,7 @@ std::string MMFilesEngine::createCollection(
LogicalCollection const& collection
) {
auto path = databasePath(&vocbase);
TRI_ASSERT(!path.empty());
// sanity check
if (sizeof(MMFilesDatafileHeaderMarker) + sizeof(MMFilesDatafileFooterMarker) >

View File

@ -171,10 +171,12 @@ std::shared_ptr<TailingSyncer> DatabaseReplicationApplier::buildTailingSyncer(TR
std::string DatabaseReplicationApplier::getStateFilename() const {
StorageEngine* engine = EngineSelectorFeature::ENGINE;
return arangodb::basics::FileUtils::buildFilename(
engine->databasePath(&_vocbase), "REPLICATION-APPLIER-STATE"
);
std::string const path = engine->databasePath(&_vocbase);
if (path.empty()) {
return std::string();
}
return arangodb::basics::FileUtils::buildFilename(path, "REPLICATION-APPLIER-STATE");
}
} // arangodb

View File

@ -106,8 +106,11 @@ std::string GlobalReplicationApplier::getStateFilename() const {
arangodb::SystemDatabaseFeature
>();
auto vocbase = sysDbFeature->use();
std::string const path = engine->databasePath(vocbase.get());
if (path.empty()) {
return std::string();
}
return arangodb::basics::FileUtils::buildFilename(
engine->databasePath(vocbase.get()), "GLOBAL-REPLICATION-APPLIER-STATE"
);
return arangodb::basics::FileUtils::buildFilename(path, "GLOBAL-REPLICATION-APPLIER-STATE");
}

View File

@ -386,12 +386,16 @@ void ReplicationApplier::removeState() {
if (!applies()) {
return;
}
std::string const filename = getStateFilename();
if (filename.empty()) {
// will happen during testing and for coordinator engine
return;
}
WRITE_LOCKER_EVENTUAL(writeLocker, _statusLock);
_state.reset(false);
std::string const filename = getStateFilename();
if (TRI_ExistsFile(filename.c_str())) {
LOG_TOPIC(TRACE, Logger::REPLICATION) << "removing replication state file '"
<< filename << "' for " << _databaseName;
@ -436,6 +440,11 @@ bool ReplicationApplier::loadState() {
}
std::string const filename = getStateFilename();
if (filename.empty()) {
// will happen during testing and for coordinator engine
return false;
}
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "looking for replication state file '" << filename << "' for " << _databaseName;
@ -486,11 +495,16 @@ void ReplicationApplier::persistState(bool doSync) {
if (!applies()) {
return;
}
std::string const filename = getStateFilename();
if (filename.empty()) {
// will happen during testing and for coordinator engine
return;
}
VPackBuilder builder;
_state.toVelocyPack(builder, false);
std::string const filename = getStateFilename();
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "saving replication applier state to file '" << filename << "' for " << _databaseName;

View File

@ -114,6 +114,9 @@ TailingSyncer::TailingSyncer(
// FIXME: move this into engine code
std::string const& engineName = EngineSelectorFeature::ENGINE->typeName();
_supportsSingleOperations = (engineName == "mmfiles");
// Replication for RocksDB expects only one open transaction at a time
_supportsMultipleOpenTransactions = (engineName != "rocksdb");
}
TailingSyncer::~TailingSyncer() { abortOngoingTransactions(); }
@ -500,6 +503,8 @@ Result TailingSyncer::startTransaction(VPackSlice const& slice) {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "starting replication transaction " << tid;
TRI_ASSERT(_ongoingTransactions.empty() || _supportsMultipleOpenTransactions);
auto trx = std::make_unique<ReplicationTransaction>(*vocbase);
Result res = trx->begin();
@ -572,6 +577,8 @@ Result TailingSyncer::commitTransaction(VPackSlice const& slice) {
Result res = trx->commit();
_ongoingTransactions.erase(it);
TRI_ASSERT(_ongoingTransactions.empty() || _supportsMultipleOpenTransactions);
return res;
}
@ -1618,6 +1625,8 @@ Result TailingSyncer::fetchOpenTransactions(TRI_voc_tick_t fromTick,
_ongoingTransactions.emplace(StringUtils::uint64(it.copyString()), nullptr);
}
TRI_ASSERT(_ongoingTransactions.size() <= 1 || _supportsMultipleOpenTransactions);
{
std::string const progress =
"fetched initial master state for from tick " +

View File

@ -197,6 +197,10 @@ class TailingSyncer : public Syncer {
/// @brief whether or not master & slave can work in parallel
bool _workInParallel;
/// @brief max parallel open transactions
/// this will be set to false for RocksDB, and to true for MMFiles
bool _supportsMultipleOpenTransactions;
/// @brief which transactions were open and need to be treated specially
std::unordered_map<TRI_voc_tid_t, std::unique_ptr<ReplicationTransaction>>
_ongoingTransactions;

View File

@ -106,8 +106,9 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
builder.clear();
builder.add(velocypack::ValuePair(docKey.data(), docKey.size(),
velocypack::ValueType::String));
trx.remove(col->name(), builder.slice(), options);
++stats.numDocsRemoved;
if (trx.remove(col->name(), builder.slice(), options).ok()) {
++stats.numDocsRemoved;
}
// continue iteration
return true;
}
@ -133,8 +134,9 @@ Result removeKeysOutsideRange(VPackSlice chunkSlice,
builder.clear();
builder.add(velocypack::ValuePair(docKey.data(), docKey.size(),
velocypack::ValueType::String));
trx.remove(col->name(), builder.slice(), options);
++stats.numDocsRemoved;
if (trx.remove(col->name(), builder.slice(), options).ok()) {
++stats.numDocsRemoved;
}
}
// continue iteration until end
@ -272,8 +274,9 @@ Result syncChunkRocksDB(
// we have a local key that is not present remotely
keyBuilder->clear();
keyBuilder->add(VPackValue(localKey));
trx->remove(collectionName, keyBuilder->slice(), options);
++stats.numDocsRemoved;
if (trx->remove(collectionName, keyBuilder->slice(), options).ok()) {
++stats.numDocsRemoved;
}
++nextStart;
} else if (res == 0) {
@ -316,8 +319,9 @@ Result syncChunkRocksDB(
// we have a local key that is not present remotely
keyBuilder->clear();
keyBuilder->add(VPackValue(localKey));
trx->remove(collectionName, keyBuilder->slice(), options);
++stats.numDocsRemoved;
if (trx->remove(collectionName, keyBuilder->slice(), options).ok()) {
++stats.numDocsRemoved;
}
}
++nextStart;
}
@ -439,6 +443,9 @@ Result syncChunkRocksDB(
if (conflictId.isSet()) {
physical->readDocumentWithCallback(trx, conflictId, [&](LocalDocumentId const&, VPackSlice doc) {
res = trx->remove(collectionName, doc, options).result;
if (res.ok()) {
++stats.numDocsRemoved;
}
});
}
return res;
@ -467,6 +474,8 @@ Result syncChunkRocksDB(
return opRes.result;
}
}
++stats.numDocsInserted;
} else {
// REPLACE
TRI_ASSERT(options.indexOperationMode == Index::OperationMode::internal);
@ -490,7 +499,6 @@ Result syncChunkRocksDB(
}
}
}
++stats.numDocsInserted;
}
if (foundLength >= toFetch.size()) {
@ -586,6 +594,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
}
size_t const numChunks = static_cast<size_t>(chunkSlice.length());
uint64_t const numberDocumentsRemovedBeforeStart = stats.numDocsRemoved;
{
if (syncer.isAborted()) {
@ -610,7 +619,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
res.errorNumber(),
std::string("unable to start transaction: ") + res.errorMessage());
}
// We do not take responsibility for the index.
// The LogicalCollection is protected by trx.
// Neither it nor its indexes can be invalidated
@ -683,8 +692,10 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
// smaller values than lowKey mean they don't exist remotely
tempBuilder.clear();
tempBuilder.add(VPackValue(docKey));
trx.remove(col->name(), tempBuilder.slice(), options);
++stats.numDocsRemoved;
if (trx.remove(col->name(), tempBuilder.slice(), options).ok()) {
++stats.numDocsRemoved;
}
return;
}
@ -749,8 +760,10 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
LogicalCollection* coll = trx.documentCollection();
auto iterator = createPrimaryIndexIterator(&trx, coll);
uint64_t documentsFound = 0;
iterator.next(
[&](rocksdb::Slice const& rocksKey, rocksdb::Slice const& rocksValue) {
++documentsFound;
std::string docKey = RocksDBKey::primaryKey(rocksKey).toString();
TRI_voc_rid_t docRev;
if (!RocksDBValue::revisionId(rocksValue, docRev)) {
@ -779,6 +792,15 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
}
}
{
uint64_t numberDocumentsAfterSync = documentsFound + stats.numDocsInserted - (stats.numDocsRemoved - numberDocumentsRemovedBeforeStart);
uint64_t numberDocumentsDueToCounter = col->numberDocuments(&trx, transaction::CountType::Normal);
syncer.setProgress(std::string("number of remaining documents in collection '") + col->name() + "' " + std::to_string(numberDocumentsAfterSync) + ", number of documents due to collection count: " + std::to_string(numberDocumentsDueToCounter));
if (numberDocumentsAfterSync != numberDocumentsDueToCounter) {
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "number of remaining documents in collection '" + col->name() + "' is " + std::to_string(numberDocumentsAfterSync) + " and differs from number of documents returned by collection count " + std::to_string(numberDocumentsDueToCounter);
}
}
res = trx.commit();
if (!res.ok()) {
return res;

View File

@ -219,7 +219,6 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
}
bool includeSystem = _request->parsedValue("includeSystem", true);
// TODO: determine good default value?
uint64_t chunkSize = _request->parsedValue<uint64_t>("chunkSize", 1024 * 1024);
grantTemporaryRights();
@ -462,19 +461,14 @@ void RocksDBRestReplicationHandler::handleCommandGetKeys() {
}
static uint64_t const DefaultChunkSize = 5000;
uint64_t chunkSize = DefaultChunkSize;
// determine chunk size
bool found;
std::string const& value = _request->value("chunkSize", found);
uint64_t chunkSize = _request->parsedValue("chunkSize", DefaultChunkSize);
if (found) {
chunkSize = StringUtils::uint64(value);
if (chunkSize < 100) {
chunkSize = DefaultChunkSize;
} else if (chunkSize > 20000) {
chunkSize = 20000;
}
if (chunkSize < 100) {
chunkSize = DefaultChunkSize;
} else if (chunkSize > 20000) {
chunkSize = 20000;
}
//first suffix needs to be the batch id
@ -515,36 +509,28 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
}
static uint64_t const DefaultChunkSize = 5000;
uint64_t chunkSize = DefaultChunkSize;
// determine chunk size
bool found;
std::string const& value1 = _request->value("chunkSize", found);
uint64_t chunkSize = _request->parsedValue("chunkSize", DefaultChunkSize);
if (found) {
chunkSize = StringUtils::uint64(value1);
if (chunkSize < 100) {
chunkSize = DefaultChunkSize;
} else if (chunkSize > 20000) {
chunkSize = 20000;
}
if (chunkSize < 100) {
chunkSize = DefaultChunkSize;
} else if (chunkSize > 20000) {
chunkSize = 20000;
}
// chunk is supplied by old clients, low is an optimization
// for rocksdb, because seeking should be cheaper
std::string const& value2 = _request->value("chunk", found);
size_t chunk = 0;
if (found) {
chunk = static_cast<size_t>(StringUtils::uint64(value2));
}
std::string const& lowKey = _request->value("low", found);
size_t chunk = static_cast<size_t>(_request->parsedValue("chunk", uint64_t(0)));
std::string const& value3 = _request->value("type", found);
bool found;
std::string const& lowKey = _request->value("low", found);
std::string const& value = _request->value("type", found);
bool keys = true;
if (value3 == "keys") {
if (value == "keys") {
keys = true;
} else if (value3 == "docs") {
} else if (value == "docs") {
keys = false;
} else {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
@ -554,9 +540,9 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
size_t offsetInChunk = 0;
size_t maxChunkSize = SIZE_MAX;
std::string const& value4 = _request->value("offset", found);
std::string const& value2 = _request->value("offset", found);
if (found) {
offsetInChunk = static_cast<size_t>(StringUtils::uint64(value4));
offsetInChunk = static_cast<size_t>(StringUtils::uint64(value2));
// "offset" was introduced with ArangoDB 3.3. if the client sends it,
// it means we can adapt the result size dynamically and the client
// may refetch data for the same chunk

View File

@ -113,12 +113,16 @@ class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessC
_lastWrittenSequence(0) {}
bool Continue() override {
if (_stopOnNext) {
return false;
}
if (_responseSize > _maxResponseSize) {
// it should only be possible to be in the middle of a huge batch,
// if and only if we are in one big transaction. We may not stop
// while
if (_state == TRANSACTION && _removedDocRid == 0) {
return false;
// this will make us process one more marker still
_stopOnNext = true;
}
}
return true;
@ -128,8 +132,8 @@ class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessC
// rocksdb does not count LogData towards sequence-number
RocksDBLogType type = RocksDBLogValue::type(blob);
//LOG_TOPIC(ERR, Logger::ENGINES) << "[LOG] " << _currentSequence
// << " " << rocksDBLogTypeName(type);
// LOG_TOPIC(WARN, Logger::REPLICATION) << "[LOG] " << _currentSequence
// << " " << rocksDBLogTypeName(type);
switch (type) {
case RocksDBLogType::DatabaseCreate:
resetTransientState(); // finish ongoing trx
@ -402,8 +406,7 @@ class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessC
rocksdb::Status PutCF(uint32_t column_family_id, rocksdb::Slice const& key,
rocksdb::Slice const& value) override {
incTick();
//LOG_TOPIC(ERR, Logger::ENGINES) << "[PUT] cf: " << column_family_id
// LOG_TOPIC(WARN, Logger::ENGINES) << "[PUT] cf: " << column_family_id
// << ", key:" << key.ToString() << " value: " << value.ToString();
if (column_family_id == _definitionsCF) {
@ -634,6 +637,7 @@ public:
}
void startNewBatch(rocksdb::SequenceNumber startSequence) {
TRI_ASSERT(!_stopOnNext);
// starting new write batch
_startSequence = startSequence;
_currentSequence = startSequence;
@ -645,7 +649,7 @@ public:
}
uint64_t endBatch() {
TRI_ASSERT(_removedDocRid == 0);
TRI_ASSERT(_removedDocRid == 0 || _stopOnNext);
resetTransientState();
return _currentSequence;
}
@ -720,6 +724,7 @@ public:
TRI_voc_tick_t _currentTrxId = 0;
TRI_voc_tick_t _trxDbId = 0; // remove eventually
TRI_voc_rid_t _removedDocRid = 0;
bool _stopOnNext = false;
};
// iterates over WAL starting at 'from' and returns up to 'chunkSize' documents
@ -735,6 +740,7 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
if (chunkSize < 16384) { // we need to have some sensible minimum
chunkSize = 16384;
}
// pre 3.4 breaking up write batches is not supported
size_t maxTrxChunkSize = filter.tickLastScanned > 0 ? chunkSize : SIZE_MAX;
@ -815,7 +821,7 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
if (!s.ok()) {
result.Result::reset(convertStatus(s, rocksutils::StatusHint::wal));
}
//LOG_TOPIC(WARN, Logger::ENGINES) << "2. firstTick: " << firstTick << " lastWrittenTick: " << lastWrittenTick
//<< " latestTick: " << latestTick;
// LOG_TOPIC(WARN, Logger::REPLICATION) << "2. firstTick: " << firstTick << " lastWrittenTick: " << lastWrittenTick
// << " latestTick: " << latestTick;
return result;
}

View File

@ -88,7 +88,6 @@ void EngineSelectorFeature::prepare() {
TRI_ASSERT(_engine != "auto");
if (ServerState::instance()->isCoordinator()) {
ClusterEngine* ce = application_features::ApplicationServer::getFeature<ClusterEngine>("ClusterEngine");
ENGINE = ce;

View File

@ -1321,6 +1321,7 @@ static void JS_PropertiesVocbaseCol(
TRI_V8_THROW_EXCEPTION(res);
}
}
TRI_ASSERT(builder.isClosed());
Result res = methods::Collections::updateProperties(consoleColl, builder.slice());
if (res.fail() && ServerState::instance()->isCoordinator()) {
TRI_V8_THROW_EXCEPTION(res);

View File

@ -155,7 +155,10 @@ Result Version::write(TRI_vocbase_t* vocbase,
TRI_ASSERT(engine != nullptr);
std::string versionFile = engine->versionFilename(vocbase->id());
TRI_ASSERT(!versionFile.empty());
if (versionFile.empty()) {
// cluster engine
return Result();
}
VPackOptions opts;
opts.buildUnindexedObjects = true;

View File

@ -31,6 +31,7 @@
#include "Aql/ExecutionPlan.h"
#include "Aql/ExpressionContext.h"
#include "Aql/Ast.h"
#include "ClusterEngine/ClusterEngine.h"
#include "Basics/files.h"
#include "RestServer/DatabasePathFeature.h"
#include "V8/v8-utils.h"
@ -130,6 +131,7 @@ namespace tests {
void init(bool withICU /*= false*/) {
arangodb::transaction::Methods::clearDataSourceRegistrationCallbacks();
ClusterEngine::Mocking = true;
}
// @Note: once V8 is initialized all 'CATCH' errors will result in SIGILL

View File

@ -69,15 +69,6 @@ function DatabaseSuite () {
assertEqual("_system", internal.db._name());
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test _path function
////////////////////////////////////////////////////////////////////////////////
testPath : function () {
assertTrue(typeof internal.db._path() === "string");
assertTrue(internal.db._path() !== "");
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test _isSystem function
////////////////////////////////////////////////////////////////////////////////

View File

@ -112,24 +112,23 @@ function ReplicationSuite() {
var slaveState = replication.applier.state();
if (slaveState.state.lastError.errorNum > 0) {
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
break;
}
if (!slaveState.state.running) {
console.log("slave is not running");
console.topic("replication=error", "slave is not running");
break;
}
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
break;
}
if (!printed) {
console.log("waiting for slave to catch up");
console.topic("replication=debug", "waiting for slave to catch up");
printed = true;
}
internal.wait(0.5, false);

View File

@ -96,24 +96,23 @@ function ReplicationSuite() {
let slaveState = replication.globalApplier.state();
if (slaveState.state.lastError.errorNum > 0) {
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
break;
}
if (!slaveState.state.running) {
console.log("slave is not running");
console.topic("replication=error", "slave is not running");
break;
}
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
break;
}
if (!printed) {
console.log("waiting for slave to catch up");
console.topic("replication=debug", "waiting for slave to catch up");
printed = true;
}
internal.wait(0.5, false);

View File

@ -112,24 +112,23 @@ function ReplicationSuite() {
var slaveState = replication.applier.state();
if (slaveState.state.lastError.errorNum > 0) {
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
break;
}
if (!slaveState.state.running) {
console.log("slave is not running");
console.topic("replication=error", "slave is not running");
break;
}
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
break;
}
if (!printed) {
console.log("waiting for slave to catch up");
console.topic("replication=debug", "waiting for slave to catch up");
printed = true;
}
internal.wait(0.5, false);

View File

@ -114,24 +114,23 @@ function ReplicationSuite() {
var slaveState = replication.applier.state();
if (slaveState.state.lastError.errorNum > 0) {
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
break;
}
if (!slaveState.state.running) {
console.log("slave is not running");
console.topic("replication=error", "slave is not running");
break;
}
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
break;
}
if (!printed) {
console.log("waiting for slave to catch up");
console.topic("replication=debug", "waiting for slave to catch up");
printed = true;
}
internal.wait(0.5, false);