mirror of https://gitee.com/bigwinds/arangodb
Bug fix/planning issue 514 replication api (#3021)
* add "cluster selectivity estimates" to CHANGELOG * add some documentation to RocksDBRestReplicationHandler * fix building with relative paths * add some more doc * add some tests for the replication api * fix RocksDBRestReplicationHandler and add tests * update documentation * remove obsolete parameter * fix error message * Implementing logger-first-tick, logger-tick-ranges. Fixing dump `chunkSize` documentation
This commit is contained in:
parent
5708fc8fab
commit
df76bbc690
|
@ -210,9 +210,3 @@ in a batch part will be ignored.
|
|||
|
||||
|
||||
@startDocuBlock JSF_batch_processing
|
||||
|
||||
@startDocuBlock JSF_delete_batch_replication
|
||||
|
||||
@startDocuBlock JSF_post_batch_replication
|
||||
|
||||
@startDocuBlock JSF_put_batch_replication
|
||||
|
|
|
@ -10,6 +10,17 @@ or the incremental data synchronization.
|
|||
<!-- arangod/RestHandler/RestReplicationHandler.cpp -->
|
||||
@startDocuBlock JSF_put_api_replication_inventory
|
||||
|
||||
|
||||
The *batch* method will create a snapshot of the current state that then can be
|
||||
dumped. A batchId is required when using the dump api with rocksdb.
|
||||
|
||||
@startDocuBlock JSF_post_batch_replication
|
||||
|
||||
@startDocuBlock JSF_delete_batch_replication
|
||||
|
||||
@startDocuBlock JSF_put_batch_replication
|
||||
|
||||
|
||||
The *dump* method can be used to fetch data from a specific collection. As the
|
||||
results of the dump command can be huge, *dump* may not return all data from a collection
|
||||
at once. Instead, the dump command may be called repeatedly by replication clients
|
||||
|
@ -32,4 +43,4 @@ parts of the dump results in the same order as they are provided.
|
|||
@startDocuBlock JSF_put_api_replication_synchronize
|
||||
|
||||
<!-- arangod/RestHandler/RestReplicationHandler.cpp -->
|
||||
@startDocuBlock JSF_get_api_replication_cluster_inventory
|
||||
@startDocuBlock JSF_get_api_replication_cluster_inventory
|
||||
|
|
|
@ -9,26 +9,25 @@
|
|||
@RESTQUERYPARAM{collection,string,required}
|
||||
The name or id of the collection to dump.
|
||||
|
||||
@RESTQUERYPARAM{chunkSize,number,optional} Approximate maximum size of the returned result.
|
||||
|
||||
@RESTQUERYPARAM{batchId,string,required}
|
||||
rocksdb only - The id of the snapshot to use
|
||||
|
||||
@RESTQUERYPARAM{from,number,optional}
|
||||
Lower bound tick value for results.
|
||||
mmfiles only - Lower bound tick value for results.
|
||||
|
||||
@RESTQUERYPARAM{to,number,optional}
|
||||
Upper bound tick value for results.
|
||||
|
||||
@RESTQUERYPARAM{chunkSize,number,optional}
|
||||
Approximate maximum size of the returned result.
|
||||
mmfiles only - Upper bound tick value for results.
|
||||
|
||||
@RESTQUERYPARAM{includeSystem,boolean,optional}
|
||||
Include system collections in the result. The default value is *true*.
|
||||
|
||||
@RESTQUERYPARAM{failOnUnknown,boolean,optional}
|
||||
Produce an error when dumped edges refer to now-unknown collections.
|
||||
mmfiles only - Include system collections in the result. The default value is *true*.
|
||||
|
||||
@RESTQUERYPARAM{ticks,boolean,optional}
|
||||
Whether or not to include tick values in the dump. The default value is *true*.
|
||||
mmfiles only - Whether or not to include tick values in the dump. The default value is *true*.
|
||||
|
||||
@RESTQUERYPARAM{flush,boolean,optional}
|
||||
Whether or not to flush the WAL before dumping. The default value is *true*.
|
||||
mmfiles only - Whether or not to flush the WAL before dumping. The default value is *true*.
|
||||
|
||||
@RESTDESCRIPTION
|
||||
Returns the data from the collection for the requested range.
|
||||
|
|
|
@ -25,7 +25,7 @@ It is an error if this attribute is not bound in the coordinator case.
|
|||
|
||||
@RESTRETURNCODES
|
||||
|
||||
@RESTRETURNCODE{204}
|
||||
@RESTRETURNCODE{200}
|
||||
is returned if the batch was created successfully.
|
||||
|
||||
@RESTRETURNCODE{400}
|
||||
|
|
|
@ -3435,7 +3435,7 @@ Result MMFilesEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu
|
|||
return Result();
|
||||
}
|
||||
|
||||
Result MMFilesEngine::createTickRanges(VPackBuilder& builder){
|
||||
Result MMFilesEngine::createTickRanges(VPackBuilder& builder) {
|
||||
auto const& ranges = MMFilesLogfileManager::instance()->ranges();
|
||||
builder.openArray();
|
||||
for (auto& it : ranges) {
|
||||
|
|
|
@ -1691,6 +1691,7 @@ int RocksDBEngine::handleSyncKeys(arangodb::InitialSyncer& syncer,
|
|||
return handleSyncKeysRocksDB(syncer, col, keysId, cid, collectionName,
|
||||
maxTick, errorMsg);
|
||||
}
|
||||
|
||||
Result RocksDBEngine::createTickRanges(VPackBuilder& builder) {
|
||||
rocksdb::TransactionDB* tdb = rocksutils::globalRocksDB();
|
||||
rocksdb::VectorLogPtr walFiles;
|
||||
|
|
|
@ -215,8 +215,12 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
|||
|
||||
arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
|
||||
uint64_t chunkSize) {
|
||||
Result rv;
|
||||
|
||||
TRI_ASSERT(_trx);
|
||||
TRI_ASSERT(_iter);
|
||||
if(!_iter){
|
||||
return rv.reset(TRI_ERROR_BAD_PARAMETER, "the replication context interator has not been initialized");
|
||||
}
|
||||
|
||||
std::string lowKey;
|
||||
VPackSlice highKey; // FIXME: no good keeping this
|
||||
|
@ -253,15 +257,16 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
|
|||
b.close();
|
||||
lowKey.clear(); // reset string
|
||||
} catch (std::exception const&) {
|
||||
return Result(TRI_ERROR_INTERNAL);
|
||||
return rv.reset(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
b.close();
|
||||
// we will not call this method twice
|
||||
_iter->reset();
|
||||
_lastIteratorOffset = 0;
|
||||
|
||||
return Result();
|
||||
return rv;
|
||||
}
|
||||
|
||||
/// dump all keys from collection
|
||||
|
@ -269,12 +274,23 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(
|
|||
VPackBuilder& b, size_t chunk, size_t chunkSize,
|
||||
std::string const& lowKey) {
|
||||
TRI_ASSERT(_trx);
|
||||
TRI_ASSERT(_iter);
|
||||
|
||||
Result rv;
|
||||
|
||||
if(!_iter){
|
||||
return rv.reset(TRI_ERROR_BAD_PARAMETER, "the replication context interator has not been initialized");
|
||||
}
|
||||
|
||||
RocksDBSortedAllIterator* primary =
|
||||
static_cast<RocksDBSortedAllIterator*>(_iter.get());
|
||||
|
||||
// Position the iterator correctly
|
||||
if (chunk != 0 && ((std::numeric_limits<std::size_t>::max() / chunk) < chunkSize)) {
|
||||
return rv.reset(TRI_ERROR_BAD_PARAMETER, "It seems that your chunk / chunkSize combination is not valid - overflow");
|
||||
}
|
||||
|
||||
size_t from = chunk * chunkSize;
|
||||
|
||||
if (from != _lastIteratorOffset) {
|
||||
if (!lowKey.empty()) {
|
||||
primary->seek(StringRef(lowKey));
|
||||
|
@ -284,6 +300,7 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(
|
|||
_iter->reset();
|
||||
_lastIteratorOffset = 0;
|
||||
}
|
||||
|
||||
if (from > _lastIteratorOffset) {
|
||||
TRI_ASSERT(from >= chunkSize);
|
||||
uint64_t diff = from - _lastIteratorOffset;
|
||||
|
@ -291,7 +308,11 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(
|
|||
_iter->skip(diff, to);
|
||||
_lastIteratorOffset += to;
|
||||
}
|
||||
TRI_ASSERT(_lastIteratorOffset == from);
|
||||
|
||||
//TRI_ASSERT(_lastIteratorOffset == from);
|
||||
if(_lastIteratorOffset != from){
|
||||
return rv.reset(TRI_ERROR_BAD_PARAMETER, "The parameters you provided lead to an invalid iterator offset.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -310,25 +331,36 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(
|
|||
_hasMore = primary->nextWithKey(cb, chunkSize);
|
||||
_lastIteratorOffset++;
|
||||
} catch (std::exception const&) {
|
||||
return Result(TRI_ERROR_INTERNAL);
|
||||
return rv.reset(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
b.close();
|
||||
|
||||
return Result();
|
||||
return rv;
|
||||
}
|
||||
|
||||
/// dump keys and document
|
||||
arangodb::Result RocksDBReplicationContext::dumpDocuments(
|
||||
VPackBuilder& b, size_t chunk, size_t chunkSize, std::string const& lowKey,
|
||||
VPackSlice const& ids) {
|
||||
Result rv;
|
||||
|
||||
TRI_ASSERT(_trx);
|
||||
|
||||
if(!_iter){
|
||||
return rv.reset(TRI_ERROR_BAD_PARAMETER, "the replication context interator has not been initialized");
|
||||
}
|
||||
|
||||
TRI_ASSERT(_iter);
|
||||
RocksDBSortedAllIterator* primary =
|
||||
static_cast<RocksDBSortedAllIterator*>(_iter.get());
|
||||
|
||||
// Position the iterator must be reset to the beginning
|
||||
// after calls to dumpKeys moved it forwards
|
||||
if (chunk != 0 && ((std::numeric_limits<std::size_t>::max() / chunk) < chunkSize)) {
|
||||
return rv.reset(TRI_ERROR_BAD_PARAMETER, "It seems that your chunk / chunkSize combination is not valid - overflow");
|
||||
}
|
||||
size_t from = chunk * chunkSize;
|
||||
|
||||
if (from != _lastIteratorOffset) {
|
||||
if (!lowKey.empty()) {
|
||||
primary->seek(StringRef(lowKey));
|
||||
|
@ -346,7 +378,10 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments(
|
|||
_lastIteratorOffset += to;
|
||||
TRI_ASSERT(to == diff);
|
||||
}
|
||||
TRI_ASSERT(_lastIteratorOffset == from);
|
||||
|
||||
if(_lastIteratorOffset != from){
|
||||
return rv.reset(TRI_ERROR_BAD_PARAMETER, "The parameters you provided lead to an invalid iterator offset.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ class RocksDBReplicationContext {
|
|||
RocksDBReplicationContext();
|
||||
~RocksDBReplicationContext();
|
||||
|
||||
TRI_voc_tick_t id() const;
|
||||
TRI_voc_tick_t id() const; //batchId
|
||||
uint64_t lastTick() const;
|
||||
uint64_t count() const;
|
||||
|
||||
|
@ -114,9 +114,9 @@ class RocksDBReplicationContext {
|
|||
arangodb::LogicalCollection const* r);
|
||||
|
||||
private:
|
||||
TRI_voc_tick_t _id;
|
||||
uint64_t _lastTick;
|
||||
uint64_t _currentTick;
|
||||
TRI_voc_tick_t _id; // batch id
|
||||
uint64_t _lastTick; // the time at which the snapshot was taken
|
||||
uint64_t _currentTick; // shows how often dump was called
|
||||
std::unique_ptr<transaction::Methods> _trx;
|
||||
LogicalCollection* _collection;
|
||||
std::unique_ptr<IndexIterator> _iter;
|
||||
|
@ -130,7 +130,7 @@ class RocksDBReplicationContext {
|
|||
double _expires;
|
||||
bool _isDeleted;
|
||||
bool _isUsed;
|
||||
bool _hasMore;
|
||||
bool _hasMore; //used during dump to check if there are more documents
|
||||
};
|
||||
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -83,6 +83,7 @@ RocksDBRestReplicationHandler::RocksDBRestReplicationHandler(
|
|||
|
||||
RocksDBRestReplicationHandler::~RocksDBRestReplicationHandler() {}
|
||||
|
||||
//main function that dispactes the diferent routes and commands
|
||||
RestStatus RocksDBRestReplicationHandler::execute() {
|
||||
// extract the request type
|
||||
auto const type = _request->requestType();
|
||||
|
@ -98,6 +99,22 @@ RestStatus RocksDBRestReplicationHandler::execute() {
|
|||
goto BAD_CALL;
|
||||
}
|
||||
handleCommandLoggerState();
|
||||
} else if (command == "logger-tick-ranges") {
|
||||
if (type != rest::RequestType::GET) {
|
||||
goto BAD_CALL;
|
||||
}
|
||||
if (isCoordinatorError()) {
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
handleCommandLoggerTickRanges();
|
||||
} else if (command == "logger-first-tick") {
|
||||
if (type != rest::RequestType::GET) {
|
||||
goto BAD_CALL;
|
||||
}
|
||||
if (isCoordinatorError()) {
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
handleCommandLoggerFirstTick();
|
||||
} else if (command == "logger-follow") {
|
||||
if (type != rest::RequestType::GET && type != rest::RequestType::PUT) {
|
||||
goto BAD_CALL;
|
||||
|
@ -112,12 +129,35 @@ RestStatus RocksDBRestReplicationHandler::execute() {
|
|||
}
|
||||
handleCommandDetermineOpenTransactions();
|
||||
} else if (command == "batch") {
|
||||
// access batch context in context manager
|
||||
// example call: curl -XPOST --dump - --data '{}' http://localhost:5555/_api/replication/batch
|
||||
// the object may contain a "ttl" for the context
|
||||
|
||||
// POST - create batch id / handle
|
||||
// PUT - extend batch id / handle
|
||||
// DEL - delete batchid
|
||||
|
||||
if (ServerState::instance()->isCoordinator()) {
|
||||
handleTrampolineCoordinator();
|
||||
} else {
|
||||
handleCommandBatch();
|
||||
}
|
||||
} else if (command == "inventory") {
|
||||
// get overview of collections and idexes followed by some extra data
|
||||
// example call: curl --dump - http://localhost:5555/_api/replication/inventory?batchId=75
|
||||
|
||||
// {
|
||||
// collections : [ ... ],
|
||||
// "state" : {
|
||||
// "running" : true,
|
||||
// "lastLogTick" : "10528",
|
||||
// "lastUncommittedLogTick" : "10531",
|
||||
// "totalEvents" : 3782,
|
||||
// "time" : "2017-07-19T21:50:59Z"
|
||||
// },
|
||||
// "tick" : "10531"
|
||||
// }
|
||||
|
||||
if (type != rest::RequestType::GET) {
|
||||
goto BAD_CALL;
|
||||
}
|
||||
|
@ -127,7 +167,9 @@ RestStatus RocksDBRestReplicationHandler::execute() {
|
|||
handleCommandInventory();
|
||||
}
|
||||
} else if (command == "keys") {
|
||||
if (type != rest::RequestType::GET && type != rest::RequestType::POST &&
|
||||
// preconditions for calling this route are unclear and undocumented -- FIXME
|
||||
if (type != rest::RequestType::GET &&
|
||||
type != rest::RequestType::POST &&
|
||||
type != rest::RequestType::PUT &&
|
||||
type != rest::RequestType::DELETE_REQ) {
|
||||
goto BAD_CALL;
|
||||
|
@ -138,8 +180,16 @@ RestStatus RocksDBRestReplicationHandler::execute() {
|
|||
}
|
||||
|
||||
if (type == rest::RequestType::POST) {
|
||||
// has to be called first will bind the iterator to a collection
|
||||
|
||||
// xample: curl -XPOST --dump - 'http://localhost:5555/_db/_system/_api/replication/keys/?collection=_users&batchId=169' ; echo
|
||||
// returns
|
||||
// { "id": <context id - int>,
|
||||
// "count": <number of documents in collection - int>
|
||||
// }
|
||||
handleCommandCreateKeys();
|
||||
} else if (type == rest::RequestType::GET) {
|
||||
// curl --dump - 'http://localhost:5555/_db/_system/_api/replication/keys/123?collection=_users' ; echo # id is batchid
|
||||
handleCommandGetKeys();
|
||||
} else if (type == rest::RequestType::PUT) {
|
||||
handleCommandFetchKeys();
|
||||
|
@ -147,6 +197,14 @@ RestStatus RocksDBRestReplicationHandler::execute() {
|
|||
handleCommandRemoveKeys();
|
||||
}
|
||||
} else if (command == "dump") {
|
||||
// works on collections
|
||||
// example: curl --dump - 'http://localhost:5555/_db/_system/_api/replication/dump?collection=test&batchId=115'
|
||||
// requires batch-id
|
||||
// does internally an
|
||||
// - get inventory
|
||||
// - purge local
|
||||
// - dump remote to local
|
||||
|
||||
if (type != rest::RequestType::GET) {
|
||||
goto BAD_CALL;
|
||||
}
|
||||
|
@ -320,6 +378,47 @@ void RocksDBRestReplicationHandler::handleCommandLoggerState() {
|
|||
generateResult(rest::ResponseCode::OK, builder.slice());
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return the available logfile range
|
||||
/// @route GET logger-tick-ranges
|
||||
/// @caller js/client/modules/@arangodb/replication.js
|
||||
/// @response VPackArray, containing info about each datafile
|
||||
/// * filename
|
||||
/// * status
|
||||
/// * tickMin - tickMax
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
void RocksDBRestReplicationHandler::handleCommandLoggerTickRanges() {
|
||||
VPackBuilder b;
|
||||
Result res = globalRocksEngine()->createTickRanges(b);
|
||||
if (res.ok()) {
|
||||
generateResult(rest::ResponseCode::OK, b.slice());
|
||||
} else {
|
||||
generateError(res);
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return the first tick available in a logfile
|
||||
/// @route GET logger-first-tick
|
||||
/// @caller js/client/modules/@arangodb/replication.js
|
||||
/// @response VPackObject with minTick of LogfileManager->ranges()
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
void RocksDBRestReplicationHandler::handleCommandLoggerFirstTick() {
|
||||
TRI_voc_tick_t tick = UINT64_MAX;
|
||||
Result res = EngineSelectorFeature::ENGINE->firstTick(tick);
|
||||
|
||||
VPackBuilder b;
|
||||
b.add(VPackValue(VPackValueType::Object));
|
||||
if (tick == UINT64_MAX || res.fail()) {
|
||||
b.add("firstTick", VPackValue(VPackValueType::Null));
|
||||
} else {
|
||||
auto tickString = std::to_string(tick);
|
||||
b.add("firstTick", VPackValue(tickString));
|
||||
}
|
||||
b.close();
|
||||
generateResult(rest::ResponseCode::OK, b.slice());
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief was docuBlock JSF_delete_batch_replication
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -342,14 +441,18 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
|
|||
return;
|
||||
}
|
||||
|
||||
// extract ttl
|
||||
// double expires =
|
||||
// VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
|
||||
RocksDBReplicationContext* ctx = _manager->createContext();
|
||||
if (ctx == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
"unable to create replication context");
|
||||
}
|
||||
|
||||
// extract ttl
|
||||
if (input->slice().hasKey("ttl")){
|
||||
double ttl = VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", RocksDBReplicationContext::DefaultTTL);
|
||||
ctx->use(ttl);
|
||||
}
|
||||
|
||||
RocksDBReplicationContextGuard(_manager, ctx);
|
||||
ctx->bind(_vocbase); // create transaction+snapshot
|
||||
|
||||
|
@ -389,8 +492,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
|
|||
}
|
||||
|
||||
// extract ttl
|
||||
double expires =
|
||||
VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
|
||||
double expires = VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
|
||||
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
bool busy;
|
||||
|
@ -797,8 +899,12 @@ void RocksDBRestReplicationHandler::handleCommandCreateKeys() {
|
|||
}
|
||||
|
||||
RocksDBReplicationContext* ctx = nullptr;
|
||||
|
||||
//get batchId from url parameters
|
||||
bool found, busy;
|
||||
std::string batchId = _request->value("batchId", found);
|
||||
|
||||
// find context
|
||||
if (found) {
|
||||
ctx = _manager->find(StringUtils::uint64(batchId), busy);
|
||||
}
|
||||
|
@ -809,6 +915,8 @@ void RocksDBRestReplicationHandler::handleCommandCreateKeys() {
|
|||
}
|
||||
RocksDBReplicationContextGuard(_manager, ctx);
|
||||
|
||||
// to is ignored because the snapshot time is the latest point in time
|
||||
|
||||
// TRI_voc_tick_t tickEnd = UINT64_MAX;
|
||||
// determine end tick for keys
|
||||
// std::string const& value = _request->value("to", found);
|
||||
|
@ -820,6 +928,7 @@ void RocksDBRestReplicationHandler::handleCommandCreateKeys() {
|
|||
// arangodb::CollectionGuard guard(_vocbase, c->cid(), false);
|
||||
// arangodb::LogicalCollection* col = guard.collection();
|
||||
|
||||
// bind collection to context - will initialize iterator
|
||||
int res = ctx->bindCollection(collection);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND,
|
||||
|
@ -864,15 +973,25 @@ void RocksDBRestReplicationHandler::handleCommandGetKeys() {
|
|||
}
|
||||
}
|
||||
|
||||
//first suffix needs to be the batch id
|
||||
std::string const& id = suffixes[1];
|
||||
uint64_t batchId = arangodb::basics::StringUtils::uint64(id);
|
||||
|
||||
// get context
|
||||
bool busy;
|
||||
RocksDBReplicationContext* ctx = _manager->find(batchId, busy);
|
||||
if (busy || ctx == nullptr) {
|
||||
if (ctx == nullptr) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
|
||||
"batchId not specified");
|
||||
"batchId not specified, expired or invalid in another way");
|
||||
return;
|
||||
}
|
||||
if (busy) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
|
||||
"RequestContext is busy");
|
||||
return;
|
||||
}
|
||||
|
||||
//lock context
|
||||
RocksDBReplicationContextGuard(_manager, ctx);
|
||||
|
||||
VPackBuilder b;
|
||||
|
@ -936,9 +1055,15 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
|
|||
uint64_t batchId = arangodb::basics::StringUtils::uint64(id);
|
||||
bool busy;
|
||||
RocksDBReplicationContext* ctx = _manager->find(batchId, busy);
|
||||
if (busy || ctx == nullptr) {
|
||||
if (ctx == nullptr) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
|
||||
"batchId not specified");
|
||||
"batchId not specified or not found");
|
||||
return;
|
||||
}
|
||||
|
||||
if (busy) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
|
||||
"batch is busy");
|
||||
return;
|
||||
}
|
||||
RocksDBReplicationContextGuard(_manager, ctx);
|
||||
|
@ -948,7 +1073,11 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
|
|||
|
||||
VPackBuilder resultBuilder(transactionContext->getVPackOptions());
|
||||
if (keys) {
|
||||
ctx->dumpKeys(resultBuilder, chunk, static_cast<size_t>(chunkSize), lowKey);
|
||||
Result rv = ctx->dumpKeys(resultBuilder, chunk, static_cast<size_t>(chunkSize), lowKey);
|
||||
if (rv.fail()){
|
||||
generateError(rv);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
bool success;
|
||||
std::shared_ptr<VPackBuilder> parsedIds = parseVelocyPackBody(success);
|
||||
|
@ -956,8 +1085,11 @@ void RocksDBRestReplicationHandler::handleCommandFetchKeys() {
|
|||
generateResult(rest::ResponseCode::BAD, VPackSlice());
|
||||
return;
|
||||
}
|
||||
ctx->dumpDocuments(resultBuilder, chunk, static_cast<size_t>(chunkSize),
|
||||
lowKey, parsedIds->slice());
|
||||
Result rv = ctx->dumpDocuments(resultBuilder, chunk, static_cast<size_t>(chunkSize), lowKey, parsedIds->slice());
|
||||
if (rv.fail()){
|
||||
generateError(rv);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
generateResult(rest::ResponseCode::OK, resultBuilder.slice(),
|
||||
|
@ -1012,9 +1144,15 @@ void RocksDBRestReplicationHandler::handleCommandRemoveKeys() {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void RocksDBRestReplicationHandler::handleCommandDump() {
|
||||
LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "enter handleCommandDump";
|
||||
|
||||
bool found = false;
|
||||
uint64_t contextId = 0;
|
||||
|
||||
|
||||
// contains dump options that might need to be inspected
|
||||
// VPackSlice options = _request->payload();
|
||||
|
||||
// get collection Name
|
||||
std::string const& collection = _request->value("collection");
|
||||
if (collection.empty()) {
|
||||
|
@ -1037,12 +1175,20 @@ void RocksDBRestReplicationHandler::handleCommandDump() {
|
|||
bool isBusy = false;
|
||||
RocksDBReplicationContext* context = _manager->find(contextId, isBusy);
|
||||
RocksDBReplicationContextGuard(_manager, context);
|
||||
if (context == nullptr || isBusy) {
|
||||
|
||||
if (context == nullptr) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"replication dump - unable to acquire context");
|
||||
"replication dump - unable to find context (it could be expired)");
|
||||
return;
|
||||
}
|
||||
|
||||
if (isBusy) {
|
||||
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"replication dump - context is busy");
|
||||
return;
|
||||
}
|
||||
|
||||
// check for 28 compatibility
|
||||
bool compat28 = false;
|
||||
std::string const& value8 = _request->value("compat28", found);
|
||||
|
||||
|
@ -1055,6 +1201,7 @@ void RocksDBRestReplicationHandler::handleCommandDump() {
|
|||
<< "requested collection dump for collection '" << collection
|
||||
<< "' using contextId '" << context->id() << "'";
|
||||
|
||||
|
||||
// TODO needs to generalized || velocypacks needs to support multiple slices
|
||||
// per response!
|
||||
auto response = dynamic_cast<HttpResponse*>(_response.get());
|
||||
|
@ -1064,6 +1211,7 @@ void RocksDBRestReplicationHandler::handleCommandDump() {
|
|||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type");
|
||||
}
|
||||
|
||||
// do the work!
|
||||
auto result =
|
||||
context->dump(_vocbase, collection, dump, determineChunkSize(), compat28);
|
||||
|
||||
|
@ -2406,7 +2554,7 @@ int RocksDBRestReplicationHandler::processRestoreIndexesCoordinator(
|
|||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief determine the chunk size
|
||||
/// @brief determine the chunk size - from query url
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
uint64_t RocksDBRestReplicationHandler::determineChunkSize() const {
|
||||
|
|
|
@ -60,6 +60,27 @@ class RocksDBRestReplicationHandler : public RestReplicationHandler {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void handleCommandLoggerState() override;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return the available logfile range
|
||||
/// @route GET logger-tick-ranges
|
||||
/// @caller js/client/modules/@arangodb/replication.js
|
||||
/// @response VPackArray, containing info about each datafile
|
||||
/// * filename
|
||||
/// * status
|
||||
/// * tickMin - tickMax
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void handleCommandLoggerTickRanges();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief return the first tick available in a logfile
|
||||
/// @route GET logger-first-tick
|
||||
/// @caller js/client/modules/@arangodb/replication.js
|
||||
/// @response VPackObject with minTick of LogfileManager->ranges()
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void handleCommandLoggerFirstTick();
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief handle a follow command for the replication log
|
||||
|
|
|
@ -84,9 +84,9 @@ static void JS_TickRangesLoggerReplication(
|
|||
|
||||
VPackBuilder builder;
|
||||
Result res = EngineSelectorFeature::ENGINE->createTickRanges(builder);
|
||||
if(res.fail()){
|
||||
if (res.fail()) {
|
||||
TRI_V8_THROW_EXCEPTION(res);
|
||||
}
|
||||
}
|
||||
|
||||
v8::Handle<v8::Value>resultValue = TRI_VPackToV8(isolate, builder.slice());
|
||||
result = v8::Handle<v8::Array>::Cast(resultValue);
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/*jshint globalstrict:false, strict:false, maxlen: 5000 */
|
||||
/*global arango, assertEqual, assertTrue*/
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief test the replication interface
|
||||
///
|
||||
/// @file
|
||||
///
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2010-2012 triagens GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Christoph Uhde
|
||||
/// @author Copyright 2017, triAGENS GmbH, Cologne, Germany
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
var jsunity = require("jsunity");
|
||||
var arangodb = require("@arangodb");
|
||||
var db = arangodb.db;
|
||||
|
||||
function ReplicationApiSuite () {
|
||||
'use strict';
|
||||
|
||||
var cn = "UnitTestsCollection";
|
||||
var collection = null;
|
||||
var isRocksDB = ( db._engine().name === "rocksdb" );
|
||||
var batchesToFree = [];
|
||||
|
||||
return {
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief set up
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
setUp : function () {
|
||||
db._drop(cn);
|
||||
collection = db._create(cn);
|
||||
for(var i = 0; i < 100; i++){
|
||||
collection.insert({"number":i});
|
||||
}
|
||||
},
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief tear down
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
tearDown : function () {
|
||||
|
||||
// avoid hanging tests! by canceling batches
|
||||
batchesToFree.forEach( function(id){
|
||||
arango.DELETE_RAW("/_api/replication/batch/"+ id, "");
|
||||
});
|
||||
|
||||
collection.drop();
|
||||
collection = null;
|
||||
},
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create document w/ special keys
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
testCreateBatchId : function () {
|
||||
if(!isRocksDB){ return; };
|
||||
|
||||
// create batch
|
||||
var doc = {};
|
||||
var result = arango.POST_RAW("/_api/replication/batch", JSON.stringify(doc));
|
||||
assertEqual(200, result.code);
|
||||
var obj = JSON.parse(result.body);
|
||||
assertTrue(obj.hasOwnProperty("id"));
|
||||
|
||||
// delete batch
|
||||
result = arango.DELETE_RAW("/_api/replication/batch/"+ obj.id, "");
|
||||
assertEqual(204, result.code);
|
||||
|
||||
},
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get document w/ special keys
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
testKeys : function () {
|
||||
if(!isRocksDB){ return; };
|
||||
|
||||
// create batch
|
||||
var doc = {};
|
||||
var result = arango.POST_RAW("/_api/replication/batch", JSON.stringify(doc));
|
||||
assertEqual(200, result.code);
|
||||
var batchObj = JSON.parse(result.body);
|
||||
assertTrue(batchObj.hasOwnProperty("id"));
|
||||
batchesToFree.push(batchObj.id);
|
||||
|
||||
// create keys
|
||||
result = arango.POST_RAW("/_api/replication/keys?collection=" + cn +
|
||||
"&batchId=" + batchObj.id, JSON.stringify(doc));
|
||||
assertEqual(200, result.code);
|
||||
var keysObj = JSON.parse(result.body);
|
||||
assertTrue(keysObj.hasOwnProperty("count"));
|
||||
assertTrue(keysObj.hasOwnProperty("id"));
|
||||
assertTrue(keysObj.count === 100);
|
||||
|
||||
// fetch keys
|
||||
result = arango.PUT_RAW("/_api/replication/keys/"+ batchObj.id +
|
||||
"?collection=" + cn +
|
||||
"&type=keys"
|
||||
,JSON.stringify(doc)
|
||||
);
|
||||
|
||||
assertEqual(200, result.code);
|
||||
keysObj = JSON.parse(result.body);
|
||||
assertTrue(Array.isArray(keysObj));
|
||||
|
||||
result = arango.PUT_RAW("/_api/replication/keys/"+ batchObj.id +
|
||||
"?collection=" + cn +
|
||||
"&type=keys" +
|
||||
"&chunk=" + Math.pow(2,60) +
|
||||
"&chunkSize=" + Math.pow(2,60)
|
||||
,JSON.stringify(doc)
|
||||
);
|
||||
assertEqual(400, result.code);
|
||||
|
||||
// iterator should be invalid
|
||||
result = arango.PUT_RAW("/_api/replication/keys/"+ batchObj.id +
|
||||
"?collection=" + cn +
|
||||
"&type=keys" +
|
||||
"&chunk=" + 5 +
|
||||
"&chunkSize=" + 2000
|
||||
,JSON.stringify(doc)
|
||||
);
|
||||
assertEqual(400, result.code);
|
||||
|
||||
// delete batch
|
||||
result = arango.DELETE_RAW("/_api/replication/batch/"+ batchObj.id, "");
|
||||
assertEqual(204, result.code);
|
||||
|
||||
batchesToFree.pop();
|
||||
|
||||
}
|
||||
|
||||
|
||||
}; // return
|
||||
} // Replication suite
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief executes the test suite
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
jsunity.run(ReplicationApiSuite);
|
||||
|
||||
return jsunity.done();
|
||||
|
|
@ -53,7 +53,7 @@ class Result {
|
|||
bool is(int errorNumber) const { return _errorNumber == errorNumber; }
|
||||
bool isNot(int errorNumber) const { return !is(errorNumber); }
|
||||
|
||||
void reset(int errorNumber = TRI_ERROR_NO_ERROR) {
|
||||
Result& reset(int errorNumber = TRI_ERROR_NO_ERROR) {
|
||||
_errorNumber = errorNumber;
|
||||
|
||||
if (errorNumber != TRI_ERROR_NO_ERROR) {
|
||||
|
@ -61,26 +61,31 @@ class Result {
|
|||
} else {
|
||||
_errorMessage.clear();
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
void reset(int errorNumber, std::string const& errorMessage) {
|
||||
Result& reset(int errorNumber, std::string const& errorMessage) {
|
||||
_errorNumber = errorNumber;
|
||||
_errorMessage = errorMessage;
|
||||
return *this;
|
||||
}
|
||||
|
||||
void reset(int errorNumber, std::string&& errorMessage) {
|
||||
Result& reset(int errorNumber, std::string&& errorMessage) {
|
||||
_errorNumber = errorNumber;
|
||||
_errorMessage = std::move(errorMessage);
|
||||
return *this;
|
||||
}
|
||||
|
||||
void reset(Result const& other) {
|
||||
Result& reset(Result const& other) {
|
||||
_errorNumber = other._errorNumber;
|
||||
_errorMessage = other._errorMessage;
|
||||
return *this;
|
||||
}
|
||||
|
||||
void reset(Result&& other) {
|
||||
Result& reset(Result&& other) {
|
||||
_errorNumber = other._errorNumber;
|
||||
_errorMessage = std::move(other._errorMessage);
|
||||
return *this;
|
||||
}
|
||||
|
||||
void cloneData(Result const& other) {
|
||||
|
|
Loading…
Reference in New Issue