1
0
Fork 0

add contextId to replication handler

This commit is contained in:
Jan Christoph Uhde 2017-04-20 10:56:17 +02:00
parent 97c41a3ea7
commit 7889291247
5 changed files with 88 additions and 19 deletions

View File

@ -37,7 +37,7 @@ void RocksDBRestHandlers::registerResources(
RestHandlerCreator<RocksDBRestExportHandler>::createNoData);
handlerFactory->addPrefixHandler(
"/_admin/replication",
"/_api/replication",
RestHandlerCreator<RocksDBRestReplicationHandler>::createNoData);
handlerFactory->addPrefixHandler(

View File

@ -19,6 +19,7 @@
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBEngine/RocksDBRestReplicationHandler.h"
@ -478,9 +479,19 @@ void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() {
////////////////////////////////////////////////////////////////////////////////
void RocksDBRestReplicationHandler::handleCommandInventory() {
uint64_t contextId = TRI_CurrentTickServer();
// add context id to store
// TODO - add context id to store
VPackBuilder builder;
builder.openObject();
builder.add("contextId", VPackValue(std::to_string(contextId)));
builder.close();
generateError(rest::ResponseCode::NOT_IMPLEMENTED,
TRI_ERROR_NOT_YET_IMPLEMENTED,
"replication API is not fully implemented for RocksDB yet");
//generateResult(rest::ResponseCode::OK, builder.slice());
}
////////////////////////////////////////////////////////////////////////////////
@ -560,6 +571,42 @@ void RocksDBRestReplicationHandler::handleCommandRemoveKeys() {
////////////////////////////////////////////////////////////////////////////////
void RocksDBRestReplicationHandler::handleCommandDump() {
bool found = false;
uint64_t contextId = 0;
// handle collection
std::string const& collection = _request->value("collection");
if (collection.empty()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid collection parameter");
return;
}
arangodb::LogicalCollection* c = _vocbase->lookupCollection(collection);
if (c == nullptr) {
generateError(rest::ResponseCode::NOT_FOUND,
TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
return;
}
//get contextId
std::string const& contextIdString = _request->value("contextId", found);
if (found) {
contextId = StringUtils::uint64(contextIdString);
} else {
generateError(rest::ResponseCode::NOT_FOUND,
TRI_ERROR_HTTP_BAD_PARAMETER,
"replication dump request misses contextId");
}
// lock context id || die
// print request
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
<< "requested collection dump for collection '" << collection
<< "' using contextId '" << contextId
<< "'";
// fail
generateError(rest::ResponseCode::NOT_IMPLEMENTED,
TRI_ERROR_NOT_YET_IMPLEMENTED,
"replication API is not fully implemented for RocksDB yet");

View File

@ -19,6 +19,7 @@
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_ROCKSDB_ROCKSDB_REST_REPLICATION_HANDLER_H

View File

@ -298,6 +298,7 @@ void DumpFeature::endBatch(std::string DBserver) {
/// @brief dump a single collection
int DumpFeature::dumpCollection(int fd, std::string const& cid,
std::string const& contextIdString,
std::string const& name, uint64_t maxTick,
std::string& errorMsg) {
uint64_t chunkSize = _chunkSize;
@ -319,6 +320,11 @@ int DumpFeature::dumpCollection(int fd, std::string const& cid,
url += "&compat28=true";
}
if (!contextIdString.empty()) {
url += "&contextId=";
url += contextIdString;
}
_stats._totalBatches++;
std::unique_ptr<SimpleHttpResult> response(
@ -487,6 +493,20 @@ int DumpFeature::runDump(std::string& dbName, std::string& errorMsg) {
maxTick = _tickEnd;
}
// read the server's contextId
std::string const contextIdString =
arangodb::basics::VelocyPackHelper::getStringValue(body, "contextId", "");
// if (contextIdString == "") {
// errorMsg = "got malformed JSON response from server - contextId is
// missing";
// return TRI_ERROR_INTERNAL;
//}
std::cout << "contextId provided by server is: " << contextIdString
<< std::endl;
// uint64_t contextId = StringUtils::uint64(contextIdString);
try {
VPackBuilder meta;
meta.openObject();
@ -504,8 +524,9 @@ int DumpFeature::runDump(std::string& dbName, std::string& errorMsg) {
TRI_UnlinkFile(fileName.c_str());
}
fd = TRI_TRACKED_CREATE_FILE(fileName.c_str(), O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
fd = TRI_TRACKED_CREATE_FILE(fileName.c_str(),
O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
if (fd < 0) {
errorMsg = "cannot write to file '" + fileName + "'";
@ -605,8 +626,8 @@ int DumpFeature::runDump(std::string& dbName, std::string& errorMsg) {
}
fd = TRI_TRACKED_CREATE_FILE(fileName.c_str(),
O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
if (fd < 0) {
errorMsg = "cannot write to file '" + fileName + "'";
@ -641,8 +662,8 @@ int DumpFeature::runDump(std::string& dbName, std::string& errorMsg) {
}
fd = TRI_TRACKED_CREATE_FILE(fileName.c_str(),
O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
if (fd < 0) {
errorMsg = "cannot write to file '" + fileName + "'";
@ -651,8 +672,8 @@ int DumpFeature::runDump(std::string& dbName, std::string& errorMsg) {
}
extendBatch("");
int res =
dumpCollection(fd, std::to_string(cid), name, maxTick, errorMsg);
int res = dumpCollection(fd, std::to_string(cid), contextIdString, name,
maxTick, errorMsg);
TRI_TRACKED_CLOSE_FILE(fd);
@ -879,9 +900,9 @@ int DumpFeature::runClusterDump(std::string& errorMsg) {
TRI_UnlinkFile(fileName.c_str());
}
int fd = TRI_TRACKED_CREATE_FILE(fileName.c_str(),
O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
int fd = TRI_TRACKED_CREATE_FILE(
fileName.c_str(), O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
if (fd < 0) {
errorMsg = "cannot write to file '" + fileName + "'";
@ -915,9 +936,9 @@ int DumpFeature::runClusterDump(std::string& errorMsg) {
TRI_UnlinkFile(fileName.c_str());
}
int fd = TRI_TRACKED_CREATE_FILE(fileName.c_str(),
O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
int fd = TRI_TRACKED_CREATE_FILE(
fileName.c_str(), O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC,
S_IRUSR | S_IWUSR);
if (fd < 0) {
errorMsg = "cannot write to file '" + fileName + "'";

View File

@ -32,10 +32,9 @@ class SimpleHttpResult;
}
class DumpFeature final : public application_features::ApplicationFeature,
public ArangoClientHelper {
public ArangoClientHelper {
public:
DumpFeature(application_features::ApplicationServer* server,
int* result);
DumpFeature(application_features::ApplicationServer* server, int* result);
public:
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
@ -62,7 +61,8 @@ class DumpFeature final : public application_features::ApplicationFeature,
int startBatch(std::string DBserver, std::string& errorMsg);
void extendBatch(std::string DBserver);
void endBatch(std::string DBserver);
int dumpCollection(int fd, std::string const& cid, std::string const& name,
int dumpCollection(int fd, std::string const& collectionId,
std::string const& contextId, std::string const& name,
uint64_t maxTick, std::string& errorMsg);
void flushWal();
int runDump(std::string& dbName, std::string& errorMsg);