mirror of https://gitee.com/bigwinds/arangodb
Merge remote-tracking branch 'origin/devel' into feature/ldap-auth
commit 579718ad37
@@ -1,6 +1,14 @@
 devel
 -----
 
+* fixed issue #2450
+
+* fixed issue #2448
+
+* fixed issue #2442
+
+* added 'x-content-type-options: nosniff' to avoid MSIE bug
+
 * set default value for `--ssl.protocol` from TLSv1 to TLSv1.2.
 
 * AQL breaking change in cluster:
@@ -245,6 +253,7 @@ v3.1.19 (XXXX-XX-XX)
 
+* fixed issue #2440
 
 
 v3.1.18 (2017-04-18)
 --------------------
 
@@ -62,6 +62,7 @@ The request function takes the following options:
 * *json*: if `true`, *body* will be serialized to a JSON string and the *Content-Type* header will be set to `"application/json"`. Additionally the response body will also be parsed as JSON (unless *encoding* is set to `null`). Default: `false`.
 * *form* (optional): when set to a string or object and no *body* has been set, *body* will be set to a querystring representation of that value and the *Content-Type* header will be set to `"application/x-www-form-urlencoded"`. Also see *useQuerystring*.
 * *auth* (optional): an object with the properties *username* and *password* for HTTP Basic authentication or the property *bearer* for HTTP Bearer token authentication.
+* *sslProtocol* (optional): which TLS version should be used to connect to the URL. The default is `4`, which is TLS 1.0. See [ssl protocol](../../Administration/Configuration/SSL.md#ssl-protocol) for more options.
 * *followRedirect*: whether HTTP 3xx redirects should be followed. Default: `true`.
 * *maxRedirects*: the maximum number of redirects to follow. Default: `10`.
 * *encoding*: encoding to be used for the response body. If set to `null`, the response body will be returned as a `Buffer`. Default: `"utf-8"`.
@@ -19,7 +19,7 @@ echo "CXX: $CXX"
 echo
 echo "$0: compiling ArangoDB"
 
-(cd build && make -j2)
+(cd build && make -j4)
 
 echo
 echo "$0: testing ArangoDB"
@@ -1448,11 +1448,6 @@ AgencyCommResult AgencyComm::sendWithFailover(
           "Inquiry failed (" << inq._statusCode << "). Keep trying ...";
       continue;
     }
 
-    AgencyCommManager::MANAGER->failed(std::move(connection), endpoint);
-    endpoint.clear();
-    connection = AgencyCommManager::MANAGER->acquire(endpoint);
-    continue;
-  }
 
   // sometimes the agency will return a 307 (temporary redirect)
@@ -101,7 +101,9 @@ Agent::~Agent() {
     }
   }
 
-  shutdown();
+  if (!isStopping()) {
+    shutdown();
+  }
 
 }
 
@@ -38,7 +38,9 @@ Compactor::Compactor(Agent* agent) :
 
 /// Dtor shuts down thread
 Compactor::~Compactor() {
-  //shutdown();
+  if (!isStopping()) {
+    shutdown();
+  }
 }
 
 
@@ -82,7 +82,11 @@ Constituent::Constituent()
       _votedFor(NO_LEADER) {}
 
 /// Shutdown if not already
-Constituent::~Constituent() { shutdown(); }
+Constituent::~Constituent() {
+  if (!isStopping()) {
+    shutdown();
+  }
+}
 
 /// Wait for sync
 bool Constituent::waitForSync() const { return _agent->config().waitForSync(); }
@@ -41,7 +41,11 @@ Inception::Inception() : Thread("Inception"), _agent(nullptr) {}
 Inception::Inception(Agent* agent) : Thread("Inception"), _agent(agent) {}
 
 // Shutdown if not already
-Inception::~Inception() { shutdown(); }
+Inception::~Inception() {
+  if (!isStopping()) {
+    shutdown();
+  }
+}
 
 /// Gossip to others
 /// - Get snapshot of gossip peers and agent pool
@@ -242,7 +242,7 @@ std::vector<Job::shard_t> Job::clones(
 
   std::vector<shard_t> ret;
   ret.emplace_back(collection, shard);  // add (collection, shard) as first item
-  typedef std::unordered_map<std::string, std::shared_ptr<Node>> UChildren;
+  //typedef std::unordered_map<std::string, std::shared_ptr<Node>> UChildren;
 
   try {
     std::string databasePath = planColPrefix + database,
@@ -151,7 +151,11 @@ Store& Store::operator=(Store&& rhs) {
 }
 
 /// Default dtor
-Store::~Store() { shutdown(); }
+Store::~Store() {
+  if (!isStopping()) {
+    shutdown();
+  }
+}
 
 /// Apply array of queries multiple queries to store
 /// Return vector of according success
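The five agency hunks above (Agent, Compactor, Constituent, Inception, Store) all make the same change: the destructor no longer calls shutdown() unconditionally but only when shutdown has not already been initiated. A minimal sketch of the pattern, assuming a Thread-like base class that exposes isStopping() and shutdown() (the class names here are illustrative, not the exact ArangoDB API):

    #include <atomic>

    class Thread {
     public:
      virtual ~Thread() = default;
      bool isStopping() const { return _stopping.load(); }
      void shutdown() { _stopping.store(true); /* join worker, free resources */ }
     private:
      std::atomic<bool> _stopping{false};
    };

    class Agent : public Thread {
     public:
      ~Agent() override {
        // only trigger shutdown if it has not already been initiated;
        // running the full shutdown sequence a second time from the
        // destructor is what the guard above avoids
        if (!isStopping()) {
          shutdown();
        }
      }
    };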
@@ -38,9 +38,6 @@
 #include <velocypack/Iterator.h>
 #include <velocypack/velocypack-aliases.h>
 
-/// @brief typedef the template instantiation of the PathFinder
-typedef arangodb::graph::AttributeWeightShortestPathFinder ArangoDBPathFinder;
-
 using namespace arangodb::aql;
 using namespace arangodb::graph;
 
@@ -91,22 +88,12 @@ ShortestPathBlock::ShortestPathBlock(ExecutionEngine* engine,
   }
   _path = std::make_unique<arangodb::graph::ShortestPathResult>();
 
-  if (arangodb::ServerState::instance()->isCoordinator()) {
-    if (_opts->useWeight()) {
-      _finder.reset(
-          new arangodb::graph::AttributeWeightShortestPathFinder(_opts));
-    } else {
-      _finder.reset(
-          new arangodb::graph::ConstantWeightShortestPathFinder(_opts));
-    }
+  if (_opts->useWeight()) {
+    _finder.reset(
+        new arangodb::graph::AttributeWeightShortestPathFinder(_opts));
   } else {
-    if (_opts->useWeight()) {
-      _finder.reset(
-          new arangodb::graph::AttributeWeightShortestPathFinder(_opts));
-    } else {
-      _finder.reset(
-          new arangodb::graph::ConstantWeightShortestPathFinder(_opts));
-    }
+    _finder.reset(
+        new arangodb::graph::ConstantWeightShortestPathFinder(_opts));
   }
 
   if (arangodb::ServerState::instance()->isCoordinator()) {
@@ -476,6 +476,29 @@ void ClusterFeature::start() {
   }
 }
 
+
+void ClusterFeature::stop() {
+
+  if (_enableCluster) {
+    if (_heartbeatThread != nullptr) {
+      _heartbeatThread->beginShutdown();
+    }
+
+    if (_heartbeatThread != nullptr) {
+      int counter = 0;
+      while (_heartbeatThread->isRunning()) {
+        usleep(100000);
+        // emit warning after 5 seconds
+        if (++counter == 10 * 5) {
+          LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "waiting for heartbeat thread to finish";
+        }
+      }
+    }
+  }
+
+}
+
+
 void ClusterFeature::unprepare() {
   if (_enableCluster) {
     if (_heartbeatThread != nullptr) {
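The new stop() above signals the heartbeat thread to shut down and then polls until it actually exits, warning once after five seconds (50 iterations of 100 ms). A self-contained sketch of that wait loop with a hypothetical Worker in place of the heartbeat thread:

    #include <atomic>
    #include <chrono>
    #include <iostream>
    #include <thread>

    struct Worker {
      std::atomic<bool> running{true};
      bool isRunning() const { return running.load(); }
    };

    void waitForWorker(Worker& worker) {
      int counter = 0;
      while (worker.isRunning()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        // 10 * 5 iterations of 100 ms = 5 seconds, mirroring the check above
        if (++counter == 10 * 5) {
          std::cerr << "waiting for worker thread to finish\n";
        }
      }
    }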
@@ -73,6 +73,8 @@ class ClusterFeature : public application_features::ApplicationFeature {
 
   void setUnregisterOnShutdown(bool);
 
+  void stop() override final;
+
  private:
   bool _unregisterOnShutdown;
   bool _enableCluster;
@@ -1300,6 +1300,18 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
   AgencyComm ac;
   AgencyCommResult res;
 
+  // First check that no other collection has a distributeShardsLike
+  // entry pointing to us:
+  auto coll = getCollection(databaseName, collectionID);
+  // not used # std::string id = std::to_string(coll->cid());
+  auto colls = getCollections(databaseName);
+  for (std::shared_ptr<LogicalCollection> const& p : colls) {
+    if (p->distributeShardsLike() == coll->name() ||
+        p->distributeShardsLike() == collectionID) {
+      return TRI_ERROR_CLUSTER_MUST_NOT_DROP_COLL_OTHER_DISTRIBUTESHARDSLIKE;
+    }
+  }
+
   double const realTimeout = getTimeout(timeout);
   double const endTime = TRI_microtime() + realTimeout;
   double const interval = getPollInterval();
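The guard added above refuses to drop a collection while any other collection still references it via distributeShardsLike, since those followers derive their shard placement from the victim. A generic sketch of the same dependency check (Collection and the field names are illustrative):

    #include <string>
    #include <vector>

    struct Collection {
      std::string name;
      std::string distributeShardsLike;  // empty when independent
    };

    bool canDrop(Collection const& victim, std::vector<Collection> const& all) {
      for (Collection const& c : all) {
        if (c.distributeShardsLike == victim.name) {
          return false;  // dropping would orphan c's shard distribution
        }
      }
      return true;
    }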
@@ -2261,13 +2261,14 @@ std::unordered_map<std::string, std::vector<std::string>> distributeShards(
 
 #ifndef USE_ENTERPRISE
 std::unique_ptr<LogicalCollection>
-ClusterMethods::createCollectionOnCoordinator(
-    TRI_col_type_e collectionType, TRI_vocbase_t* vocbase, VPackSlice parameters,
-    bool ignoreDistributeShardsLikeErrors) {
+ClusterMethods::createCollectionOnCoordinator(TRI_col_type_e collectionType,
+                                              TRI_vocbase_t* vocbase,
+                                              VPackSlice parameters,
+                                              bool ignoreDistributeShardsLikeErrors) {
   auto col = std::make_unique<LogicalCollection>(vocbase, parameters);
   // Collection is a temporary collection object that undergoes sanity checks etc.
   // It is not used anywhere and will be cleaned up after this call.
   // Persist collection will return the real object.
   return persistCollectionInAgency(col.get(), ignoreDistributeShardsLikeErrors);
 }
 #endif
@@ -2282,22 +2283,22 @@ ClusterMethods::persistCollectionInAgency(
   std::string distributeShardsLike = col->distributeShardsLike();
   std::vector<std::string> dbServers;
   std::vector<std::string> avoid = col->avoidServers();
 
-  bool chainOfDistributeShardsLike = false;
-
   ClusterInfo* ci = ClusterInfo::instance();
   if (!distributeShardsLike.empty()) {
-
     CollectionNameResolver resolver(col->vocbase());
     TRI_voc_cid_t otherCid =
         resolver.getCollectionIdCluster(distributeShardsLike);
+
     if (otherCid != 0) {
+      bool chainOfDistributeShardsLike = false;
+
       std::string otherCidString
           = arangodb::basics::StringUtils::itoa(otherCid);
+
       try {
         std::shared_ptr<LogicalCollection> collInfo =
-          ci->getCollection(col->dbName(), otherCidString);
+            ci->getCollection(col->dbName(), otherCidString);
         if (!collInfo->distributeShardsLike().empty()) {
           chainOfDistributeShardsLike = true;
         }
@@ -2312,21 +2313,21 @@ ClusterMethods::persistCollectionInAgency(
         }
       }
     } catch (...) {}
 
-
     if (chainOfDistributeShardsLike) {
       THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE);
     }
 
     col->distributeShardsLike(otherCidString);
   } else {
-    LOG_TOPIC(WARN, Logger::CLUSTER) << "WTF? " << ignoreDistributeShardsLikeErrors;
     if (ignoreDistributeShardsLikeErrors) {
       col->distributeShardsLike(std::string());
     } else {
       THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE);
     }
   }
-  } else if(!avoid.empty()) {
+
+  } else if (!avoid.empty()) {
   size_t replicationFactor = col->replicationFactor();
   dbServers = ci->getCurrentDBServers();
   if (dbServers.size() - avoid.size() >= replicationFactor) {
@@ -2337,9 +2338,8 @@ ClusterMethods::persistCollectionInAgency(
                      }), dbServers.end());
     }
     std::random_shuffle(dbServers.begin(), dbServers.end());
-
   }
 
 
   // If the list dbServers is still empty, it will be filled in
   // distributeShards below.
 
@@ -267,7 +267,7 @@ class ClusterMethods {
   ////////////////////////////////////////////////////////////////////////////////
 
   static std::unique_ptr<LogicalCollection> persistCollectionInAgency(
-      LogicalCollection* col, bool ignoreDistributeShardsLikeErrors);
+      LogicalCollection* col, bool ignoreDistributeShardsLikeErrors = false);
 };
 
 }  // namespace arangodb
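Giving the new parameter a default value, as in the hunk above, keeps every existing single-argument call site compiling while new call sites can opt in explicitly. A trivial sketch of the idea (the stand-in function body is illustrative only):

    struct LogicalCollection;  // opaque here

    int persistCollectionInAgency(LogicalCollection* col,
                                  bool ignoreDistributeShardsLikeErrors = false) {
      (void)col;
      return ignoreDistributeShardsLikeErrors ? 0 : 1;  // stand-in return codes
    }

    // old call sites keep working: persistCollectionInAgency(col);
    // new call sites can opt in:   persistCollectionInAgency(col, true);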
@@ -825,10 +825,8 @@ void HeartbeatThread::syncDBServerStatusQuo() {
   _backgroundJobScheduledOrRunning = true;
 
   // the JobGuard is in the operator() of HeartbeatBackgroundJob
-  if (!isStopping() && !_ioService->stopped()) {
-    _ioService->
-      post(HeartbeatBackgroundJob(shared_from_this(), TRI_microtime()));
-  }
+  _ioService->
+    post(HeartbeatBackgroundJob(shared_from_this(), TRI_microtime()));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -45,7 +45,7 @@ class Query;
 }
 
 namespace graph {
-class ShortestPathOptions;
+struct ShortestPathOptions;
 }
 
 namespace velocypack {
@@ -54,7 +54,7 @@ class Slice;
 }
 
 namespace traverser {
-class TraverserOptions;
+struct TraverserOptions;
 
 class BaseEngine {
   friend class TraverserEngineRegistry;
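These two hunks change forward declarations from class to struct so they match the actual definitions. Mismatched class-keys are legal C++, but some compilers warn about them (MSVC's C4099 is the usual offender). A minimal sketch, using ShortestPathOptions as the example:

    // ShortestPathOptions is defined as a struct below; forward-declaring it
    // as `class` compiles on most toolchains but warns on MSVC (C4099).
    struct ShortestPathOptions;  // matches the definition

    void useOptions(ShortestPathOptions* opts);  // pointers only need the declaration

    struct ShortestPathOptions {
      double defaultWeight;
      bool useWeight;
    };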
@@ -59,7 +59,7 @@ TraverserEngineRegistry::EngineInfo::EngineInfo(TRI_vocbase_t* vocbase,
                                                 VPackSlice info)
     : _isInUse(false),
       _toBeDeleted(false),
-      _engine(std::move(BaseEngine::BuildEngine(vocbase, info))),
+      _engine(BaseEngine::BuildEngine(vocbase, info)),
       _timeToLive(0),
       _expires(0) {}
 
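The hunk above drops a redundant std::move: BuildEngine() already returns an rvalue, which binds directly to the member's move constructor, and wrapping a returned value in std::move can suppress copy elision (clang flags this with -Wpessimizing-move). A small sketch of the same situation with hypothetical names:

    #include <memory>
    #include <utility>

    std::unique_ptr<int> build() { return std::make_unique<int>(42); }

    struct Holder {
      std::unique_ptr<int> _p;
      // build() already yields an rvalue, so it moves into _p directly;
      // the preferred form, matching the new code above:
      Holder() : _p(build()) {}
      // redundant variant removed above:
      // Holder() : _p(std::move(build())) {}
    };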
@@ -124,7 +124,7 @@ void GeneralCommTask::executeRequest(
       GeneralServerFeature::HANDLER_FACTORY->createHandler(
           std::move(request), std::move(response)));
 
-  // transfer statistics into handler
+  // give up, if we cannot find a handler
   if (handler == nullptr) {
     LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "no handler is known, giving up";
     handleSimpleError(rest::ResponseCode::NOT_FOUND, messageId);
@@ -129,6 +129,10 @@ void HttpCommTask::addResponse(HttpResponse* response,
                                    StaticStrings::ExposedCorsHeaders);
   }
 
+  // use "IfNotSet"
+  response->setHeaderNCIfNotSet(StaticStrings::XContentTypeOptions,
+                                StaticStrings::NoSniff);
+
   // set "connection" header, keep-alive is the default
   response->setConnectionType(_closeRequested
                                   ? rest::ConnectionType::C_CLOSE
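This is where the changelog's "x-content-type-options: nosniff" entry lands: every HTTP response now carries the header (unless a handler already set it), which stops Internet Explorer from content-sniffing responses into the wrong MIME type. A sketch of the "set only if absent" behaviour that setHeaderNCIfNotSet() provides, with a plain std::map standing in for the response object:

    #include <map>
    #include <string>

    void setHeaderIfNotSet(std::map<std::string, std::string>& headers,
                           std::string const& key, std::string const& value) {
      // emplace is a no-op when the key already exists, so a value a
      // handler chose deliberately is never overwritten
      headers.emplace(key, value);
    }

    // usage:
    // setHeaderIfNotSet(headers, "x-content-type-options", "nosniff");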
@@ -160,13 +164,27 @@ void HttpCommTask::addResponse(HttpResponse* response,
   if (!buffer._buffer->empty()) {
     LOG_TOPIC(TRACE, Logger::REQUESTS)
         << "\"http-request-response\",\"" << (void*)this << "\",\"" << _fullUrl
-        << "\",\"" << StringUtils::escapeUnicode(std::string(
-                          buffer._buffer->c_str(), buffer._buffer->length()))
+        << "\",\""
+        << StringUtils::escapeUnicode(
+               std::string(buffer._buffer->c_str(), buffer._buffer->length()))
         << "\"";
   }
 
   // append write buffer and statistics
   double const totalTime = RequestStatistics::ELAPSED_SINCE_READ_START(stat);
 
+  if (stat != nullptr && arangodb::Logger::isEnabled(arangodb::LogLevel::TRACE,
+                                                     Logger::REQUESTS)) {
+    LOG_TOPIC(TRACE, Logger::REQUESTS)
+        << "\"http-request-statistics\",\"" << (void*)this << "\",\""
+        << _connectionInfo.clientAddress << "\",\""
+        << HttpRequest::translateMethod(_requestType) << "\",\""
+        << HttpRequest::translateVersion(_protocolVersion) << "\","
+        << static_cast<int>(response->responseCode()) << ","
+        << _originalBodyLength << "," << responseBodyLength << ",\"" << _fullUrl
+        << "\"," << stat->timingsCsv();
+  }
+
   addWriteBuffer(buffer);
 
   // and give some request information
|
|||
}
|
||||
|
||||
void HttpCommTask::compactify() {
|
||||
if (! _newRequest) {
|
||||
if (!_newRequest) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -731,7 +749,7 @@ void HttpCommTask::compactify() {
|
|||
TRI_ASSERT(_bodyPosition >= _readPosition);
|
||||
_bodyPosition -= _readPosition;
|
||||
}
|
||||
|
||||
|
||||
_readPosition = 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -139,6 +139,15 @@ void VppCommTask::addResponse(VppResponse* response, RequestStatistics* stat) {
   auto buffers = createChunkForNetwork(slices, id, chunkSize, false);
   double const totalTime = RequestStatistics::ELAPSED_SINCE_READ_START(stat);
 
+  if (stat != nullptr && arangodb::Logger::isEnabled(arangodb::LogLevel::TRACE,
+                                                     Logger::REQUESTS)) {
+    LOG_TOPIC(TRACE, Logger::REQUESTS)
+        << "\"vst-request-statistics\",\"" << (void*)this << "\",\""
+        << VppRequest::translateVersion(_protocolVersion) << "\","
+        << static_cast<int>(response->responseCode()) << ","
+        << _connectionInfo.clientAddress << "\"," << stat->timingsCsv();
+  }
+
   if (buffers.empty()) {
     if (stat != nullptr) {
       stat->release();
@@ -708,10 +708,16 @@ bool Index::canUseConditionPart(
                         other->type == arangodb::aql::NODE_TYPE_ATTRIBUTE_ACCESS)) {
     // value IN a.b OR value IN a.b[*]
     arangodb::aql::Ast::getReferencedVariables(access, variables);
+    if (other->type == arangodb::aql::NODE_TYPE_ATTRIBUTE_ACCESS &&
+        variables.find(reference) != variables.end()) {
+      variables.clear();
+      arangodb::aql::Ast::getReferencedVariables(other, variables);
+    }
   } else {
     // a.b == value OR a.b IN values
     arangodb::aql::Ast::getReferencedVariables(other, variables);
   }
 
   if (variables.find(reference) != variables.end()) {
     // yes. then we cannot use an index here
     return false;
@@ -758,22 +764,30 @@ void Index::expandInSearchValues(VPackSlice const base,
       VPackSlice current = oneLookup.at(i);
       if (current.hasKey(StaticStrings::IndexIn)) {
         VPackSlice inList = current.get(StaticStrings::IndexIn);
+        if (!inList.isArray()) {
+          // IN value is a non-array
+          result.clear();
+          result.openArray();
+          return;
+        }
+
         TRI_ASSERT(inList.isArray());
+        VPackValueLength nList = inList.length();
+
+        if (nList == 0) {
+          // Empty Array. short circuit, no matches possible
+          result.clear();
+          result.openArray();
+          return;
+        }
+
         std::unordered_set<VPackSlice,
                            arangodb::basics::VelocyPackHelper::VPackHash,
                            arangodb::basics::VelocyPackHelper::VPackEqual>
-            tmp(static_cast<size_t>(inList.length()),
+            tmp(static_cast<size_t>(nList),
                 arangodb::basics::VelocyPackHelper::VPackHash(),
                 arangodb::basics::VelocyPackHelper::VPackEqual());
 
-        TRI_ASSERT(inList.isArray());
-        if (inList.length() == 0) {
-          // Empty Array. short circuit, no matches possible
-          result.clear();
-          result.openArray();
-          result.close();
-          return;
-        }
         for (auto const& el : VPackArrayIterator(inList)) {
           tmp.emplace(el);
         }
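The reordering above moves the non-array and empty-array checks in front of the hash-set construction, so the degenerate cases bail out before any allocation. A generic sketch of the same validate-then-deduplicate shape, using ints instead of VPackSlice:

    #include <unordered_set>
    #include <vector>

    std::vector<int> expandIn(std::vector<int> const* inList) {
      if (inList == nullptr || inList->empty()) {
        // non-array or IN []: no match is possible, stop before allocating
        return {};
      }
      // deduplicate, mirroring the unordered_set<VPackSlice> above
      std::unordered_set<int> tmp(inList->begin(), inList->end());
      return std::vector<int>(tmp.begin(), tmp.end());
    }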
@@ -58,7 +58,6 @@ class LogicalCollection;
 namespace transaction {
 class Methods;
 }
-;
 
 /// @brief a base class to iterate over the index. An iterator is requested
 /// at the index itself
@@ -102,23 +101,23 @@ class IndexIterator {
 
 /// @brief Special iterator if the condition cannot have any result
 class EmptyIndexIterator final : public IndexIterator {
  public:
  EmptyIndexIterator(LogicalCollection* collection, transaction::Methods* trx, ManagedDocumentResult* mmdr, arangodb::Index const* index)
      : IndexIterator(collection, trx, mmdr, index) {}

  ~EmptyIndexIterator() {}

  char const* typeName() const override { return "empty-index-iterator"; }

  bool next(TokenCallback const&, size_t) override {
    return false;
  }

  void reset() override {}

  void skip(uint64_t, uint64_t& skipped) override {
    skipped = 0;
  }
 };
 
 /// @brief a wrapper class to iterate over several IndexIterators.
@@ -1871,6 +1871,13 @@ int MMFilesCollection::iterateMarkersOnLoad(transaction::Methods* trx) {
   return TRI_ERROR_NO_ERROR;
 }
 
+DocumentIdentifierToken MMFilesCollection::lookupKey(transaction::Methods *trx,
+                                                     VPackSlice const& key) {
+  MMFilesPrimaryIndex *index = primaryIndex();
+  MMFilesSimpleIndexElement element = index->lookupKey(trx, key);
+  return element ? MMFilesToken(element.revisionId()) : MMFilesToken();
+}
+
 int MMFilesCollection::read(transaction::Methods* trx, VPackSlice const key,
                             ManagedDocumentResult& result, bool lock) {
   TRI_IF_FAILURE("ReadDocumentNoLock") {
@@ -72,7 +72,8 @@ class MMFilesCollection final : public PhysicalCollection {
   arangodb::MMFilesPrimaryIndex* _primaryIndex;
   TRI_voc_tid_t _tid;
   TRI_voc_fid_t _fid;
-  std::unordered_map<TRI_voc_fid_t, MMFilesDatafileStatisticsContainer*> _stats;
+  std::unordered_map<TRI_voc_fid_t, MMFilesDatafileStatisticsContainer*>
+      _stats;
   MMFilesDatafileStatisticsContainer* _dfi;
   transaction::Methods* _trx;
   ManagedDocumentResult _mmdr;
@@ -85,8 +86,9 @@ class MMFilesCollection final : public PhysicalCollection {
 
     OpenIteratorState(LogicalCollection* collection, transaction::Methods* trx)
         : _collection(collection),
-          _primaryIndex(static_cast<MMFilesCollection*>(collection->getPhysical())
-                            ->primaryIndex()),
+          _primaryIndex(
+              static_cast<MMFilesCollection*>(collection->getPhysical())
+                  ->primaryIndex()),
           _tid(0),
           _fid(0),
           _stats(),
|
|||
|
||||
public:
|
||||
explicit MMFilesCollection(LogicalCollection*, VPackSlice const& info);
|
||||
explicit MMFilesCollection(LogicalCollection*, PhysicalCollection*); //use in cluster only!!!!!
|
||||
explicit MMFilesCollection(LogicalCollection*,
|
||||
PhysicalCollection*); // use in cluster only!!!!!
|
||||
|
||||
~MMFilesCollection();
|
||||
|
||||
constexpr static double defaultLockTimeout = 10.0 * 60.0;
|
||||
|
||||
std::string const& path() const override {
|
||||
return _path;
|
||||
};
|
||||
std::string const& path() const override { return _path; };
|
||||
|
||||
void setPath(std::string const& path) override {
|
||||
_path = path;
|
||||
};
|
||||
void setPath(std::string const& path) override { _path = path; };
|
||||
|
||||
arangodb::Result updateProperties(VPackSlice const& slice, bool doSync) override;
|
||||
arangodb::Result updateProperties(VPackSlice const& slice,
|
||||
bool doSync) override;
|
||||
virtual arangodb::Result persistProperties() override;
|
||||
|
||||
virtual PhysicalCollection* clone(LogicalCollection*, PhysicalCollection*) override;
|
||||
virtual PhysicalCollection* clone(LogicalCollection*,
|
||||
PhysicalCollection*) override;
|
||||
|
||||
TRI_voc_rid_t revision(arangodb::transaction::Methods* trx) const override;
|
||||
TRI_voc_rid_t revision() const;
|
||||
|
@ -148,7 +149,7 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
|
||||
size_t journalSize() const;
|
||||
bool isVolatile() const;
|
||||
|
||||
|
||||
TRI_voc_tick_t maxTick() const { return _maxTick; }
|
||||
void maxTick(TRI_voc_tick_t value) { _maxTick = value; }
|
||||
|
||||
|
@ -156,12 +157,14 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
void getPropertiesVPackCoordinator(velocypack::Builder&) const override;
|
||||
|
||||
// datafile management
|
||||
bool applyForTickRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
|
||||
std::function<bool(TRI_voc_tick_t foundTick, MMFilesMarker const* marker)> const& callback);
|
||||
bool applyForTickRange(
|
||||
TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
|
||||
std::function<bool(TRI_voc_tick_t foundTick,
|
||||
MMFilesMarker const* marker)> const& callback);
|
||||
|
||||
/// @brief closes an open collection
|
||||
int close() override;
|
||||
|
||||
|
||||
/// @brief rotate the active journal - will do nothing if there is no journal
|
||||
int rotateActiveJournal();
|
||||
|
||||
|
@@ -170,28 +173,32 @@ class MMFilesCollection final : public PhysicalCollection {
   int syncActiveJournal();
 
   int reserveJournalSpace(TRI_voc_tick_t tick, TRI_voc_size_t size,
-                          char*& resultPosition, MMFilesDatafile*& resultDatafile);
+                          char*& resultPosition,
+                          MMFilesDatafile*& resultDatafile);
 
   /// @brief create compactor file
-  MMFilesDatafile* createCompactor(TRI_voc_fid_t fid, TRI_voc_size_t maximalSize);
+  MMFilesDatafile* createCompactor(TRI_voc_fid_t fid,
+                                   TRI_voc_size_t maximalSize);
 
   /// @brief close an existing compactor
   int closeCompactor(MMFilesDatafile* datafile);
 
   /// @brief replace a datafile with a compactor
-  int replaceDatafileWithCompactor(MMFilesDatafile* datafile, MMFilesDatafile* compactor);
+  int replaceDatafileWithCompactor(MMFilesDatafile* datafile,
+                                   MMFilesDatafile* compactor);
 
   bool removeCompactor(MMFilesDatafile*);
   bool removeDatafile(MMFilesDatafile*);
 
   /// @brief seal a datafile
   int sealDatafile(MMFilesDatafile* datafile, bool isCompactor);
 
   /// @brief increase dead stats for a datafile, if it exists
-  void updateStats(TRI_voc_fid_t fid, MMFilesDatafileStatisticsContainer const& values) {
+  void updateStats(TRI_voc_fid_t fid,
+                   MMFilesDatafileStatisticsContainer const& values) {
     _datafileStatistics.update(fid, values);
   }
 
   uint64_t numberDocuments(transaction::Methods* trx) const override;
 
   /// @brief report extra memory used by indexes etc.
@@ -204,17 +211,17 @@ class MMFilesCollection final : public PhysicalCollection {
   bool tryLockForCompaction();
   void finishCompaction();
 
-  void setNextCompactionStartIndex(size_t index){
+  void setNextCompactionStartIndex(size_t index) {
     MUTEX_LOCKER(mutexLocker, _compactionStatusLock);
     _nextCompactionStartIndex = index;
   }
 
-  size_t getNextCompactionStartIndex(){
+  size_t getNextCompactionStartIndex() {
    MUTEX_LOCKER(mutexLocker, _compactionStatusLock);
    return _nextCompactionStartIndex;
  }
 
-  void setCompactionStatus(char const* reason){
+  void setCompactionStatus(char const* reason) {
     TRI_ASSERT(reason != nullptr);
     MUTEX_LOCKER(mutexLocker, _compactionStatusLock);
     _lastCompactionStatus = reason;
@@ -223,17 +230,16 @@ class MMFilesCollection final : public PhysicalCollection {
   void lastCompactionStamp(double value) { _lastCompactionStamp = value; }
 
   MMFilesDitches* ditches() const { return &_ditches; }
 
   void open(bool ignoreErrors) override;
 
   /// @brief iterate all markers of a collection on load
-  /// @brief iterate all markers of a collection on load
   int iterateMarkersOnLoad(arangodb::transaction::Methods* trx) override;
 
   bool isFullyCollected() const override;
 
   bool doCompact() const { return _doCompact; }
 
   int64_t uncollectedLogfileEntries() const {
     return _uncollectedLogfileEntries.load();
   }
@@ -259,7 +265,7 @@ class MMFilesCollection final : public PhysicalCollection {
   // is somehow protected. If it goes out of all scopes
   // or it's indexes are freed the pointer returned will get invalidated.
   MMFilesPrimaryIndex* primaryIndex() const;
 
   inline bool useSecondaryIndexes() const { return _useSecondaryIndexes; }
 
   void useSecondaryIndexes(bool value) { _useSecondaryIndexes = value; }
@@ -271,10 +277,14 @@ class MMFilesCollection final : public PhysicalCollection {
   /// @brief Find index by definition
   std::shared_ptr<Index> lookupIndex(velocypack::Slice const&) const override;
 
-  std::unique_ptr<IndexIterator> getAllIterator(transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) override;
-  std::unique_ptr<IndexIterator> getAnyIterator(transaction::Methods* trx, ManagedDocumentResult* mdr) override;
-  void invokeOnAllElements(transaction::Methods* trx,
-                           std::function<bool(DocumentIdentifierToken const&)> callback) override;
+  std::unique_ptr<IndexIterator> getAllIterator(transaction::Methods* trx,
+                                                ManagedDocumentResult* mdr,
+                                                bool reverse) override;
+  std::unique_ptr<IndexIterator> getAnyIterator(
+      transaction::Methods* trx, ManagedDocumentResult* mdr) override;
+  void invokeOnAllElements(
+      transaction::Methods* trx,
+      std::function<bool(DocumentIdentifierToken const&)> callback) override;
 
   std::shared_ptr<Index> createIndex(transaction::Methods* trx,
                                      arangodb::velocypack::Slice const& info,
@@ -307,6 +317,9 @@ class MMFilesCollection final : public PhysicalCollection {
 
   void truncate(transaction::Methods* trx, OperationOptions& options) override;
 
+  DocumentIdentifierToken lookupKey(transaction::Methods* trx,
+                                    velocypack::Slice const& key) override;
+
   int read(transaction::Methods*, arangodb::velocypack::Slice const key,
            ManagedDocumentResult& result, bool) override;
 
@@ -321,9 +334,8 @@ class MMFilesCollection final : public PhysicalCollection {
 
   int insert(arangodb::transaction::Methods* trx,
              arangodb::velocypack::Slice const newSlice,
-             arangodb::ManagedDocumentResult& result,
-             OperationOptions& options, TRI_voc_tick_t& resultMarkerTick,
-             bool lock) override;
+             arangodb::ManagedDocumentResult& result, OperationOptions& options,
+             TRI_voc_tick_t& resultMarkerTick, bool lock) override;
 
   int update(arangodb::transaction::Methods* trx,
              arangodb::velocypack::Slice const newSlice,
@@ -353,7 +365,8 @@ class MMFilesCollection final : public PhysicalCollection {
   /// can be dropped. The callback is supposed to drop
   /// the collection and it is guaranteed that no one is using
   /// it at that moment.
-  void deferDropCollection(std::function<bool(LogicalCollection*)> callback) override;
+  void deferDropCollection(
+      std::function<bool(LogicalCollection*)> callback) override;
 
   int rollbackOperation(transaction::Methods*, TRI_voc_document_operation_e,
                         TRI_voc_rid_t oldRevisionId,
@@ -361,8 +374,10 @@ class MMFilesCollection final : public PhysicalCollection {
                         TRI_voc_rid_t newRevisionId,
                         velocypack::Slice const& newDoc);
 
-  MMFilesDocumentPosition insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
-                                         TRI_voc_fid_t fid, bool isInWal, bool shouldLock);
+  MMFilesDocumentPosition insertRevision(TRI_voc_rid_t revisionId,
+                                         uint8_t const* dataptr,
+                                         TRI_voc_fid_t fid, bool isInWal,
+                                         bool shouldLock);
 
   void insertRevision(MMFilesDocumentPosition const& position, bool shouldLock);
 
@@ -383,35 +398,31 @@ class MMFilesCollection final : public PhysicalCollection {
   bool openIndex(VPackSlice const& description, transaction::Methods* trx);
 
   /// @brief initializes an index with all existing documents
-  void fillIndex(basics::LocalTaskQueue*, transaction::Methods*,
-                 Index*,
+  void fillIndex(basics::LocalTaskQueue*, transaction::Methods*, Index*,
                  std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const&,
                  bool);
 
   /// @brief Fill indexes used in recovery
   int fillIndexes(transaction::Methods*,
                   std::vector<std::shared_ptr<Index>> const&,
                   bool skipPersistent = true);
 
   int openWorker(bool ignoreErrors);
 
   int removeFastPath(arangodb::transaction::Methods* trx,
                      TRI_voc_rid_t oldRevisionId,
                      arangodb::velocypack::Slice const oldDoc,
-                     OperationOptions& options,
-                     TRI_voc_rid_t const& revisionId,
+                     OperationOptions& options, TRI_voc_rid_t const& revisionId,
                      arangodb::velocypack::Slice const toRemove);
 
   static int OpenIteratorHandleDocumentMarker(MMFilesMarker const* marker,
                                               MMFilesDatafile* datafile,
                                               OpenIteratorState* state);
   static int OpenIteratorHandleDeletionMarker(MMFilesMarker const* marker,
                                               MMFilesDatafile* datafile,
                                               OpenIteratorState* state);
-  static bool OpenIterator(MMFilesMarker const* marker,
-                           OpenIteratorState* data, MMFilesDatafile* datafile);
+  static bool OpenIterator(MMFilesMarker const* marker, OpenIteratorState* data,
+                           MMFilesDatafile* datafile);
 
   /// @brief create statistics for a datafile, using the stats provided
   void createStats(TRI_voc_fid_t fid,
@@ -421,12 +432,11 @@ class MMFilesCollection final : public PhysicalCollection {
 
   /// @brief iterates over a collection
   bool iterateDatafiles(
-      std::function<bool(MMFilesMarker const*, MMFilesDatafile*)> const&
-          cb);
+      std::function<bool(MMFilesMarker const*, MMFilesDatafile*)> const& cb);
 
   /// @brief creates a datafile
-  MMFilesDatafile* createDatafile(
-      TRI_voc_fid_t fid, TRI_voc_size_t journalSize, bool isCompactor);
+  MMFilesDatafile* createDatafile(TRI_voc_fid_t fid, TRI_voc_size_t journalSize,
+                                  bool isCompactor);
 
   /// @brief iterate over a vector of datafiles and pick those with a specific
   /// data range
@@ -438,23 +448,21 @@ class MMFilesCollection final : public PhysicalCollection {
 
   bool iterateDatafilesVector(
       std::vector<MMFilesDatafile*> const& files,
-      std::function<bool(MMFilesMarker const*, MMFilesDatafile*)> const&
-          cb);
+      std::function<bool(MMFilesMarker const*, MMFilesDatafile*)> const& cb);
 
   MMFilesDocumentPosition lookupRevision(TRI_voc_rid_t revisionId) const;
 
-  int insertDocument(arangodb::transaction::Methods * trx,
+  int insertDocument(arangodb::transaction::Methods* trx,
                      TRI_voc_rid_t revisionId,
                      arangodb::velocypack::Slice const& doc,
                      MMFilesDocumentOperation& operation,
                      MMFilesWalMarker const* marker, bool& waitForSync);
 
  private:
   uint8_t const* lookupRevisionVPack(TRI_voc_rid_t revisionId) const;
-  uint8_t const* lookupRevisionVPackConditional(
-      TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal)
-      const;
+  uint8_t const* lookupRevisionVPackConditional(TRI_voc_rid_t revisionId,
+                                                TRI_voc_tick_t maxTick,
+                                                bool excludeWal) const;
 
   bool addIndex(std::shared_ptr<arangodb::Index> idx);
   void addIndexLocal(std::shared_ptr<arangodb::Index> idx);
@@ -463,7 +471,8 @@ class MMFilesCollection final : public PhysicalCollection {
   bool removeIndex(TRI_idx_iid_t iid);
 
   /// @brief return engine-specific figures
-  void figuresSpecific(std::shared_ptr<arangodb::velocypack::Builder>&) override;
+  void figuresSpecific(
+      std::shared_ptr<arangodb::velocypack::Builder>&) override;
 
   // SECTION: Index storage
 
@@ -477,26 +486,25 @@ class MMFilesCollection final : public PhysicalCollection {
                          velocypack::Slice const& doc);
 
   int insertPrimaryIndex(transaction::Methods*, TRI_voc_rid_t revisionId,
                          velocypack::Slice const&);
 
   int deletePrimaryIndex(transaction::Methods*, TRI_voc_rid_t revisionId,
                          velocypack::Slice const&);
 
   int insertSecondaryIndexes(transaction::Methods*, TRI_voc_rid_t revisionId,
                              velocypack::Slice const&, bool isRollback);
 
   int deleteSecondaryIndexes(transaction::Methods*, TRI_voc_rid_t revisionId,
                              velocypack::Slice const&, bool isRollback);
 
   int lookupDocument(transaction::Methods*, velocypack::Slice,
                      ManagedDocumentResult& result);
 
   int updateDocument(transaction::Methods*, TRI_voc_rid_t oldRevisionId,
-                     velocypack::Slice const& oldDoc,
-                     TRI_voc_rid_t newRevisionId,
-                     velocypack::Slice const& newDoc,
-                     MMFilesDocumentOperation&, MMFilesWalMarker const*,
-                     bool& waitForSync);
+                     velocypack::Slice const& oldDoc,
+                     TRI_voc_rid_t newRevisionId,
+                     velocypack::Slice const& newDoc, MMFilesDocumentOperation&,
+                     MMFilesWalMarker const*, bool& waitForSync);
 
  private:
   mutable arangodb::MMFilesDitches _ditches;
@@ -544,7 +552,6 @@ class MMFilesCollection final : public PhysicalCollection {
   bool _doCompact;
   TRI_voc_tick_t _maxTick;
 };
-
 }
 
 #endif
@@ -448,14 +448,15 @@ IndexIterator* MMFilesEdgeIndex::iteratorForCondition(
   if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_IN) {
     // a.b IN values
     if (!valNode->isArray()) {
-      return nullptr;
+      // a.b IN non-array
+      return new EmptyIndexIterator(_collection, trx, mmdr, this);
     }
 
     return createInIterator(trx, mmdr, attrNode, valNode);
   }
 
   // operator type unsupported
-  return nullptr;
+  return new EmptyIndexIterator(_collection, trx, mmdr, this);
 }
 
 /// @brief specializes the condition for use with the index
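This hunk, and the matching ones in MMFilesPrimaryIndex, RocksDBEdgeIndex, and RocksDBPrimaryIndex below, replace nullptr returns with an EmptyIndexIterator: a null-object that simply yields no results, so callers never have to special-case a missing iterator. A stripped-down sketch of the idea:

    // instead of returning nullptr for conditions that can never match,
    // return an iterator that is immediately exhausted
    struct Iterator {
      virtual ~Iterator() = default;
      virtual bool next() = 0;  // false means exhausted
    };

    struct EmptyIterator final : Iterator {
      bool next() override { return false; }  // never produces a result
    };

    Iterator* iteratorFor(bool conditionCanMatch) {
      if (!conditionCanMatch) {
        return new EmptyIterator();  // was: return nullptr;
      }
      // ... build and return the real iterator here ...
      return new EmptyIterator();  // placeholder standing in for the real one
    }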
@@ -120,8 +120,10 @@ MMFilesHashIndexLookupBuilder::MMFilesHashIndexLookupBuilder(
         TRI_IF_FAILURE("Index::permutationIN") {
           THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
         }
-        for (auto const& value : VPackArrayIterator(values)) {
-          tmp.emplace(value);
+        if (values.isArray()) {
+          for (auto const& value : VPackArrayIterator(values)) {
+            tmp.emplace(value);
+          }
         }
         if (tmp.empty()) {
           // IN [] short-circuit, cannot be fullfilled;
@@ -471,14 +471,15 @@ IndexIterator* MMFilesPrimaryIndex::iteratorForCondition(
   } else if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_IN) {
     // a.b IN values
     if (!valNode->isArray()) {
-      return nullptr;
+      // a.b IN non-array
+      return new EmptyIndexIterator(_collection, trx, mmdr, this);
    }
 
     return createInIterator(trx, mmdr, attrNode, valNode);
   }
 
   // operator type unsupported
-  return nullptr;
+  return new EmptyIndexIterator(_collection, trx, mmdr, this);
 }
 
 /// @brief specializes the condition for use with the index
@@ -42,6 +42,7 @@ MMFilesWalRecoveryFeature::MMFilesWalRecoveryFeature(ApplicationServer* server)
   startsAfter("Database");
   startsAfter("MMFilesLogfileManager");
   startsAfter("MMFilesPersistentIndex");
+  startsAfter("Scheduler");
 
   onlyEnabledWith("MMFilesEngine");
   onlyEnabledWith("MMFilesLogfileManager");
File diff suppressed because it is too large
@@ -464,10 +464,11 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
   // TODO maybe we could also reuse Index::drop, if we ensure the
   // implementations
   // don't do anything beyond deleting their contents
-  RocksDBKeyBounds indexBounds =
-      RocksDBKeyBounds::PrimaryIndex(42);  // default constructor?
   for (std::shared_ptr<Index> const& index : _indexes) {
     RocksDBIndex* rindex = static_cast<RocksDBIndex*>(index.get());
 
+    RocksDBKeyBounds indexBounds =
+        RocksDBKeyBounds::Empty();
     switch (rindex->type()) {
       case RocksDBIndex::TRI_IDX_TYPE_PRIMARY_INDEX:
         indexBounds = RocksDBKeyBounds::PrimaryIndex(rindex->objectId());
@@ -504,6 +505,12 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
   }
 }
 
+DocumentIdentifierToken RocksDBCollection::lookupKey(transaction::Methods* trx,
+                                                     VPackSlice const& key) {
+  TRI_ASSERT(key.isString());
+  return primaryIndex()->lookupKey(trx, StringRef(key));
+}
+
 int RocksDBCollection::read(transaction::Methods* trx,
                             arangodb::velocypack::Slice const key,
                             ManagedDocumentResult& result, bool) {
@@ -597,7 +604,8 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
   TRI_voc_rid_t revisionId =
       transaction::helpers::extractRevFromDocument(newSlice);
 
-  RocksDBSavePoint guard(rocksTransaction(trx), trx->isSingleOperationTransaction());
+  RocksDBSavePoint guard(rocksTransaction(trx),
+                         trx->isSingleOperationTransaction());
 
   res = insertDocument(trx, revisionId, newSlice, options.waitForSync);
   if (res.ok()) {
@@ -698,7 +706,8 @@ int RocksDBCollection::update(arangodb::transaction::Methods* trx,
     }
   }
 
-  RocksDBSavePoint guard(rocksTransaction(trx), trx->isSingleOperationTransaction());
+  RocksDBSavePoint guard(rocksTransaction(trx),
+                         trx->isSingleOperationTransaction());
 
   VPackSlice const newDoc(builder->slice());
 
@@ -796,7 +805,8 @@ int RocksDBCollection::replace(
     }
   }
 
-  RocksDBSavePoint guard(rocksTransaction(trx), trx->isSingleOperationTransaction());
+  RocksDBSavePoint guard(rocksTransaction(trx),
+                         trx->isSingleOperationTransaction());
 
   RocksDBOperationResult opResult =
       updateDocument(trx, oldRevisionId, oldDoc, revisionId,
@@ -880,7 +890,8 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx,
     }
   }
 
-  RocksDBSavePoint guard(rocksTransaction(trx), trx->isSingleOperationTransaction());
+  RocksDBSavePoint guard(rocksTransaction(trx),
+                         trx->isSingleOperationTransaction());
 
   res = removeDocument(trx, oldRevisionId, oldDoc, options.waitForSync);
   if (res.ok()) {
@@ -1152,7 +1163,8 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
 /// @brief looks up a document by key, low level worker
 /// the key must be a string slice, no revision check is performed
 RocksDBOperationResult RocksDBCollection::lookupDocument(
-    transaction::Methods* trx, VPackSlice key, ManagedDocumentResult& mdr) const {
+    transaction::Methods* trx, VPackSlice key,
+    ManagedDocumentResult& mdr) const {
   RocksDBOperationResult res;
   if (!key.isString()) {
     res.reset(TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD);
@@ -43,7 +43,7 @@ struct RocksDBToken;
 class RocksDBCollection final : public PhysicalCollection {
   friend class RocksDBEngine;
-
+  friend class RocksDBVPackIndex;
 
   constexpr static double defaultLockTimeout = 10.0 * 60.0;
 
  public:
@@ -131,6 +131,10 @@ class RocksDBCollection final : public PhysicalCollection {
 
   void truncate(transaction::Methods* trx, OperationOptions& options) override;
 
+  DocumentIdentifierToken lookupKey(
+      transaction::Methods* trx,
+      arangodb::velocypack::Slice const& key) override;
+
   int read(transaction::Methods*, arangodb::velocypack::Slice const key,
            ManagedDocumentResult& result, bool) override;
 
@@ -181,11 +185,11 @@ class RocksDBCollection final : public PhysicalCollection {
 
   Result lookupDocumentToken(transaction::Methods* trx, arangodb::StringRef key,
                              RocksDBToken& token) const;
 
   int beginWriteTimed(bool useDeadlockDetector, double timeout = 0.0);
 
   int endWrite(bool useDeadlockDetector);
 
  private:
   /// @brief return engine-specific figures
   void figuresSpecific(
@@ -24,29 +24,32 @@
 #include "RocksDBCounterManager.h"
 
 #include "Basics/ReadLocker.h"
+#include "Basics/VelocyPackHelper.h"
 #include "Basics/WriteLocker.h"
 #include "Logger/Logger.h"
 #include "RocksDBEngine/RocksDBCommon.h"
 #include "RocksDBEngine/RocksDBKey.h"
 #include "RocksDBEngine/RocksDBKeyBounds.h"
 #include "RocksDBEngine/RocksDBValue.h"
+#include "VocBase/ticks.h"
 
 #include <rocksdb/utilities/transaction_db.h>
 #include <rocksdb/utilities/write_batch_with_index.h>
 #include <rocksdb/write_batch.h>
 
 #include <velocypack/Iterator.h>
+#include <velocypack/Parser.h>
 #include <velocypack/Slice.h>
 #include <velocypack/velocypack-aliases.h>
 
 using namespace arangodb;
 
-RocksDBCounterManager::CMValue::CMValue(VPackSlice const& slice) {
-  TRI_ASSERT(slice.isArray());
+RocksDBCounterManager::CMValue::CMValue(VPackSlice const& slice)
+    : _sequenceNum(0), _count(0), _revisionId(0) {
+  if (!slice.isArray()) {
+    // got a somewhat invalid slice. probably old data from before the key structure changes
+    return;
+  }
 
   velocypack::ArrayIterator array(slice);
   if (array.valid()) {
@@ -146,6 +149,10 @@ void RocksDBCounterManager::removeCounter(uint64_t objectId) {
 
 /// Thread-Safe force sync
 Result RocksDBCounterManager::sync(bool force) {
+#if 0
+  writeSettings();
+#endif
+
   if (force) {
     while(true) {
       bool expected = false;
@@ -198,8 +205,7 @@ Result RocksDBCounterManager::sync(bool force) {
     }
   }
 
-  // we have to commit all counters in one batch otherwise
-  // there would be the possibility of
+  // we have to commit all counters in one batch
   rocksdb::Status s = rtrx->Commit();
   if (s.ok()) {
     for (std::pair<uint64_t, CMValue> const& pair : copy) {
@@ -211,36 +217,48 @@ Result RocksDBCounterManager::sync(bool force) {
 }
 
 void RocksDBCounterManager::readSettings() {
+#if 0
   RocksDBKey key = RocksDBKey::SettingsValue();
 
   std::string result;
   rocksdb::Status status = _db->Get(rocksdb::ReadOptions(), key.string(), &result);
   if (status.ok()) {
-    // key may not be there...
-    VPackSlice slice = VPackSlice(result.data());
-    TRI_ASSERT(slice.isObject());
-    LOG_TOPIC(TRACE, Logger::ENGINES) << "read initial settings: " << slice.toJson();
+    // key may not be there, so don't fail when not found
+    if (!result.empty()) {
+      try {
+        std::shared_ptr<VPackBuilder> builder = VPackParser::fromJson(result);
+        VPackSlice s = builder->slice();
+
+        uint64_t lastTick = basics::VelocyPackHelper::stringUInt64(s.get("tick"));
+        TRI_UpdateTickServer(lastTick);
+      } catch (...) {
+        LOG_TOPIC(WARN, Logger::ENGINES) << "unable to read initial settings: invalid data";
+      }
+    }
   }
+#endif
 }
 
 void RocksDBCounterManager::writeSettings() {
+#if 0
   RocksDBKey key = RocksDBKey::SettingsValue();
 
   VPackBuilder builder;
   builder.openObject();
   builder.add("tick", VPackValue(std::to_string(TRI_CurrentTickServer())));
   builder.close();
 
   VPackSlice slice = builder.slice();
   LOG_TOPIC(TRACE, Logger::ENGINES) << "writing settings: " << slice.toJson();
 
   rocksdb::Slice value(slice.startAs<char>(), slice.byteSize());
 
   rocksdb::Status status = _db->Put(rocksdb::WriteOptions(), key.string(), value);
 
-  if (status.ok()) {
-    // TODO
+  if (!status.ok()) {
+    LOG_TOPIC(TRACE, Logger::ENGINES) << "writing settings failed";
   }
+#endif
 }
 
 /// Parse counter values from rocksdb
@@ -271,7 +289,7 @@ struct WBReader : public rocksdb::WriteBatch::Handler {
   std::unordered_map<uint64_t, RocksDBCounterManager::CounterAdjustment> deltas;
   rocksdb::SequenceNumber currentSeqNum;
 
-  explicit WBReader() {}
+  explicit WBReader() : currentSeqNum(0) {}
 
   bool prepKey(const rocksdb::Slice& key) {
     if (RocksDBKey::type(key) == RocksDBEntryType::Document) {
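Both this fix and the CMValue constructor above follow the same rule: scalar members of a class are not zero-initialized by an empty constructor body, so reading currentSeqNum (or _count) before the first assignment would be undefined behavior. A minimal sketch:

    #include <cstdint>

    struct Counter {
      uint64_t currentSeqNum;            // plain scalar: uninitialized by default
      Counter() : currentSeqNum(0) {}    // explicit member initializer fixes it
    };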
@@ -99,7 +99,6 @@ class RocksDBCounterManager {
     explicit CMValue(arangodb::velocypack::Slice const&);
     void serialize(arangodb::velocypack::Builder&) const;
   };
-
 
   void readSettings();
   void writeSettings();
@@ -104,7 +104,10 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
 
   // acquire rocksdb collection
   auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection);
-  while (limit > 0) {
+
+  while (true) {
+    TRI_ASSERT(limit > 0);
+
     while (_iterator->Valid() &&
            (_index->_cmp->Compare(_iterator->key(), _bounds.end()) < 0)) {
       StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key());
@@ -121,14 +124,12 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
       }
     }  // TODO do we need to handle failed lookups here?
 
-    if (limit > 0) {
-      _keysIterator.next();
-      if (!updateBounds()) {
-        return false;
-      }
+    _keysIterator.next();
+    if (!updateBounds()) {
+      return false;
     }
   }
   return true;
 }
 
 void RocksDBEdgeIndexIterator::reset() {
@@ -323,14 +324,15 @@ IndexIterator* RocksDBEdgeIndex::iteratorForCondition(
   if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_IN) {
     // a.b IN values
     if (!valNode->isArray()) {
-      return nullptr;
+      // a.b IN non-array
+      return new EmptyIndexIterator(_collection, trx, mmdr, this);
     }
 
     return createInIterator(trx, mmdr, attrNode, valNode);
   }
 
   // operator type unsupported
-  return nullptr;
+  return new EmptyIndexIterator(_collection, trx, mmdr, this);
 }
 
 /// @brief specializes the condition for use with the index
@@ -170,6 +170,13 @@ void RocksDBEngine::start() {
       static_cast<int>(opts->_maxBytesForLevelMultiplier);
   _options.verify_checksums_in_compaction = opts->_verifyChecksumsInCompaction;
   _options.optimize_filters_for_hits = opts->_optimizeFiltersForHits;
+  _options.use_direct_reads = opts->_useDirectReads;
+  _options.use_direct_writes = opts->_useDirectWrites;
+  if (opts->_skipCorrupted) {
+    _options.wal_recovery_mode = rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords;
+  } else {
+    _options.wal_recovery_mode = rocksdb::WALRecoveryMode::kPointInTimeRecovery;
+  }
 
   _options.base_background_compactions =
       static_cast<int>(opts->_baseBackgroundCompactions);
@@ -178,11 +185,14 @@ void RocksDBEngine::start() {
 
   _options.max_log_file_size = static_cast<size_t>(opts->_maxLogFileSize);
   _options.keep_log_file_num = static_cast<size_t>(opts->_keepLogFileNum);
+  _options.recycle_log_file_num = static_cast<size_t>(opts->_recycleLogFileNum);
   _options.log_file_time_to_roll =
       static_cast<size_t>(opts->_logFileTimeToRoll);
   _options.compaction_readahead_size =
       static_cast<size_t>(opts->_compactionReadaheadSize);
 
+  _options.IncreaseParallelism(TRI_numberProcessors());
+
   _options.create_if_missing = true;
   _options.max_open_files = -1;
   _options.comparator = _cmp.get();
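The two hunks above wire more engine settings through to the public RocksDB options API, including the WAL recovery mode. A standalone sketch against that API (option names as they existed in RocksDB releases of this era):

    #include <rocksdb/options.h>

    rocksdb::Options makeOptions(bool skipCorrupted) {
      rocksdb::Options options;
      // trade-off: skipping corrupted records recovers more of the database
      // but may silently drop updates; point-in-time recovery stops cleanly
      // at the first corrupted WAL record
      options.wal_recovery_mode =
          skipCorrupted ? rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords
                        : rocksdb::WALRecoveryMode::kPointInTimeRecovery;
      options.IncreaseParallelism(4);  // size background thread pools
      return options;
    }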
@@ -96,6 +96,7 @@ void RocksDBIndex::createCache() {
   if (!_useCache || _cachePresent) {
     // we should not get here if we do not need the cache
+    // or if cache already created
     return;
   }
 
   TRI_ASSERT(_cache.get() == nullptr);
@@ -35,6 +35,10 @@ using namespace arangodb::velocypack;
 
 const char RocksDBKeyBounds::_stringSeparator = '\0';
 
+RocksDBKeyBounds RocksDBKeyBounds::Empty() {
+  return RocksDBKeyBounds();
+}
+
 RocksDBKeyBounds RocksDBKeyBounds::Databases() {
   return RocksDBKeyBounds(RocksDBEntryType::Database);
 }
@@ -98,6 +102,11 @@ rocksdb::Slice const RocksDBKeyBounds::end() const {
   return rocksdb::Slice(_endBuffer);
 }
 
+// constructor for an empty bound. do not use for anything but to
+// default-construct a key bound!
+RocksDBKeyBounds::RocksDBKeyBounds()
+    : _type(RocksDBEntryType::Database), _startBuffer(), _endBuffer() {}
+
 RocksDBKeyBounds::RocksDBKeyBounds(RocksDBEntryType type)
     : _type(type), _startBuffer(), _endBuffer() {
   switch (_type) {
@@ -38,8 +38,11 @@ namespace arangodb {
 
 class RocksDBKeyBounds {
  public:
-  RocksDBKeyBounds() = delete;
+  //////////////////////////////////////////////////////////////////////////////
+  /// @brief empty bounds
+  //////////////////////////////////////////////////////////////////////////////
+  static RocksDBKeyBounds Empty();
 
   //////////////////////////////////////////////////////////////////////////////
   /// @brief Bounds for list of all databases
   //////////////////////////////////////////////////////////////////////////////
@@ -125,7 +128,8 @@ class RocksDBKeyBounds {
   rocksdb::Slice const end() const;
 
  private:
-  RocksDBKeyBounds(RocksDBEntryType type);
+  RocksDBKeyBounds();
+  explicit RocksDBKeyBounds(RocksDBEntryType type);
   RocksDBKeyBounds(RocksDBEntryType type, uint64_t first);
   RocksDBKeyBounds(RocksDBEntryType type, uint64_t first,
                    std::string const& second);
@@ -502,14 +502,15 @@ IndexIterator* RocksDBPrimaryIndex::iteratorForCondition(
   } else if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_IN) {
     // a.b IN values
     if (!valNode->isArray()) {
-      return nullptr;
+      // a.b IN non-array
+      return new EmptyIndexIterator(_collection, trx, mmdr, this);
     }
 
     return createInIterator(trx, mmdr, attrNode, valNode);
   }
 
   // operator type unsupported
-  return nullptr;
+  return new EmptyIndexIterator(_collection, trx, mmdr, this);
 }
 
 /// @brief specializes the condition for use with the index
@@ -28,11 +28,11 @@
 #include "Basics/VPackStringBufferAdapter.h"
 #include "RocksDBEngine/RocksDBCollection.h"
 #include "RocksDBEngine/RocksDBCommon.h"
-#include "RocksDBEngine/RocksDBTransactionState.h"
 #include "RocksDBEngine/RocksDBPrimaryIndex.h"
+#include "RocksDBEngine/RocksDBTransactionState.h"
+#include "Transaction/Helpers.h"
 #include "Transaction/StandaloneContext.h"
 #include "Transaction/UserTransaction.h"
-#include "Transaction/Helpers.h"
 #include "VocBase/replication-common.h"
 #include "VocBase/ticks.h"
 
@@ -70,8 +70,8 @@ uint64_t RocksDBReplicationContext::lastTick() const { return _lastTick; }
 uint64_t RocksDBReplicationContext::count() const {
   TRI_ASSERT(_trx != nullptr);
   TRI_ASSERT(_collection != nullptr);
-  RocksDBCollection *rcoll =
-      RocksDBCollection::toRocksDBCollection(_collection->getPhysical());
+  RocksDBCollection* rcoll =
+      RocksDBCollection::toRocksDBCollection(_collection->getPhysical());
   return rcoll->numberDocuments(_trx.get());
 }
 
@ -81,13 +81,16 @@ void RocksDBReplicationContext::bind(TRI_vocbase_t* vocbase) {
|
|||
_trx = createTransaction(vocbase);
|
||||
}
|
||||
|
||||
int RocksDBReplicationContext::bindCollection(std::string const& collectionName) {
|
||||
int RocksDBReplicationContext::bindCollection(
|
||||
std::string const& collectionName) {
|
||||
if ((_collection == nullptr) || _collection->name() != collectionName) {
|
||||
_collection = _trx->vocbase()->lookupCollection(collectionName);
|
||||
|
||||
|
||||
if (_collection == nullptr) {
|
||||
return TRI_ERROR_BAD_PARAMETER; RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick);
|
||||
return TRI_ERROR_BAD_PARAMETER;
|
||||
RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick);
|
||||
}
|
||||
_trx->addCollectionAtRuntime(collectionName);
|
||||
_iter = _collection->getAllIterator(_trx.get(), &_mdr,
|
||||
false); // _mdr is neither used nor updated
|
||||
_hasMore = true;
|
||||
|
@ -136,8 +139,7 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
|||
|
||||
VPackBuilder builder(&_vpackOptions);
|
||||
|
||||
uint64_t size = 0;
|
||||
auto cb = [this, &type, &buff, &adapter, &size,
|
||||
auto cb = [this, &type, &buff, &adapter,
|
||||
&builder](DocumentIdentifierToken const& token) {
|
||||
builder.clear();
|
||||
|
||||
|
@ -149,7 +151,8 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
|||
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
|
||||
|
||||
if (!ok) {
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) << "could not get document with token: " << token._data;
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION)
|
||||
<< "could not get document with token: " << token._data;
|
||||
throw RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick);
|
||||
}
|
||||
|
||||
|
@ -163,10 +166,9 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
|||
VPackSlice slice = builder.slice();
|
||||
dumper.dump(slice);
|
||||
buff.appendChar('\n');
|
||||
size += (slice.byteSize() + 1);
|
||||
};
|
||||
|
||||
while (_hasMore && (size < chunkSize)) {
|
||||
while (_hasMore && buff.length() < chunkSize) {
|
||||
try {
|
||||
_hasMore = _iter->next(cb, 10); // TODO: adjust limit?
|
||||
} catch (std::exception const& ex) {
|
||||
|
@ -179,25 +181,26 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
|||
return RocksDBReplicationResult(TRI_ERROR_NO_ERROR, _lastTick);
|
||||
}
|
||||
|
||||
arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder &b,
|
||||
arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
|
||||
uint64_t chunkSize) {
|
||||
TRI_ASSERT(_trx);
|
||||
TRI_ASSERT(_iter);
|
||||
|
||||
RocksDBAllIndexIterator *primary = dynamic_cast<RocksDBAllIndexIterator*>(_iter.get());
|
||||
|
||||
|
||||
RocksDBAllIndexIterator* primary =
|
||||
static_cast<RocksDBAllIndexIterator*>(_iter.get());
|
||||
|
||||
std::string lowKey;
|
||||
VPackSlice highKey;// FIXME: no good keeping this
|
||||
|
||||
VPackSlice highKey; // FIXME: no good keeping this
|
||||
|
||||
uint64_t hash = 0x012345678;
|
||||
//auto cb = [&](DocumentIdentifierToken const& token, StringRef const& key) {
|
||||
// auto cb = [&](DocumentIdentifierToken const& token, StringRef const& key) {
|
||||
auto cb = [&](DocumentIdentifierToken const& token) {
|
||||
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
|
||||
if (!ok) {
|
||||
// TODO: do something here?
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// current document
|
||||
VPackSlice current(_mdr.vpack());
|
||||
highKey = current.get(StaticStrings::KeyString);
|
||||
|
@ -205,19 +208,19 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder &b,
|
|||
if (lowKey.empty()) {
|
||||
lowKey = highKey.copyString();
|
||||
}
|
||||
|
||||
|
||||
// we can get away with the fast hash function here, as key values are
|
||||
// restricted to strings
|
||||
hash ^= transaction::helpers::extractKeyFromDocument(current).hashString();
|
||||
hash ^= transaction::helpers::extractRevSliceFromDocument(current).hash();
|
||||
};
|
||||
|
||||
|
||||
b.openArray();
|
||||
while (_hasMore && true /*sizelimit*/) {
|
||||
try {
|
||||
|
||||
//_hasMore = primary->nextWithKey(cb, chunkSize);
|
||||
_hasMore = primary->next(cb, chunkSize);
|
||||
|
||||
|
||||
b.add(VPackValue(VPackValueType::Object));
|
||||
b.add("low", VPackValue(lowKey));
|
||||
b.add("high", VPackValue(highKey.copyString()));
|
||||
|
@ -228,12 +231,13 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder &b,
|
|||
return Result(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
b.close();
|
||||
|
||||
return Result();
|
||||
}
|
||||
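
The dumpKeyChunks hunk above builds, per chunk, the lowest key, the highest
key, and an order-insensitive hash obtained by XOR-ing the hashes of each
document's _key and _rev; an incremental syncer can compare these chunk
digests before fetching any documents. A sketch of the hashing scheme under
assumed types (ArangoDB hashes VelocyPack slices with fasthash64, not
std::hash):

    #include <cstdint>
    #include <functional>
    #include <string>
    #include <vector>

    // Hypothetical document view; the real code reads VelocyPack documents.
    struct Doc {
      std::string key;  // _key
      std::string rev;  // _rev
    };

    struct ChunkInfo {
      std::string low;   // first key in the chunk
      std::string high;  // last key in the chunk
      uint64_t hash;     // order-insensitive combined hash
    };

    // XOR-combining per-document hashes makes the result independent of
    // iteration order, and any single differing document flips the digest.
    ChunkInfo hashChunk(std::vector<Doc> const& docs) {
      if (docs.empty()) {
        return ChunkInfo{"", "", 0};
      }
      std::hash<std::string> h;
      ChunkInfo info{docs.front().key, docs.back().key, 0x012345678ULL};
      for (auto const& d : docs) {
        info.hash ^= h(d.key);
        info.hash ^= h(d.rev);
      }
      return info;
    }
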
|
||||
/// dump all keys from collection
|
||||
arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder &b,
|
||||
arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder& b,
|
||||
size_t chunk,
|
||||
size_t chunkSize) {
|
||||
TRI_ASSERT(_trx);
|
||||
|
@ -243,30 +247,34 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder &b,
|
|||
if (from == 0) {
|
||||
_iter->reset();
|
||||
_lastChunkOffset = 0;
|
||||
} else if (from < _lastChunkOffset+chunkSize) {
|
||||
_hasMore = true;
|
||||
} else if (from < _lastChunkOffset + chunkSize) {
|
||||
TRI_ASSERT(from >= chunkSize);
|
||||
uint64_t diff = from - chunkSize;
|
||||
uint64_t to;// = (chunk + 1) * chunkSize;
|
||||
uint64_t to; // = (chunk + 1) * chunkSize;
|
||||
_iter->skip(diff, to);
|
||||
TRI_ASSERT(to == diff);
|
||||
TRI_ASSERT(to == diff);
|
||||
_lastChunkOffset = from;
|
||||
} else if (from > _lastChunkOffset+chunkSize) {
|
||||
} else if (from > _lastChunkOffset + chunkSize) {
|
||||
// no jumping back in time; fix the initial syncer if you see this
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) << "Trying to request a chunk the rocksdb "
|
||||
<< "iterator already passed over";
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION)
|
||||
<< "Trying to request a chunk the rocksdb "
|
||||
<< "iterator already passed over";
|
||||
return Result(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
RocksDBAllIndexIterator *primary = dynamic_cast<RocksDBAllIndexIterator*>(_iter.get());
|
||||
|
||||
RocksDBAllIndexIterator* primary =
|
||||
static_cast<RocksDBAllIndexIterator*>(_iter.get());
|
||||
auto cb = [&](DocumentIdentifierToken const& token, StringRef const& key) {
|
||||
RocksDBToken const& rt = static_cast<RocksDBToken const&>(token);
|
||||
|
||||
|
||||
b.openArray();
|
||||
b.add(VPackValuePair(key.data(), key.size(), VPackValueType::String));
|
||||
b.add(VPackValue(rt.revisionId()));
|
||||
b.add(VPackValue(std::to_string(rt.revisionId())));
|
||||
b.close();
|
||||
};
|
||||
|
||||
|
||||
b.openArray();
|
||||
// chunk is going to be ignored here
|
||||
while (_hasMore && true /*sizelimit*/) {
|
||||
try {
|
||||
|
@ -275,15 +283,14 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder &b,
|
|||
return Result(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
b.close();
|
||||
|
||||
return Result();
|
||||
}
|
||||
|
||||
/// dump keys and document
|
||||
arangodb::Result RocksDBReplicationContext::dumpDocuments(VPackBuilder &b,
|
||||
size_t chunk,
|
||||
size_t chunkSize,
|
||||
VPackSlice const& ids) {
|
||||
arangodb::Result RocksDBReplicationContext::dumpDocuments(
|
||||
VPackBuilder& b, size_t chunk, size_t chunkSize, VPackSlice const& ids) {
|
||||
TRI_ASSERT(_trx);
|
||||
TRI_ASSERT(_iter);
|
||||
// Position the iterator correctly
|
||||
|
@ -291,20 +298,22 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments(VPackBuilder &b,
|
|||
if (from == 0) {
|
||||
_iter->reset();
|
||||
_lastChunkOffset = 0;
|
||||
} else if (from < _lastChunkOffset+chunkSize) {
|
||||
_hasMore = true;
|
||||
} else if (from < _lastChunkOffset + chunkSize) {
|
||||
TRI_ASSERT(from >= chunkSize);
|
||||
uint64_t diff = from - chunkSize;
|
||||
uint64_t to;// = (chunk + 1) * chunkSize;
|
||||
uint64_t to; // = (chunk + 1) * chunkSize;
|
||||
_iter->skip(diff, to);
|
||||
TRI_ASSERT(to == diff);
|
||||
_lastChunkOffset = from;
|
||||
} else if (from > _lastChunkOffset+chunkSize) {
|
||||
} else if (from > _lastChunkOffset + chunkSize) {
|
||||
// no jumping back in time; fix the initial syncer if you see this
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) << "Trying to request a chunk the rocksdb "
|
||||
<< "iterator already passed over";
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION)
|
||||
<< "Trying to request a chunk the rocksdb "
|
||||
<< "iterator already passed over";
|
||||
return Result(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
|
||||
auto cb = [&](DocumentIdentifierToken const& token) {
|
||||
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
|
||||
if (!ok) {
|
||||
|
@ -315,7 +324,8 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments(VPackBuilder &b,
|
|||
TRI_ASSERT(current.isObject());
|
||||
b.add(current);
|
||||
};
|
||||
|
||||
|
||||
b.openArray();
|
||||
bool hasMore = true;
|
||||
size_t oldPos = from;
|
||||
for (auto const& it : VPackArrayIterator(ids)) {
|
||||
|
@ -323,7 +333,7 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments(VPackBuilder &b,
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
|
||||
}
|
||||
TRI_ASSERT(hasMore);
|
||||
|
||||
|
||||
size_t newPos = from + it.getNumber<size_t>();
|
||||
if (oldPos != from && newPos > oldPos + 1) {
|
||||
uint64_t ignore;
|
||||
|
@ -332,7 +342,8 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments(VPackBuilder &b,
|
|||
}
|
||||
hasMore = _iter->next(cb, 1);
|
||||
}
|
||||
|
||||
b.close();
|
||||
|
||||
return Result();
|
||||
}
|
||||
|
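
Both dumpKeys and dumpDocuments position a single forward-only iterator from
the requested chunk offset: rewind for chunk 0, skip forward when the client
re-requests a recent chunk, and refuse requests that would require moving
backwards. (Note also that revision ids are serialized with std::to_string
above, so full 64-bit values survive JSON clients that only have doubles.) A
condensed sketch of the positioning logic, with a hypothetical cursor type:

    #include <cstdint>
    #include <stdexcept>

    // Hypothetical forward-only cursor; reset/skip mirror the calls above.
    struct ForwardIterator {
      uint64_t pos = 0;  // stand-in for the real cursor position
      void reset() { pos = 0; }
      void skip(uint64_t n, uint64_t& skipped) { pos += n; skipped = n; }
    };

    // Position the cursor for the chunk starting at from = chunk * chunkSize,
    // tracking the last served offset in lastChunkOffset.
    void positionForChunk(ForwardIterator& it, uint64_t from, uint64_t chunkSize,
                          uint64_t& lastChunkOffset) {
      if (from == 0) {
        it.reset();  // first chunk: rewind
        lastChunkOffset = 0;
      } else if (from < lastChunkOffset + chunkSize) {
        uint64_t skipped;
        it.skip(from - chunkSize, skipped);  // client re-reads: move forward
        lastChunkOffset = from;
      } else if (from > lastChunkOffset + chunkSize) {
        // a forward-only cursor cannot jump backwards; the caller must restart
        throw std::runtime_error("chunk already passed over");
      }
      // from == lastChunkOffset + chunkSize: cursor is already in position
    }
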
||||
|
@ -372,7 +383,8 @@ void RocksDBReplicationContext::releaseDumpingResources() {
|
|||
std::unique_ptr<transaction::Methods>
|
||||
RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
|
||||
double lockTimeout = transaction::Methods::DefaultLockTimeout;
|
||||
std::shared_ptr<transaction::StandaloneContext> ctx = transaction::StandaloneContext::Create(vocbase);
|
||||
std::shared_ptr<transaction::StandaloneContext> ctx =
|
||||
transaction::StandaloneContext::Create(vocbase);
|
||||
std::unique_ptr<transaction::Methods> trx(new transaction::UserTransaction(
|
||||
ctx, {}, {}, {}, lockTimeout, false, true));
|
||||
Result res = trx->begin();
|
||||
|
|
|
@ -31,9 +31,9 @@
|
|||
#include "Transaction/Methods.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
||||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/Options.h>
|
||||
#include <velocypack/Slice.h>
|
||||
#include <velocypack/Builder.h>
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
|
@ -67,21 +67,21 @@ class RocksDBReplicationContext {
|
|||
RocksDBReplicationResult dump(TRI_vocbase_t* vocbase,
|
||||
std::string const& collectionName,
|
||||
basics::StringBuffer&, uint64_t chunkSize);
|
||||
|
||||
|
||||
// iterates over all documents in a collection, previously bound with
|
||||
// bindCollection. Generates an array of objects with minKey, maxKey and hash
|
||||
// per chunk. The distance between minKey and maxKey should be chunkSize.
|
||||
arangodb::Result dumpKeyChunks(velocypack::Builder &outBuilder,
|
||||
arangodb::Result dumpKeyChunks(velocypack::Builder& outBuilder,
|
||||
uint64_t chunkSize);
|
||||
|
||||
|
||||
/// dump all keys from collection
|
||||
arangodb::Result dumpKeys(velocypack::Builder &outBuilder, size_t chunk, size_t chunkSize);
|
||||
arangodb::Result dumpKeys(velocypack::Builder& outBuilder, size_t chunk,
|
||||
size_t chunkSize);
|
||||
/// dump keys and document
|
||||
arangodb::Result dumpDocuments(velocypack::Builder &b,
|
||||
size_t chunk,
|
||||
arangodb::Result dumpDocuments(velocypack::Builder& b, size_t chunk,
|
||||
size_t chunkSize,
|
||||
velocypack::Slice const& ids);
|
||||
|
||||
|
||||
double expires() const;
|
||||
bool isDeleted() const;
|
||||
void deleted();
|
||||
|
|
|
@ -57,6 +57,7 @@ class WBReader : public rocksdb::WriteBatch::Handler {
|
|||
_builder.add(
|
||||
"type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_COLLECTION_CREATE)));
|
||||
break;
|
||||
}
|
||||
case RocksDBEntryType::Document: {
|
||||
_builder.add(
|
||||
|
|
|
@ -329,36 +329,37 @@ bool RocksDBRestReplicationHandler::isCoordinatorError() {
|
|||
void RocksDBRestReplicationHandler::handleCommandLoggerState() {
|
||||
VPackBuilder builder;
|
||||
builder.add(VPackValue(VPackValueType::Object)); // Base
|
||||
|
||||
//MMFilesLogfileManager::instance()->waitForSync(10.0);
|
||||
//MMFilesLogfileManagerState const s =
|
||||
//MMFilesLogfileManager::instance()->state();
|
||||
|
||||
// MMFilesLogfileManager::instance()->waitForSync(10.0);
|
||||
// MMFilesLogfileManagerState const s =
|
||||
// MMFilesLogfileManager::instance()->state();
|
||||
rocksdb::TransactionDB* db =
|
||||
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->db();
|
||||
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->db();
|
||||
rocksdb::Status status = db->GetBaseDB()->SyncWAL();
|
||||
if (!status.ok()) {
|
||||
Result res = rocksutils::convertStatus(status).errorNumber();
|
||||
generateError(rest::ResponseCode::BAD, res.errorNumber(), res.errorMessage());
|
||||
generateError(rest::ResponseCode::BAD, res.errorNumber(),
|
||||
res.errorMessage());
|
||||
return;
|
||||
}
|
||||
rocksdb::SequenceNumber lastTick = db->GetLatestSequenceNumber();
|
||||
|
||||
|
||||
// "state" part
|
||||
builder.add("state", VPackValue(VPackValueType::Object));
|
||||
builder.add("running", VPackValue(true));
|
||||
builder.add("lastLogTick", VPackValue(std::to_string(lastTick)));
|
||||
builder.add("lastUncommittedLogTick",
|
||||
VPackValue(std::to_string(lastTick+1)));
|
||||
builder.add("totalEvents", VPackValue(0));//s.numEvents + s.numEventsSync
|
||||
VPackValue(std::to_string(lastTick + 1)));
|
||||
builder.add("totalEvents", VPackValue(0)); // s.numEvents + s.numEventsSync
|
||||
builder.add("time", VPackValue(utilities::timeString()));
|
||||
builder.close();
|
||||
|
||||
|
||||
// "server" part
|
||||
builder.add("server", VPackValue(VPackValueType::Object));
|
||||
builder.add("version", VPackValue(ARANGODB_VERSION));
|
||||
builder.add("serverId", VPackValue(std::to_string(ServerIdFeature::getId())));
|
||||
builder.close();
|
||||
|
||||
|
||||
// "clients" part
|
||||
builder.add("clients", VPackValue(VPackValueType::Array));
|
||||
auto allClients = _vocbase->getReplicationClients();
|
||||
|
@ -366,19 +367,19 @@ void RocksDBRestReplicationHandler::handleCommandLoggerState() {
|
|||
// One client
|
||||
builder.add(VPackValue(VPackValueType::Object));
|
||||
builder.add("serverId", VPackValue(std::to_string(std::get<0>(it))));
|
||||
|
||||
|
||||
char buffer[21];
|
||||
TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer));
|
||||
builder.add("time", VPackValue(buffer));
|
||||
|
||||
|
||||
builder.add("lastServedTick", VPackValue(std::to_string(std::get<2>(it))));
|
||||
|
||||
|
||||
builder.close();
|
||||
}
|
||||
builder.close(); // clients
|
||||
|
||||
|
||||
builder.close(); // base
|
||||
|
||||
|
||||
generateResult(rest::ResponseCode::OK, builder.slice());
|
||||
}
|
||||
|
||||
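
handleCommandLoggerState above first syncs the RocksDB WAL and then reports
the latest sequence number as the replication tick; like the batch id in the
next hunk, ticks are serialized as strings so full 64-bit values survive
JSON clients. A condensed sketch (database type invented for illustration):

    #include <cstdint>
    #include <string>

    // Stand-in for rocksdb::TransactionDB / SyncWAL(); names assumed.
    struct FakeDb {
      bool syncWAL() { return true; }
      uint64_t latestSequenceNumber() const { return 123456789012345ULL; }
    };

    // Sync first so the reported tick covers everything durable, then
    // expose the sequence number as a string to avoid JSON precision loss.
    std::string lastLogTick(FakeDb& db) {
      if (!db.syncWAL()) {
        return "";  // the real handler generates an error response here
      }
      return std::to_string(db.latestSequenceNumber());
    }
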
|
@ -421,7 +422,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
|
|||
|
||||
VPackBuilder b;
|
||||
b.add(VPackValue(VPackValueType::Object));
|
||||
b.add("id", VPackValue(std::to_string(ctx->id())));
|
||||
b.add("id", VPackValue(std::to_string(ctx->id()))); // id always string
|
||||
b.close();
|
||||
|
||||
generateResult(rest::ResponseCode::OK, b.slice());
|
||||
|
@ -721,10 +722,10 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
|
|||
// add client
|
||||
bool found;
|
||||
std::string const& value = _request->value("serverId", found);
|
||||
|
||||
|
||||
if (found) {
|
||||
TRI_server_id_t serverId = (TRI_server_id_t)StringUtils::uint64(value);
|
||||
|
||||
|
||||
if (serverId > 0) {
|
||||
_vocbase->updateReplicationClient(serverId, result.maxTick());
|
||||
}
|
||||
|
@ -757,11 +758,16 @@ void RocksDBRestReplicationHandler::handleCommandInventory() {
|
|||
if (found) {
|
||||
ctx = _manager->find(StringUtils::uint64(batchId), busy);
|
||||
}
|
||||
if (!found || busy || ctx == nullptr) {
|
||||
if (!found) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
|
||||
"batchId not specified");
|
||||
return;
|
||||
}
|
||||
if (busy || ctx == nullptr) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
|
||||
"context is busy or nullptr");
|
||||
return;
|
||||
}
|
||||
RocksDBReplicationContextGuard(_manager, ctx);
|
||||
|
||||
TRI_voc_tick_t tick = TRI_CurrentTickServer();
|
||||
|
@ -1025,7 +1031,7 @@ void RocksDBRestReplicationHandler::handleCommandCreateKeys() {
|
|||
"invalid collection parameter");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
RocksDBReplicationContext* ctx = nullptr;
|
||||
bool found, busy;
|
||||
std::string batchId = _request->value("batchId", found);
|
||||
|
@ -1038,54 +1044,53 @@ void RocksDBRestReplicationHandler::handleCommandCreateKeys() {
|
|||
return;
|
||||
}
|
||||
RocksDBReplicationContextGuard(_manager, ctx);
|
||||
|
||||
//TRI_voc_tick_t tickEnd = UINT64_MAX;
|
||||
|
||||
// TRI_voc_tick_t tickEnd = UINT64_MAX;
|
||||
// determine end tick for keys
|
||||
//std::string const& value = _request->value("to", found);
|
||||
//if (found) {
|
||||
// std::string const& value = _request->value("to", found);
|
||||
// if (found) {
|
||||
// tickEnd = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value));
|
||||
//}
|
||||
|
||||
//arangodb::LogicalCollection* c = _vocbase->lookupCollection(collection);
|
||||
//arangodb::CollectionGuard guard(_vocbase, c->cid(), false);
|
||||
//arangodb::LogicalCollection* col = guard.collection();
|
||||
|
||||
|
||||
// arangodb::LogicalCollection* c = _vocbase->lookupCollection(collection);
|
||||
// arangodb::CollectionGuard guard(_vocbase, c->cid(), false);
|
||||
// arangodb::LogicalCollection* col = guard.collection();
|
||||
|
||||
int res = ctx->bindCollection(collection);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND,
|
||||
TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// turn off the compaction for the collection
|
||||
//StorageEngine* engine = EngineSelectorFeature::ENGINE;
|
||||
//TRI_voc_tick_t id;
|
||||
//int res = engine->insertCompactionBlocker(_vocbase, 1200.0, id);
|
||||
//if (res != TRI_ERROR_NO_ERROR) {
|
||||
// StorageEngine* engine = EngineSelectorFeature::ENGINE;
|
||||
// TRI_voc_tick_t id;
|
||||
// int res = engine->insertCompactionBlocker(_vocbase, 1200.0, id);
|
||||
// if (res != TRI_ERROR_NO_ERROR) {
|
||||
// THROW_ARANGO_EXCEPTION(res);
|
||||
//}
|
||||
|
||||
|
||||
// initialize a container with the keys
|
||||
//auto keys =
|
||||
//std::make_unique<MMFilesCollectionKeys>(_vocbase, col->name(), id, 300.0);
|
||||
|
||||
//std::string const idString(std::to_string(keys->id()));
|
||||
|
||||
//keys->create(tickEnd);
|
||||
//size_t const count = keys->count();
|
||||
|
||||
//auto keysRepository = _vocbase->collectionKeys();
|
||||
|
||||
//keysRepository->store(keys.get());
|
||||
//keys.release();
|
||||
|
||||
// auto keys =
|
||||
// std::make_unique<MMFilesCollectionKeys>(_vocbase, col->name(), id, 300.0);
|
||||
|
||||
// std::string const idString(std::to_string(keys->id()));
|
||||
|
||||
// keys->create(tickEnd);
|
||||
// size_t const count = keys->count();
|
||||
|
||||
// auto keysRepository = _vocbase->collectionKeys();
|
||||
|
||||
// keysRepository->store(keys.get());
|
||||
// keys.release();
|
||||
|
||||
VPackBuilder result;
|
||||
result.add(VPackValue(VPackValueType::Object));
|
||||
result.add("id", VPackValue(ctx->id()));
|
||||
result.add("id", VPackValue(StringUtils::itoa(ctx->id())));
|
||||
result.add("count", VPackValue(ctx->count()));
|
||||
result.close();
|
||||
generateResult(rest::ResponseCode::OK, result.slice());
|
||||
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1094,20 +1099,20 @@ void RocksDBRestReplicationHandler::handleCommandCreateKeys() {
|
|||
|
||||
void RocksDBRestReplicationHandler::handleCommandGetKeys() {
|
||||
std::vector<std::string> const& suffixes = _request->suffixes();
|
||||
|
||||
|
||||
if (suffixes.size() != 2) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting GET /_api/replication/keys/<keys-id>");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
static uint64_t const DefaultChunkSize = 5000;
|
||||
uint64_t chunkSize = DefaultChunkSize;
|
||||
|
||||
|
||||
// determine chunk size
|
||||
bool found;
|
||||
std::string const& value = _request->value("chunkSize", found);
|
||||
|
||||
|
||||
if (found) {
|
||||
chunkSize = StringUtils::uint64(value);
|
||||
if (chunkSize < 100) {
|
||||
|
@ -1116,18 +1121,18 @@ void RocksDBRestReplicationHandler::handleCommandGetKeys() {
|
|||
chunkSize = 20000;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::string const& id = suffixes[1];
|
||||
uint64_t batchId = arangodb::basics::StringUtils::uint64(id);
|
||||
bool busy;
|
||||
RocksDBReplicationContext *ctx = _manager->find(batchId, busy);
|
||||
RocksDBReplicationContext* ctx = _manager->find(batchId, busy);
|
||||
if (busy || ctx == nullptr) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
|
||||
"batchId not specified");
|
||||
return;
|
||||
}
|
||||
RocksDBReplicationContextGuard(_manager, ctx);
|
||||
|
||||
|
||||
VPackBuilder b;
|
||||
ctx->dumpKeyChunks(b, chunkSize);
|
||||
generateResult(rest::ResponseCode::OK, b.slice());
|
||||
|
@ -1139,20 +1144,20 @@ void RocksDBRestReplicationHandler::handleCommandGetKeys() {
|
|||
|
||||
void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
|
||||
std::vector<std::string> const& suffixes = _request->suffixes();
|
||||
|
||||
|
||||
if (suffixes.size() != 2) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting PUT /_api/replication/keys/<keys-id>");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
static uint64_t const DefaultChunkSize = 5000;
|
||||
uint64_t chunkSize = DefaultChunkSize;
|
||||
|
||||
|
||||
// determine chunk size
|
||||
bool found;
|
||||
std::string const& value1 = _request->value("chunkSize", found);
|
||||
|
||||
|
||||
if (found) {
|
||||
chunkSize = StringUtils::uint64(value1);
|
||||
if (chunkSize < 100) {
|
||||
|
@ -1161,16 +1166,16 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
|
|||
chunkSize = 20000;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::string const& value2 = _request->value("chunk", found);
|
||||
|
||||
|
||||
size_t chunk = 0;
|
||||
if (found) {
|
||||
chunk = static_cast<size_t>(StringUtils::uint64(value2));
|
||||
}
|
||||
|
||||
|
||||
std::string const& value3 = _request->value("type", found);
|
||||
|
||||
|
||||
bool keys = true;
|
||||
if (value3 == "keys") {
|
||||
keys = true;
|
||||
|
@ -1181,28 +1186,25 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
|
|||
"invalid 'type' value");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
std::string const& id = suffixes[1];
|
||||
|
||||
|
||||
uint64_t batchId = arangodb::basics::StringUtils::uint64(id);
|
||||
bool busy;
|
||||
RocksDBReplicationContext *ctx = _manager->find(batchId, busy);
|
||||
RocksDBReplicationContext* ctx = _manager->find(batchId, busy);
|
||||
if (busy || ctx == nullptr) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
|
||||
"batchId not specified");
|
||||
return;
|
||||
}
|
||||
RocksDBReplicationContextGuard(_manager, ctx);
|
||||
|
||||
|
||||
std::shared_ptr<transaction::Context> transactionContext =
|
||||
transaction::StandaloneContext::Create(_vocbase);
|
||||
|
||||
transaction::StandaloneContext::Create(_vocbase);
|
||||
|
||||
VPackBuilder resultBuilder(transactionContext->getVPackOptions());
|
||||
resultBuilder.openArray();
|
||||
|
||||
if (keys) {
|
||||
ctx->dumpKeys(resultBuilder, chunk,
|
||||
static_cast<size_t>(chunkSize));
|
||||
ctx->dumpKeys(resultBuilder, chunk, static_cast<size_t>(chunkSize));
|
||||
} else {
|
||||
bool success;
|
||||
std::shared_ptr<VPackBuilder> parsedIds = parseVelocyPackBody(success);
|
||||
|
@ -1210,25 +1212,23 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
|
|||
generateResult(rest::ResponseCode::BAD, VPackSlice());
|
||||
return;
|
||||
}
|
||||
ctx->dumpDocuments(resultBuilder, chunk,
|
||||
static_cast<size_t>(chunkSize),
|
||||
parsedIds->slice());
|
||||
ctx->dumpDocuments(resultBuilder, chunk, static_cast<size_t>(chunkSize),
|
||||
parsedIds->slice());
|
||||
}
|
||||
|
||||
resultBuilder.close();
|
||||
|
||||
generateResult(rest::ResponseCode::OK, resultBuilder.slice(),
|
||||
transactionContext);
|
||||
}
|
||||
|
||||
void RocksDBRestReplicationHandler::handleCommandRemoveKeys() {
|
||||
std::vector<std::string> const& suffixes = _request->suffixes();
|
||||
|
||||
|
||||
if (suffixes.size() != 2) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting DELETE /_api/replication/keys/<keys-id>");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
std::string const& id = suffixes[1];
|
||||
/*uint64_t batchId = arangodb::basics::StringUtils::uint64(id);
|
||||
bool busy;
|
||||
|
@ -1239,20 +1239,19 @@ void RocksDBRestReplicationHandler::handleCommandRemoveKeys() {
|
|||
return;
|
||||
}
|
||||
RocksDBReplicationContextGuard(_manager, ctx);*/
|
||||
|
||||
|
||||
|
||||
/*auto keys = _vocbase->collectionKeys();
|
||||
TRI_ASSERT(keys != nullptr);
|
||||
|
||||
|
||||
auto collectionKeysId =
|
||||
static_cast<CollectionKeysId>(arangodb::basics::StringUtils::uint64(id));
|
||||
bool found = keys->remove(collectionKeysId);
|
||||
|
||||
|
||||
if (!found) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND);
|
||||
return;
|
||||
}*/
|
||||
|
||||
|
||||
VPackBuilder resultBuilder;
|
||||
resultBuilder.openObject();
|
||||
resultBuilder.add("id", VPackValue(id)); // id as a string
|
||||
|
@ -1260,7 +1259,7 @@ void RocksDBRestReplicationHandler::handleCommandRemoveKeys() {
|
|||
resultBuilder.add("code",
|
||||
VPackValue(static_cast<int>(rest::ResponseCode::ACCEPTED)));
|
||||
resultBuilder.close();
|
||||
|
||||
|
||||
generateResult(rest::ResponseCode::ACCEPTED, resultBuilder.slice());
|
||||
}
|
||||
|
||||
|
@ -1359,60 +1358,60 @@ void RocksDBRestReplicationHandler::handleCommandSync() {
|
|||
// error already created
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
VPackSlice const body = parsedBody->slice();
|
||||
|
||||
|
||||
std::string const endpoint =
|
||||
VelocyPackHelper::getStringValue(body, "endpoint", "");
|
||||
|
||||
VelocyPackHelper::getStringValue(body, "endpoint", "");
|
||||
|
||||
if (endpoint.empty()) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"<endpoint> must be a valid endpoint");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
std::string const database =
|
||||
VelocyPackHelper::getStringValue(body, "database", _vocbase->name());
|
||||
VelocyPackHelper::getStringValue(body, "database", _vocbase->name());
|
||||
std::string const username =
|
||||
VelocyPackHelper::getStringValue(body, "username", "");
|
||||
VelocyPackHelper::getStringValue(body, "username", "");
|
||||
std::string const password =
|
||||
VelocyPackHelper::getStringValue(body, "password", "");
|
||||
VelocyPackHelper::getStringValue(body, "password", "");
|
||||
std::string const jwt = VelocyPackHelper::getStringValue(body, "jwt", "");
|
||||
bool const verbose =
|
||||
VelocyPackHelper::getBooleanValue(body, "verbose", false);
|
||||
VelocyPackHelper::getBooleanValue(body, "verbose", false);
|
||||
bool const includeSystem =
|
||||
VelocyPackHelper::getBooleanValue(body, "includeSystem", true);
|
||||
VelocyPackHelper::getBooleanValue(body, "includeSystem", true);
|
||||
bool const incremental =
|
||||
VelocyPackHelper::getBooleanValue(body, "incremental", false);
|
||||
VelocyPackHelper::getBooleanValue(body, "incremental", false);
|
||||
bool const keepBarrier =
|
||||
VelocyPackHelper::getBooleanValue(body, "keepBarrier", false);
|
||||
VelocyPackHelper::getBooleanValue(body, "keepBarrier", false);
|
||||
bool const useCollectionId =
|
||||
VelocyPackHelper::getBooleanValue(body, "useCollectionId", true);
|
||||
|
||||
VelocyPackHelper::getBooleanValue(body, "useCollectionId", true);
|
||||
|
||||
std::unordered_map<std::string, bool> restrictCollections;
|
||||
VPackSlice const restriction = body.get("restrictCollections");
|
||||
|
||||
|
||||
if (restriction.isArray()) {
|
||||
for (VPackSlice const& cname : VPackArrayIterator(restriction)) {
|
||||
if (cname.isString()) {
|
||||
restrictCollections.insert(
|
||||
std::pair<std::string, bool>(cname.copyString(), true));
|
||||
std::pair<std::string, bool>(cname.copyString(), true));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::string restrictType =
|
||||
VelocyPackHelper::getStringValue(body, "restrictType", "");
|
||||
|
||||
VelocyPackHelper::getStringValue(body, "restrictType", "");
|
||||
|
||||
if ((restrictType.empty() && !restrictCollections.empty()) ||
|
||||
(!restrictType.empty() && restrictCollections.empty()) ||
|
||||
(!restrictType.empty() && restrictType != "include" &&
|
||||
restrictType != "exclude")) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"invalid value for <restrictCollections> or <restrictType>");
|
||||
return;
|
||||
}
|
||||
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"invalid value for <restrictCollections> or <restrictType>");
|
||||
return;
|
||||
}
|
||||
|
||||
TRI_replication_applier_configuration_t config;
|
||||
config._endpoint = endpoint;
|
||||
config._database = database;
|
||||
|
@ -1422,31 +1421,32 @@ void RocksDBRestReplicationHandler::handleCommandSync() {
|
|||
config._includeSystem = includeSystem;
|
||||
config._verbose = verbose;
|
||||
config._useCollectionId = useCollectionId;
|
||||
|
||||
|
||||
// wait until all data in current logfile got synced
|
||||
//MMFilesLogfileManager::instance()->waitForSync(5.0);
|
||||
// MMFilesLogfileManager::instance()->waitForSync(5.0);
|
||||
rocksdb::TransactionDB* db =
|
||||
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->db();
|
||||
|
||||
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->db();
|
||||
|
||||
rocksdb::Status status = db->GetBaseDB()->SyncWAL();
|
||||
if (!status.ok()) {
|
||||
Result res = rocksutils::convertStatus(status).errorNumber();
|
||||
generateError(rest::ResponseCode::BAD, res.errorNumber(), res.errorMessage());
|
||||
generateError(rest::ResponseCode::BAD, res.errorNumber(),
|
||||
res.errorMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
InitialSyncer syncer(_vocbase, &config, restrictCollections, restrictType,
|
||||
verbose);
|
||||
|
||||
|
||||
std::string errorMsg = "";
|
||||
|
||||
|
||||
/*int res = */ syncer.run(errorMsg, incremental);
|
||||
|
||||
|
||||
VPackBuilder result;
|
||||
result.add(VPackValue(VPackValueType::Object));
|
||||
|
||||
|
||||
result.add("collections", VPackValue(VPackValueType::Array));
|
||||
|
||||
|
||||
for (auto const& it : syncer.getProcessedCollections()) {
|
||||
std::string const cidString = StringUtils::itoa(it.first);
|
||||
// Insert a collection
|
||||
|
@ -1455,17 +1455,17 @@ void RocksDBRestReplicationHandler::handleCommandSync() {
|
|||
result.add("name", VPackValue(it.second));
|
||||
result.close(); // one collection
|
||||
}
|
||||
|
||||
|
||||
result.close(); // collections
|
||||
|
||||
|
||||
auto tickString = std::to_string(syncer.getLastLogTick());
|
||||
result.add("lastLogTick", VPackValue(tickString));
|
||||
|
||||
|
||||
if (keepBarrier) {
|
||||
auto barrierId = std::to_string(syncer.stealBarrier());
|
||||
result.add("barrierId", VPackValue(barrierId));
|
||||
}
|
||||
|
||||
|
||||
result.close(); // base
|
||||
generateResult(rest::ResponseCode::OK, result.slice());
|
||||
}
|
||||
|
|
|
@ -292,7 +292,7 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
|
|||
static_cast<RocksDBTransactionCollection*>(findCollection(cid));
|
||||
|
||||
if (collection == nullptr) {
|
||||
std::string message = "collection '" + collection->collectionName() +
|
||||
std::string message = "collection '" + std::to_string(cid) +
|
||||
"' not found in transaction state";
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message);
|
||||
}
|
||||
|
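
The fix above removes a guaranteed crash: the old error path called
collection->collectionName() inside the very branch that had just established
collection == nullptr, so the message is now built from the still-valid
numeric id instead. A distilled version of the pattern (types invented):

    #include <cstdint>
    #include <stdexcept>
    #include <string>

    struct Collection {
      std::string collectionName() const;  // never call this on a null pointer
    };

    void requireCollection(Collection const* collection, uint64_t cid) {
      if (collection == nullptr) {
        // build the diagnostic from data that is still valid here (the id),
        // not from the null pointer itself
        throw std::runtime_error("collection '" + std::to_string(cid) +
                                 "' not found in transaction state");
      }
    }
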
|
|
@ -1147,7 +1147,7 @@ IndexIterator* RocksDBVPackIndex::iteratorForCondition(
|
|||
// unsupported right now. Should have been rejected by
|
||||
// supportsFilterCondition
|
||||
TRI_ASSERT(false);
|
||||
return nullptr;
|
||||
return new EmptyIndexIterator(_collection, trx, mmdr, this);
|
||||
}
|
||||
value->toVelocyPackValue(searchValues);
|
||||
}
|
||||
|
|
|
@ -72,6 +72,8 @@ class JobQueueThread final
|
|||
std::shared_ptr<Job> job(jobPtr);
|
||||
|
||||
_scheduler->post([this, self, job]() {
|
||||
RequestStatistics::SET_QUEUE_END(job->_handler->statistics());
|
||||
|
||||
try {
|
||||
job->_callback(std::move(job->_handler));
|
||||
} catch (std::exception& e) {
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#include "Basics/MutexLocker.h"
|
||||
#include "Basics/StringUtils.h"
|
||||
#include "Basics/Thread.h"
|
||||
#include "GeneralServer/RestHandler.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "Rest/GeneralResponse.h"
|
||||
#include "Scheduler/JobGuard.h"
|
||||
|
@ -328,6 +329,10 @@ bool Scheduler::hasQueueCapacity() const {
|
|||
}
|
||||
|
||||
bool Scheduler::queue(std::unique_ptr<Job> job) {
|
||||
auto jobQueue = _jobQueue.get();
|
||||
auto queueSize = (jobQueue == nullptr) ? 0 : jobQueue->queueSize();
|
||||
RequestStatistics::SET_QUEUE_START(job->_handler->statistics(), queueSize);
|
||||
|
||||
return _jobQueue->queue(std::move(job));
|
||||
}
|
||||
|
||||
|
|
|
@ -213,6 +213,20 @@ void RequestStatistics::fill(StatisticsDistribution& totalTime,
|
|||
bytesReceived = *TRI_BytesReceivedDistributionStatistics;
|
||||
}
|
||||
|
||||
std::string RequestStatistics::timingsCsv() {
|
||||
std::stringstream ss;
|
||||
|
||||
ss << std::setprecision(9) << std::fixed
|
||||
<< "read," << (_readEnd - _readStart)
|
||||
<< ",queue," << (_queueEnd - _queueStart)
|
||||
<< ",queue-size," << _queueSize
|
||||
<< ",request," << (_requestEnd - _requestStart)
|
||||
<< ",total," << (StatisticsFeature::time() - _readStart)
|
||||
<< ",error," << (_executeError ? "true" : "false");
|
||||
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
std::string RequestStatistics::to_string() {
|
||||
std::stringstream ss;
|
||||
|
||||
|
|
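
timingsCsv() above flattens the per-request timestamps into one CSV record of
alternating labels and values. An invented example of what a single line looks
like (field order taken from the stream insertions, numbers made up):

    read,0.000012000,queue,0.000450000,queue-size,3,request,0.002100000,total,0.002700000,error,false

Each duration is the difference of two StatisticsFeature::time() readings, so
the values are seconds, printed with nine digits after the decimal point per
the setprecision(9) call.
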
|
@ -87,6 +87,19 @@ class RequestStatistics {
|
|||
}
|
||||
}
|
||||
|
||||
static void SET_QUEUE_START(RequestStatistics* stat, int64_t nrQueued) {
|
||||
if (stat != nullptr) {
|
||||
stat->_queueStart = StatisticsFeature::time();
|
||||
stat->_queueSize = nrQueued;
|
||||
}
|
||||
}
|
||||
|
||||
static void SET_QUEUE_END(RequestStatistics* stat) {
|
||||
if (stat != nullptr) {
|
||||
stat->_queueEnd = StatisticsFeature::time();
|
||||
}
|
||||
}
|
||||
|
||||
static void ADD_RECEIVED_BYTES(RequestStatistics* stat, size_t bytes) {
|
||||
if (stat != nullptr) {
|
||||
stat->_receivedBytes += bytes;
|
||||
|
@ -134,6 +147,7 @@ class RequestStatistics {
|
|||
basics::StatisticsDistribution& bytesSent,
|
||||
basics::StatisticsDistribution& bytesReceived);
|
||||
|
||||
std::string timingsCsv();
|
||||
std::string to_string();
|
||||
void trace_log();
|
||||
|
||||
|
@ -161,6 +175,7 @@ class RequestStatistics {
|
|||
_readEnd = 0.0;
|
||||
_queueStart = 0.0;
|
||||
_queueEnd = 0.0;
|
||||
_queueSize = 0;
|
||||
_requestStart = 0.0;
|
||||
_requestEnd = 0.0;
|
||||
_writeStart = 0.0;
|
||||
|
@ -178,8 +193,10 @@ class RequestStatistics {
|
|||
|
||||
double _readStart; // CommTask::processRead - read first byte of message
|
||||
double _readEnd; // CommTask::processRead - message complete
|
||||
double _queueStart; // addJob to queue GeneralServer::handleRequest
|
||||
double _queueEnd; // exit queue DispatcherThread::handleJob
|
||||
double _queueStart; // job added to JobQueue
|
||||
double _queueEnd; // job removed from JobQueue
|
||||
int64_t _queueSize;
|
||||
|
||||
double _requestStart; // GeneralServerJob::work
|
||||
double _requestEnd;
|
||||
double _writeStart;
|
||||
|
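
SET_QUEUE_START and SET_QUEUE_END above bracket a job's time in the scheduler
queue: the first fires when the job is posted (also recording the queue depth
at that moment), the second when a worker picks it up. A self-contained sketch
of the same instrumentation (names assumed):

    #include <chrono>
    #include <cstdint>

    struct Stats {
      double queueStart = 0.0;  // set when the job is enqueued
      double queueEnd = 0.0;    // set when a worker dequeues it
      int64_t queueSize = 0;    // queue depth at enqueue time
    };

    static double now() {
      using namespace std::chrono;
      return duration<double>(steady_clock::now().time_since_epoch()).count();
    }

    void markQueued(Stats* s, int64_t depth) {
      if (s != nullptr) {  // statistics may be disabled, hence the null check
        s->queueStart = now();
        s->queueSize = depth;
      }
    }

    void markDequeued(Stats* s) {
      if (s != nullptr) {
        s->queueEnd = now();  // queueEnd - queueStart = time spent waiting
      }
    }
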
|
|
@ -34,14 +34,25 @@ struct DocumentIdentifierToken {
|
|||
public:
|
||||
// TODO Replace by Engine::InvalidToken
|
||||
constexpr DocumentIdentifierToken() : _data(0) {}
|
||||
|
||||
~DocumentIdentifierToken() {}
|
||||
|
||||
DocumentIdentifierToken& operator=(DocumentIdentifierToken const& other) {
|
||||
|
||||
DocumentIdentifierToken(DocumentIdentifierToken const& other) noexcept : _data(other._data) {}
|
||||
DocumentIdentifierToken& operator=(DocumentIdentifierToken const& other) noexcept {
|
||||
_data = other._data;
|
||||
return *this;
|
||||
}
|
||||
|
||||
DocumentIdentifierToken(DocumentIdentifierToken&& other) noexcept : _data(other._data) {
|
||||
other._data = 0;
|
||||
}
|
||||
|
||||
DocumentIdentifierToken& operator=(DocumentIdentifierToken&& other) noexcept {
|
||||
_data = other._data;
|
||||
other._data = 0;
|
||||
return *this;
|
||||
}
|
||||
|
||||
~DocumentIdentifierToken() {}
|
||||
|
||||
inline bool operator==(DocumentIdentifierToken const& other) const { return _data == other._data; }
|
||||
|
||||
inline bool operator==(uint64_t const& other) const { return _data == other; }
|
||||
|
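
The DocumentIdentifierToken hunk above spells out noexcept copy and move
operations; a move zeroes the source, leaving it equal to the default
"invalid" token. A condensed, runnable version of the same rule-of-five
shape:

    #include <cstdint>
    #include <utility>

    struct Token {
      uint64_t data = 0;  // 0 doubles as the "invalid token" value

      Token() = default;
      Token(Token const& o) noexcept : data(o.data) {}
      Token& operator=(Token const& o) noexcept {
        data = o.data;
        return *this;
      }
      // moving leaves the source in the invalid (zero) state
      Token(Token&& o) noexcept : data(o.data) { o.data = 0; }
      Token& operator=(Token&& o) noexcept {
        data = o.data;
        o.data = 0;
        return *this;
      }
    };

    int main() {
      Token a;
      a.data = 42;
      Token b = std::move(a);
      return (a.data == 0 && b.data == 42) ? 0 : 1;  // exits 0 on success
    }
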
|
|
@ -104,7 +104,7 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
// status functionality
|
||||
// --------------------
|
||||
|
||||
// return the name of the storage engine
|
||||
// return the name of the specific storage engine e.g. rocksdb
|
||||
char const* typeName() const { return _typeName.c_str(); }
|
||||
|
||||
// inventory functionality
|
||||
|
|
|
@ -392,40 +392,41 @@ static void JS_ChecksumCollection(
|
|||
|
||||
ManagedDocumentResult mmdr;
|
||||
trx.invokeOnAllElements(col->name(), [&hash, &withData, &withRevisions, &trx, &collection, &mmdr](DocumentIdentifierToken const& token) {
|
||||
collection->readDocument(&trx, token, mmdr);
|
||||
VPackSlice const slice(mmdr.vpack());
|
||||
if (collection->readDocument(&trx, token, mmdr)) {
|
||||
VPackSlice const slice(mmdr.vpack());
|
||||
|
||||
uint64_t localHash = transaction::helpers::extractKeyFromDocument(slice).hashString();
|
||||
uint64_t localHash = transaction::helpers::extractKeyFromDocument(slice).hashString();
|
||||
|
||||
if (withRevisions) {
|
||||
if (withRevisions) {
|
||||
localHash += transaction::helpers::extractRevSliceFromDocument(slice).hash();
|
||||
}
|
||||
}
|
||||
|
||||
if (withData) {
|
||||
if (withData) {
|
||||
// with data
|
||||
uint64_t const n = slice.length() ^ 0xf00ba44ba5;
|
||||
uint64_t seed = fasthash64_uint64(n, 0xdeadf054);
|
||||
|
||||
for (auto const& it : VPackObjectIterator(slice, false)) {
|
||||
// loop over all attributes, but exclude _rev, _id and _key
|
||||
// _id is different for each collection anyway, _rev is covered by withRevisions, and _key
|
||||
// was already handled before
|
||||
VPackValueLength keyLength;
|
||||
char const* key = it.key.getString(keyLength);
|
||||
if (keyLength >= 3 &&
|
||||
key[0] == '_' &&
|
||||
((keyLength == 3 && memcmp(key, "_id", 3) == 0) ||
|
||||
(keyLength == 4 && (memcmp(key, "_key", 4) == 0 || memcmp(key, "_rev", 4) == 0)))) {
|
||||
// exclude attribute
|
||||
continue;
|
||||
}
|
||||
|
||||
localHash ^= it.key.hash(seed) ^ 0xba5befd00d;
|
||||
localHash += it.value.normalizedHash(seed) ^ 0xd4129f526421;
|
||||
// loop over all attributes, but exclude _rev, _id and _key
|
||||
// _id is different for each collection anyway, _rev is covered by withRevisions, and _key
|
||||
// was already handled before
|
||||
VPackValueLength keyLength;
|
||||
char const* key = it.key.getString(keyLength);
|
||||
if (keyLength >= 3 &&
|
||||
key[0] == '_' &&
|
||||
((keyLength == 3 && memcmp(key, "_id", 3) == 0) ||
|
||||
(keyLength == 4 && (memcmp(key, "_key", 4) == 0 || memcmp(key, "_rev", 4) == 0)))) {
|
||||
// exclude attribute
|
||||
continue;
|
||||
}
|
||||
|
||||
localHash ^= it.key.hash(seed) ^ 0xba5befd00d;
|
||||
localHash += it.value.normalizedHash(seed) ^ 0xd4129f526421;
|
||||
}
|
||||
}
|
||||
|
||||
hash ^= localHash;
|
||||
}
|
||||
}
|
||||
|
||||
hash ^= localHash;
|
||||
return true;
|
||||
});
|
||||
|
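
JS_ChecksumCollection above folds every document into an order-insensitive
collection checksum: each document contributes a hash of its _key, optionally
its _rev, and optionally all user attributes (the system attributes _id, _key
and _rev are skipped so they do not double-count), and the per-document values
are XOR-combined. A simplified sketch with plain std::hash instead of
fasthash64:

    #include <cstdint>
    #include <functional>
    #include <map>
    #include <string>
    #include <vector>

    // Hypothetical flattened document: attribute name -> stringified value.
    using Doc = std::map<std::string, std::string>;

    uint64_t checksum(std::vector<Doc> const& docs, bool withRevisions,
                      bool withData) {
      std::hash<std::string> h;
      uint64_t hash = 0;
      for (auto const& doc : docs) {  // every doc is assumed to have _key/_rev
        uint64_t local = h(doc.at("_key"));
        if (withRevisions) {
          local += h(doc.at("_rev"));
        }
        if (withData) {
          for (auto const& [key, value] : doc) {
            // _id differs per collection, _rev is covered above, and _key
            // was already hashed, so all three are excluded here
            if (key == "_id" || key == "_key" || key == "_rev") continue;
            local ^= h(key);
            local += h(value);
          }
        }
        hash ^= local;  // XOR keeps the result independent of document order
      }
      return hash;
    }
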
||||
|
|
|
@ -21,12 +21,9 @@
|
|||
/// @author Dr. Frank Celler
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "v8-replication.h"
|
||||
#include "Basics/ReadLocker.h"
|
||||
#include "Cluster/ClusterComm.h"
|
||||
#include "Cluster/ClusterFeature.h"
|
||||
#include "MMFiles/MMFilesLogfileManager.h"
|
||||
#include "MMFiles/mmfiles-replication-dump.h"
|
||||
#include "Replication/InitialSyncer.h"
|
||||
#include "Rest/Version.h"
|
||||
#include "RestServer/ServerIdFeature.h"
|
||||
|
@ -35,6 +32,14 @@
|
|||
#include "V8/v8-utils.h"
|
||||
#include "V8/v8-vpack.h"
|
||||
#include "V8Server/v8-vocbaseprivate.h"
|
||||
#include "v8-replication.h"
|
||||
|
||||
// FIXME: to be removed (should be storage-engine independent; get it working for now)
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "MMFiles/MMFilesLogfileManager.h"
|
||||
#include "MMFiles/mmfiles-replication-dump.h"
|
||||
#include "RocksDBEngine/RocksDBEngine.h"
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
|
||||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/Parser.h>
|
||||
|
@ -51,21 +56,41 @@ using namespace arangodb::rest;
|
|||
|
||||
static void JS_StateLoggerReplication(
|
||||
v8::FunctionCallbackInfo<v8::Value> const& args) {
|
||||
// FIXME: use code in RestReplicationHandler and get rid of storage-engine
|
||||
// dependent code here
|
||||
//
|
||||
TRI_V8_TRY_CATCH_BEGIN(isolate);
|
||||
v8::HandleScope scope(isolate);
|
||||
|
||||
MMFilesLogfileManagerState const s =
|
||||
MMFilesLogfileManager::instance()->state();
|
||||
|
||||
v8::Handle<v8::Object> result = v8::Object::New(isolate);
|
||||
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
|
||||
|
||||
v8::Handle<v8::Object> state = v8::Object::New(isolate);
|
||||
state->Set(TRI_V8_ASCII_STRING("running"), v8::True(isolate));
|
||||
state->Set(TRI_V8_ASCII_STRING("lastLogTick"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastCommittedTick));
|
||||
state->Set(TRI_V8_ASCII_STRING("lastUncommittedLogTick"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastAssignedTick));
|
||||
state->Set(TRI_V8_ASCII_STRING("totalEvents"),
|
||||
|
||||
if(engineName == "mmfiles"){
|
||||
MMFilesLogfileManagerState const s = MMFilesLogfileManager::instance()->state();
|
||||
state->Set(TRI_V8_ASCII_STRING("lastLogTick"),
|
||||
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastCommittedTick));
|
||||
state->Set(TRI_V8_ASCII_STRING("lastUncommittedLogTick"), TRI_V8UInt64String<TRI_voc_tick_t>(isolate, s.lastAssignedTick));
|
||||
state->Set(TRI_V8_ASCII_STRING("totalEvents"),
|
||||
v8::Number::New(isolate, static_cast<double>(s.numEvents + s.numEventsSync)));
|
||||
state->Set(TRI_V8_ASCII_STRING("time"), TRI_V8_STD_STRING(s.timeString));
|
||||
state->Set(TRI_V8_ASCII_STRING("time"), TRI_V8_STD_STRING(s.timeString));
|
||||
} else if (engineName == "rocksdb") {
|
||||
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
|
||||
uint64_t lastTick = db->GetLatestSequenceNumber();
|
||||
state->Set(TRI_V8_ASCII_STRING("lastLogTick"),
|
||||
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, lastTick));
|
||||
state->Set(TRI_V8_ASCII_STRING("lastUncommittedLogTick"),
|
||||
TRI_V8UInt64String<TRI_voc_tick_t>(isolate, lastTick));
|
||||
state->Set(TRI_V8_ASCII_STRING("totalEvents"),
|
||||
v8::Number::New(isolate, static_cast<double>(0))); //s.numEvents + s.numEventsSync)));
|
||||
state->Set(TRI_V8_ASCII_STRING("time"), TRI_V8_STD_STRING(utilities::timeString()));
|
||||
} else {
|
||||
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid storage engine");
|
||||
return;
|
||||
}
|
||||
|
||||
v8::Handle<v8::Object> result = v8::Object::New(isolate);
|
||||
result->Set(TRI_V8_ASCII_STRING("state"), state);
|
||||
|
||||
v8::Handle<v8::Object> server = v8::Object::New(isolate);
|
||||
|
@ -91,6 +116,12 @@ static void JS_TickRangesLoggerReplication(
|
|||
TRI_V8_TRY_CATCH_BEGIN(isolate);
|
||||
v8::HandleScope scope(isolate);
|
||||
|
||||
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
|
||||
if( engineName != "mmfiles"){
|
||||
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "only implemented for mmfiles engine");
|
||||
return;
|
||||
}
|
||||
|
||||
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
|
||||
|
||||
v8::Handle<v8::Array> result = v8::Array::New(isolate, (int)ranges.size());
|
||||
|
@ -121,6 +152,12 @@ static void JS_FirstTickLoggerReplication(
|
|||
TRI_V8_TRY_CATCH_BEGIN(isolate);
|
||||
v8::HandleScope scope(isolate);
|
||||
|
||||
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
|
||||
if( engineName != "mmfiles"){
|
||||
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "only implemented for mmfiles engine");
|
||||
return;
|
||||
}
|
||||
|
||||
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
|
||||
|
||||
TRI_voc_tick_t tick = UINT64_MAX;
|
||||
|
@ -152,6 +189,12 @@ static void JS_LastLoggerReplication(
|
|||
TRI_V8_TRY_CATCH_BEGIN(isolate);
|
||||
v8::HandleScope scope(isolate);
|
||||
|
||||
std::string engineName = EngineSelectorFeature::ENGINE->typeName();
|
||||
if( engineName != "mmfiles"){
|
||||
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "only implemented for mmfiles engine");
|
||||
return;
|
||||
}
|
||||
|
||||
TRI_vocbase_t* vocbase = GetContextVocBase(isolate);
|
||||
|
||||
if (vocbase == nullptr) {
|
||||
|
|
|
@ -134,6 +134,9 @@ class PhysicalCollection {
|
|||
virtual void truncate(transaction::Methods* trx,
|
||||
OperationOptions& options) = 0;
|
||||
|
||||
virtual DocumentIdentifierToken lookupKey(
|
||||
transaction::Methods*, arangodb::velocypack::Slice const&) = 0;
|
||||
|
||||
virtual int read(transaction::Methods*, arangodb::velocypack::Slice const key,
|
||||
ManagedDocumentResult& result, bool) = 0;
|
||||
|
||||
|
|
|
@ -74,9 +74,9 @@ macro (install_readme input output)
|
|||
if (MSVC)
|
||||
set(CRLFSTYLE "CRLF")
|
||||
endif ()
|
||||
configure_file(${PROJECT_SOURCE_DIR}/${input} "${PROJECT_BINARY_DIR}/${input}" NEWLINE_STYLE ${CRLFSTYLE})
|
||||
configure_file(${PROJECT_SOURCE_DIR}/${input} "${PROJECT_BINARY_DIR}/${output}" NEWLINE_STYLE ${CRLFSTYLE})
|
||||
install(
|
||||
FILES "${PROJECT_BINARY_DIR}/${input}"
|
||||
FILES "${PROJECT_BINARY_DIR}/${output}"
|
||||
DESTINATION "${where}"
|
||||
)
|
||||
endmacro ()
|
||||
|
|
|
@ -7,7 +7,6 @@ endif()
|
|||
if(SNAPCRAFT_FOUND)
|
||||
set(SNAPCRAFT_TEMPLATE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/Installation/Ubuntu")
|
||||
set(SNAPCRAFT_SOURCE_DIR "${CMAKE_BINARY_DIR}/_CPack_Packages/SNAP")
|
||||
set(CPACK_SNAP_PACKAGE_FILE_NAME "${CPACK_PACKAGE_NAME}_${CPACK_PACKAGE_VERSION}_${ARANGODB_PACKAGE_ARCHITECTURE}.snap")
|
||||
|
||||
message(STATUS "Creating snap package")
|
||||
|
||||
|
@ -31,7 +30,12 @@ if(SNAPCRAFT_FOUND)
|
|||
file(
|
||||
COPY "${SNAPCRAFT_TEMPLATE_DIR}/arangodb.png"
|
||||
DESTINATION "${SNAPCRAFT_SOURCE_DIR}/"
|
||||
)
|
||||
)
|
||||
|
||||
set(CPACK_SNAP_PACKAGE_FILE_NAME "${CPACK_PACKAGE_NAME}_${CPACK_PACKAGE_VERSION}_${ARANGODB_PACKAGE_ARCHITECTURE}.snap")
|
||||
if(NOT EXISTS ${CPACK_SNAP_PACKAGE_FILE_NAME})
|
||||
set(CPACK_SNAP_PACKAGE_FILE_NAME "${CPACK_PACKAGE_NAME}_${CPACK_PACKAGE_VERSION}-${ARANGODB_PACKAGE_REVISION}_${ARANGODB_PACKAGE_ARCHITECTURE}.snap")
|
||||
endif()
|
||||
add_custom_target(snap
|
||||
COMMENT "create snap-package"
|
||||
COMMAND ${SNAP_EXE} snap
|
||||
|
|
|
@ -608,13 +608,14 @@
|
|||
this.waitForInit(this.documents.bind(this), colid, pageid);
|
||||
return;
|
||||
}
|
||||
if (!this.documentsView) {
|
||||
this.documentsView = new window.DocumentsView({
|
||||
collection: new window.ArangoDocuments(),
|
||||
documentStore: this.arangoDocumentStore,
|
||||
collectionsStore: this.arangoCollectionsStore
|
||||
});
|
||||
if (this.documentsView) {
|
||||
this.documentsView.removeView();
|
||||
}
|
||||
this.documentsView = new window.DocumentsView({
|
||||
collection: new window.ArangoDocuments(),
|
||||
documentStore: this.arangoDocumentStore,
|
||||
collectionsStore: this.arangoCollectionsStore
|
||||
});
|
||||
this.documentsView.setCollectionId(colid, pageid);
|
||||
this.documentsView.render();
|
||||
},
|
||||
|
|
|
@ -30,6 +30,14 @@
|
|||
next: null
|
||||
},
|
||||
|
||||
removeView: function () {
|
||||
this.$el.empty().off(); /* off to unbind the events */
|
||||
this.stopListening();
|
||||
this.unbind();
|
||||
delete this.el;
|
||||
return this;
|
||||
},
|
||||
|
||||
editButtons: ['#deleteSelected', '#moveSelected'],
|
||||
|
||||
initialize: function (options) {
|
||||
|
|
|
@ -44,22 +44,52 @@
|
|||
$('.login-window #databases').show();
|
||||
|
||||
$.ajax(url).success(function (permissions) {
|
||||
// enable db select and login button
|
||||
$('#loginDatabase').html('');
|
||||
// fill select with allowed dbs
|
||||
_.each(permissions.result, function (rule, db) {
|
||||
if (errCallback) {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + db + '</option>'
|
||||
);
|
||||
} else {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + rule + '</option>'
|
||||
);
|
||||
}
|
||||
});
|
||||
var successFunc = function (availableDbs) {
|
||||
// enable db select and login button
|
||||
$('#loginDatabase').html('');
|
||||
// fill select with allowed dbs
|
||||
_.each(permissions.result, function (rule, db) {
|
||||
if (errCallback) {
|
||||
if (availableDbs) {
|
||||
if (availableDbs.indexOf(db) > -1) {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + db + '</option>'
|
||||
);
|
||||
}
|
||||
} else {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + db + '</option>'
|
||||
);
|
||||
}
|
||||
} else {
|
||||
if (availableDbs) {
|
||||
if (availableDbs.indexOf(db) > -1) {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + rule + '</option>'
|
||||
);
|
||||
}
|
||||
} else {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + rule + '</option>'
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
self.renderDBS();
|
||||
self.renderDBS();
|
||||
};
|
||||
|
||||
// fetch available dbs
|
||||
var availableDbs;
|
||||
try {
|
||||
$.ajax(arangoHelper.databaseUrl('/_api/database/user')).success(function (dbs) {
|
||||
availableDbs = dbs.result;
|
||||
successFunc(availableDbs);
|
||||
});
|
||||
} catch (ignore) {
|
||||
console.log(ignore);
|
||||
successFunc();
|
||||
}
|
||||
}).error(function () {
|
||||
if (errCallback) {
|
||||
errCallback();
|
||||
|
@ -191,14 +221,35 @@
|
|||
// enable db select and login button
|
||||
$('#loginDatabase').html('');
|
||||
|
||||
// fill select with allowed dbs
|
||||
_.each(permissions.result, function (db, key) {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + key + '</option>'
|
||||
);
|
||||
});
|
||||
var successFunc = function (availableDbs) {
|
||||
if (availableDbs) {
|
||||
_.each(permissions.result, function (db, key) {
|
||||
if (availableDbs.indexOf(key) > -1) {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + key + '</option>'
|
||||
);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// fill select with allowed dbs
|
||||
_.each(permissions.result, function (db, key) {
|
||||
$('#loginDatabase').append(
|
||||
'<option>' + key + '</option>'
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
self.renderDBS();
|
||||
self.renderDBS();
|
||||
};
|
||||
|
||||
// fetch available dbs
|
||||
try {
|
||||
$.ajax(arangoHelper.databaseUrl('/_api/database/user')).success(function (dbs) {
|
||||
successFunc(dbs.result);
|
||||
});
|
||||
} catch (ignore) {
|
||||
successFunc();
|
||||
}
|
||||
}).error(function () {
|
||||
$('.wrong-credentials').show();
|
||||
});
|
||||
|
|
|
@ -42,7 +42,7 @@ describe('HTTP headers in Foxx services', function () {
|
|||
expect(result.headers['x-foobar']).to.equal('baz');
|
||||
expect(result.headers['x-nofoobar']).to.equal('baz');
|
||||
const irrelevantHeaders = ['http/1.1', 'connection', 'content-type'];
|
||||
expect(result.headers['access-control-expose-headers']).to.equal(Object.keys(result.headers).filter(x => !x.startsWith('access-control-') && !irrelevantHeaders.includes(x)).sort().join(', '));
|
||||
expect(result.headers['access-control-expose-headers']).to.equal(Object.keys(result.headers).filter(x => !x.startsWith('x-content-type-options') && !x.startsWith('access-control-') && !irrelevantHeaders.includes(x)).sort().join(', '));
|
||||
expect(result.headers['access-control-allow-credentials']).to.equal('true');
|
||||
});
|
||||
|
||||
|
@ -68,7 +68,7 @@ describe('HTTP headers in Foxx services', function () {
|
|||
var opts = { headers: { origin }, method: "POST" };
|
||||
var result = internal.download(origin + "/unittest/headers/header-empty", "", opts);
|
||||
const irrelevantHeaders = ['http/1.1', 'connection', 'content-type'];
|
||||
expect(result.headers['access-control-expose-headers']).to.equal(Object.keys(result.headers).filter(x => !x.startsWith('access-control-') && !irrelevantHeaders.includes(x)).sort().join(', '));
|
||||
expect(result.headers['access-control-expose-headers']).to.equal(Object.keys(result.headers).filter(x => !x.startsWith('x-content-type-options') && !x.startsWith('access-control-') && !irrelevantHeaders.includes(x)).sort().join(', '));
|
||||
expect(result.headers['access-control-allow-credentials']).to.equal('true');
|
||||
});
|
||||
});
|
||||
|
|
|
@ -168,6 +168,9 @@ function request (req) {
|
|||
} else {
|
||||
options.maxRedirects = 10;
|
||||
}
|
||||
if (req.sslProtocol) {
|
||||
options.sslProtocol = req.sslProtocol;
|
||||
}
|
||||
let result = internal.download(path, body, options);
|
||||
|
||||
return new Response(result, req.encoding, req.json);
|
||||
|
|
|
@ -430,8 +430,7 @@ function ReplicationLoggerSuite () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
testLoggerSystemCollection : function () {
|
||||
db._drop("_unitfoxx");
|
||||
db._drop("_unittests");
|
||||
db._drop("_unittests", true);
|
||||
|
||||
var tick = getLastLogTick();
|
||||
|
||||
|
|
|
@ -183,7 +183,7 @@ function ReplicationLoggerSuite () {
|
|||
assertTrue(typeof tick === 'string');
|
||||
assertNotEqual("", state.time);
|
||||
assertMatch(/^\d+-\d+-\d+T\d+:\d+:\d+Z$/, state.time);
|
||||
|
||||
|
||||
// query the state again
|
||||
state = replication.logger.state().state;
|
||||
assertTrue(state.running);
|
||||
|
@ -430,8 +430,7 @@ function ReplicationLoggerSuite () {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
testLoggerSystemCollection : function () {
|
||||
db._drop("_unitfoxx");
|
||||
db._drop("_unittests");
|
||||
db._drop("_unittests", true);
|
||||
|
||||
var tick = getLastLogTick();
|
||||
|
||||
|
|
|
@ -119,6 +119,28 @@ function optimizerIndexesTestSuite () {
|
|||
assertEqual(0, results.stats.scannedIndex);
|
||||
},
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief test _id
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
testUsePrimaryIdInAttributeAccess : function () {
|
||||
var values = [ "UnitTestsCollection/test1", "UnitTestsCollection/test2", "UnitTestsCollection/test21", "UnitTestsCollection/test30" ];
|
||||
var query = "LET data = NOOPT({ ids : " + JSON.stringify(values) + " }) FOR i IN " + c.name() + " FILTER i._id IN data.ids RETURN i.value";
|
||||
|
||||
var plan = AQL_EXPLAIN(query).plan;
|
||||
var nodeTypes = plan.nodes.map(function(node) {
|
||||
return node.type;
|
||||
});
|
||||
|
||||
assertEqual("SingletonNode", nodeTypes[0], query);
|
||||
assertNotEqual(-1, nodeTypes.indexOf("IndexNode"), query);
|
||||
|
||||
var results = AQL_EXECUTE(query);
|
||||
assertEqual([ 1, 2, 21, 30 ], results.json.sort(), query);
|
||||
assertEqual(0, results.stats.scannedFull);
|
||||
assertEqual(4, results.stats.scannedIndex);
|
||||
},
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief test _id
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@@ -205,6 +227,28 @@ function optimizerIndexesTestSuite () {
       assertEqual(0, results.stats.scannedIndex);
     },

+    ////////////////////////////////////////////////////////////////////////////////
+    /// @brief test _id
+    ////////////////////////////////////////////////////////////////////////////////
+
+    testUsePrimaryKeyInAttributeAccess : function () {
+      var values = [ "test1", "test2", "test21", "test30" ];
+      var query = "LET data = NOOPT({ ids : " + JSON.stringify(values) + " }) FOR i IN " + c.name() + " FILTER i._key IN data.ids RETURN i.value";
+
+      var plan = AQL_EXPLAIN(query).plan;
+      var nodeTypes = plan.nodes.map(function(node) {
+        return node.type;
+      });
+
+      assertEqual("SingletonNode", nodeTypes[0], query);
+      assertNotEqual(-1, nodeTypes.indexOf("IndexNode"), query);
+
+      var results = AQL_EXECUTE(query);
+      assertEqual([ 1, 2, 21, 30 ], results.json.sort(), query);
+      assertEqual(0, results.stats.scannedFull);
+      assertEqual(4, results.stats.scannedIndex);
+    },
+
     ////////////////////////////////////////////////////////////////////////////////
     /// @brief test _key
     ////////////////////////////////////////////////////////////////////////////////

@@ -2538,7 +2582,18 @@ function optimizerIndexesTestSuite () {
         [ "FOR i IN " + c.name() + " FILTER i._key IN ['test23', 'test42'] RETURN i._key", [ 'test23', 'test42' ] ],
         [ "LET a = PASSTHRU([]) FOR i IN " + c.name() + " FILTER i._key IN a RETURN i._key", [ ] ],
         [ "LET a = PASSTHRU(['test23']) FOR i IN " + c.name() + " FILTER i._key IN a RETURN i._key", [ 'test23' ] ],
-        [ "LET a = PASSTHRU(['test23', 'test42']) FOR i IN " + c.name() + " FILTER i._key IN a RETURN i._key", [ 'test23', 'test42' ] ]
+        [ "LET a = PASSTHRU(['test23', 'test42']) FOR i IN " + c.name() + " FILTER i._key IN a RETURN i._key", [ 'test23', 'test42' ] ],
+        [ "LET a = PASSTHRU({ ids: ['test23', 'test42'] }) FOR i IN " + c.name() + " FILTER i._key IN a.ids RETURN i._key", [ 'test23', 'test42' ] ],
+        [ "LET a = PASSTHRU({ ids: [23, 42] }) FOR i IN " + c.name() + " FILTER i.value2 IN a.ids RETURN i.value2", [ 23, 42 ] ],
+        [ "LET a = PASSTHRU({ ids: [23, 42] }) FOR i IN " + c.name() + " FILTER i.value3 IN a.ids RETURN i.value2", [ 23, 42 ] ],
+
+        // non-arrays. should not fail but return no results
+        [ "LET a = PASSTHRU({}) FOR i IN " + c.name() + " FILTER i._key IN a RETURN i._key", [ ] ],
+        [ "LET a = PASSTHRU({}) FOR i IN " + c.name() + " FILTER i.value2 IN a RETURN i.value2", [ ] ],
+        [ "LET a = PASSTHRU({}) FOR i IN " + c.name() + " FILTER i.value3 IN a RETURN i.value2", [ ] ]
       ];

       queries.forEach(function(query) {

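The new cases above feed the IN operator from an attribute access (a.ids) and from non-array operands, which must yield empty results rather than fail. A hedged arangosh sketch of the attribute-access pattern against the primary index (collection name and keys are illustrative, not taken from the test suite):

    const db = require('@arangodb').db;

    const c = db._create('example');
    c.insert({ _key: 'test23' });
    c.insert({ _key: 'test42' });

    // the key list is reached via attribute access, as in the new test cases
    const keys = db._query(
      'LET data = { ids: ["test23", "test42"] } ' +
      'FOR i IN example FILTER i._key IN data.ids RETURN i._key'
    ).toArray();
    console.log(keys); // [ "test23", "test42" ]

    db._drop('example');
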
@@ -241,8 +241,7 @@ function dumpTestSuite () {

       assertEqual(2, c.type()); // document
       assertFalse(p.waitForSync);
-      assertTrue(p.isVolatile);

       assertEqual(1, c.getIndexes().length); // just primary index
       assertEqual("primary", c.getIndexes()[0].type);
       assertEqual(0, c.count());

@@ -41,7 +41,7 @@ var slaveEndpoint = ARGUMENTS[0];

 var mmfilesEngine = false;
 if (db._engine().name === "mmfiles") {
-  mmfilesEngine = true
+  mmfilesEngine = true;
 }
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief test suite

@@ -54,10 +54,14 @@ RocksDBOptionFeature::RocksDBOptionFeature(
       _maxBackgroundCompactions(rocksDBDefaults.max_background_compactions),
       _maxLogFileSize(rocksDBDefaults.max_log_file_size),
       _keepLogFileNum(rocksDBDefaults.keep_log_file_num),
       _recycleLogFileNum(rocksDBDefaults.recycle_log_file_num),
       _logFileTimeToRoll(rocksDBDefaults.log_file_time_to_roll),
       _compactionReadaheadSize(rocksDBDefaults.compaction_readahead_size),
       _verifyChecksumsInCompaction(rocksDBDefaults.verify_checksums_in_compaction),
-      _optimizeFiltersForHits(rocksDBDefaults.optimize_filters_for_hits) {
+      _optimizeFiltersForHits(rocksDBDefaults.optimize_filters_for_hits),
+      _useDirectReads(rocksDBDefaults.use_direct_reads),
+      _useDirectWrites(rocksDBDefaults.use_direct_writes),
+      _skipCorrupted(false) {
   setOptional(true);
   requiresElevatedPrivileges(false);
   startsAfter("DatabasePath");

@@ -108,13 +112,13 @@ void RocksDBOptionFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
       "control maximum total data size for a level",
       new UInt64Parameter(&_maxBytesForLevelMultiplier));

-  options->addOption(
+  options->addHiddenOption(
       "--rocksdb.verify-checksums-in-compation",
       "if true, compaction will verify checksum on every read that happens "
       "as part of compaction",
       new BooleanParameter(&_verifyChecksumsInCompaction));

-  options->addOption(
+  options->addHiddenOption(
       "--rocksdb.optimize-filters-for-hits",
       "this flag specifies that the implementation should optimize the filters "
       "mainly for cases where keys are found rather than also optimize for keys "

@@ -123,39 +127,60 @@ void RocksDBOptionFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
       "important",
       new BooleanParameter(&_optimizeFiltersForHits));

-  options->addOption(
+#ifdef __linux__
+  options->addHiddenOption(
+      "--rocksdb.use-direct-reads",
+      "use O_DIRECT for reading files",
+      new BooleanParameter(&_useDirectReads));
+
+  options->addHiddenOption(
+      "--rocksdb.use-direct-writes",
+      "use O_DIRECT for writing files",
+      new BooleanParameter(&_useDirectWrites));
+#endif
+
+  options->addHiddenOption(
       "--rocksdb.base-background-compactions",
       "suggested number of concurrent background compaction jobs",
       new UInt64Parameter(&_baseBackgroundCompactions));

-  options->addOption(
+  options->addHiddenOption(
       "--rocksdb.max-background-compactions",
       "maximum number of concurrent background compaction jobs",
       new UInt64Parameter(&_maxBackgroundCompactions));

-  options->addOption(
+  options->addHiddenOption(
       "--rocksdb.max-log-file-size",
       "specify the maximal size of the info log file",
       new UInt64Parameter(&_maxLogFileSize));

-  options->addOption(
+  options->addHiddenOption(
       "--rocksdb.keep-log-file-num",
       "maximal info log files to be kept",
       new UInt64Parameter(&_keepLogFileNum));

-  options->addOption(
+  options->addHiddenOption(
       "--rocksdb.recycle-log-file-num",
       "number of log files to keep around for recycling",
       new UInt64Parameter(&_recycleLogFileNum));

   options->addHiddenOption(
       "--rocksdb.log-file-time-to-roll",
       "time for the info log file to roll (in seconds). "
       "If specified with non-zero value, log file will be rolled "
       "if it has been active longer than `log_file_time_to_roll`",
       new UInt64Parameter(&_logFileTimeToRoll));

-  options->addOption(
+  options->addHiddenOption(
       "--rocksdb.compaction-read-ahead-size",
       "if non-zero, we perform bigger reads when doing compaction. If you're "
       "running RocksDB on spinning disks, you should set this to at least 2MB. "
       "that way RocksDB's compaction is doing sequential instead of random reads.",
       new UInt64Parameter(&_compactionReadaheadSize));
+
+  options->addHiddenOption("--rocksdb.wal-recovery-skip-corrupted",
+                           "skip corrupted records in WAL recovery",
+                           new BooleanParameter(&_skipCorrupted));
 }

 void RocksDBOptionFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {

@@ -58,10 +58,14 @@ class RocksDBOptionFeature final : public application_features::ApplicationFeature {
   uint64_t _maxBackgroundCompactions;
   uint64_t _maxLogFileSize;
   uint64_t _keepLogFileNum;
   uint64_t _recycleLogFileNum;
   uint64_t _logFileTimeToRoll;
   uint64_t _compactionReadaheadSize;
   bool _verifyChecksumsInCompaction;
   bool _optimizeFiltersForHits;
+  bool _useDirectReads;
+  bool _useDirectWrites;
+  bool _skipCorrupted;
 };

 }

@@ -104,11 +104,13 @@ std::string const StaticStrings::HLCHeader("x-arango-hlc");
 std::string const StaticStrings::KeepAlive("Keep-Alive");
 std::string const StaticStrings::Location("location");
 std::string const StaticStrings::MultiPartContentType("multipart/form-data");
+std::string const StaticStrings::NoSniff("nosniff");
 std::string const StaticStrings::Origin("origin");
 std::string const StaticStrings::Queue("x-arango-queue");
 std::string const StaticStrings::Server("server");
 std::string const StaticStrings::StartThread("x-arango-start-thread");
 std::string const StaticStrings::WwwAuthenticate("www-authenticate");
+std::string const StaticStrings::XContentTypeOptions("x-content-type-options");

 // mime types
 std::string const StaticStrings::MimeTypeJson(

@@ -100,11 +100,13 @@ class StaticStrings {
   static std::string const KeepAlive;
   static std::string const Location;
   static std::string const MultiPartContentType;
+  static std::string const NoSniff;
   static std::string const Origin;
   static std::string const Queue;
   static std::string const Server;
   static std::string const StartThread;
   static std::string const WwwAuthenticate;
+  static std::string const XContentTypeOptions;

   // mime types
   static std::string const MimeTypeJson;

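The NoSniff/XContentTypeOptions pair added above backs the new x-content-type-options: nosniff response header. A quick arangosh sketch for checking it against a running server (the URL is illustrative):

    const request = require('@arangodb/request');

    // illustrative local endpoint; a server carrying this change should
    // answer with the anti-MIME-sniffing header
    const res = request.get('http://127.0.0.1:8529/_api/version');
    console.log(res.headers['x-content-type-options']); // expected: "nosniff"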