1
0
Fork 0
This commit is contained in:
Jan Steemann 2015-09-11 11:56:38 +02:00
parent bead727a32
commit b0a924c5a8
4 changed files with 418 additions and 45 deletions

View File

@ -51,6 +51,57 @@ using namespace triagens::arango;
using namespace triagens::httpclient;
using namespace triagens::rest;
////////////////////////////////////////////////////////////////////////////////
/// @brief binary search for a document key in a list of markers
///
/// Expects `markers` to be non-empty (asserted) and ordered ascending by
/// marker key according to strcmp(). Returns the index of the marker whose
/// key equals `key`, or SIZE_MAX if there is no such marker.
////////////////////////////////////////////////////////////////////////////////

static size_t BinarySearch (std::vector<TRI_df_marker_t const*> const& markers,
                            std::string const& key) {
  TRI_ASSERT(! markers.empty());

  char const* needle = key.c_str();
  size_t low  = 0;
  size_t high = markers.size() - 1;

  for (;;) {
    // midpoint, computed so that it cannot overflow
    size_t const mid = low + ((high - low) / 2);
    int const cmp = strcmp(needle, TRI_EXTRACT_MARKER_KEY(markers[mid]));

    if (cmp == 0) {
      // exact match
      return mid;
    }

    if (cmp > 0) {
      // needle sorts after the midpoint: continue in the upper half
      low = mid + 1;
    }
    else if (mid == 0) {
      // needle sorts before the very first element; `mid - 1` would wrap
      return SIZE_MAX;
    }
    else {
      // needle sorts before the midpoint: continue in the lower half
      high = mid - 1;
    }

    if (high < low) {
      // search interval is exhausted
      return SIZE_MAX;
    }
  }
}
////////////////////////////////////////////////////////////////////////////////
/// @brief locate the positions of a lower and an upper key in sorted markers
///
/// Looks up `lower` and `upper` via BinarySearch() and stores their indexes in
/// `lowerPos` and `upperPos`. Returns true only if both keys are present in
/// `markers`. If a key is not found (or `markers` is empty), the respective
/// out-parameter is set to SIZE_MAX, so callers never read an undefined value.
///
/// The previous implementation never assigned its result flag and therefore
/// always returned false, even when both keys were found.
////////////////////////////////////////////////////////////////////////////////

static bool FindRange (std::vector<TRI_df_marker_t const*> const& markers,
                       std::string const& lower,
                       std::string const& upper,
                       size_t& lowerPos,
                       size_t& upperPos) {
  // give the out-parameters a defined value on every path
  lowerPos = SIZE_MAX;
  upperPos = SIZE_MAX;

  if (markers.empty()) {
    // BinarySearch() must not be called with an empty list
    return false;
  }

  lowerPos = BinarySearch(markers, lower);

  if (lowerPos == SIZE_MAX) {
    // lower bound not present, no need to look for the upper bound
    return false;
  }

  upperPos = BinarySearch(markers, upper);

  return (upperPos != SIZE_MAX);
}
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
@ -107,7 +158,8 @@ InitialSyncer::~InitialSyncer () {
/// @brief run method, performs a full synchronization
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::run (string& errorMsg) {
int InitialSyncer::run (string& errorMsg,
bool incremental) {
if (_client == nullptr ||
_connection == nullptr ||
_endpoint == nullptr) {
@ -167,7 +219,7 @@ int InitialSyncer::run (string& errorMsg) {
std::unique_ptr<TRI_json_t> json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, response->getBody().c_str()));
if (JsonHelper::isObject(json.get())) {
res = handleInventoryResponse(json.get(), errorMsg);
res = handleInventoryResponse(json.get(), incremental, errorMsg);
}
else {
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
@ -569,6 +621,233 @@ int InitialSyncer::handleCollectionDump (string const& cid,
return TRI_ERROR_INTERNAL;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::handleCollectionSync (string const& cid,
                                         TRI_document_collection_t* document,
                                         TRI_transaction_collection_t* trxCollection,
                                         string const& collectionName,
                                         TRI_voc_tick_t maxTick,
                                         string& errorMsg) {
  // Overview:
  //   1. order a keys object for collection `cid` on the master (POST)
  //   2. compare it against the local data via handleSyncKeys()
  //   3. delete the ordered keys object on the master again (DELETE)
  // Returns a TRI_ERROR_* code; on failure `errorMsg` describes the problem.

  // all keys-related requests go to the "/keys" sub-path. the original code
  // computed this URL but then sent its requests to BaseUrl itself
  string const baseUrl = BaseUrl + "/keys";

  // order collection keys
  std::unique_ptr<SimpleHttpResult> response(_client->request(HttpRequest::HTTP_REQUEST_POST,
                                                              baseUrl + "?collection=" + cid,
                                                              nullptr,
                                                              0));

  if (response == nullptr || ! response->isComplete()) {
    errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) +
               ": " + _client->getErrorMessage();

    return TRI_ERROR_REPLICATION_NO_RESPONSE;
  }

  TRI_ASSERT(response != nullptr);

  if (response->wasHttpError()) {
    errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
               ": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) +
               ": " + response->getHttpReturnMessage();

    return TRI_ERROR_REPLICATION_MASTER_ERROR;
  }

  StringBuffer& data = response->getBody();

  // the response is expected to be an object with a string attribute "id"
  std::unique_ptr<TRI_json_t> json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data.c_str()));

  if (! TRI_IsObjectJson(json.get())) {
    errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
               ": response is no object";

    return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
  }

  TRI_json_t const* idJson = TRI_LookupObjectJson(json.get(), "id");

  if (! TRI_IsStringJson(idJson)) {
    errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
               ": response does not contain 'id' attribute";

    return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
  }

  // NOTE(review): `length - 1` presumably strips a trailing NUL byte that is
  // counted in the JSON string length - confirm against TRI_json_t
  std::string const id(idJson->_value._string.data, idJson->_value._string.length - 1);

  // now we can fetch the complete chunk information from the master
  int res = handleSyncKeys(id, cid, document, trxCollection, collectionName, maxTick, errorMsg);

  {
    // now delete the keys we ordered. this is attempted regardless of
    // whether the sync itself succeeded
    std::unique_ptr<SimpleHttpResult> deleteResponse(_client->request(HttpRequest::HTTP_REQUEST_DELETE,
                                                                      baseUrl + "/" + id,
                                                                      nullptr,
                                                                      0));

    if (deleteResponse == nullptr || ! deleteResponse->isComplete()) {
      if (res == TRI_ERROR_NO_ERROR) {
        // only report the clean-up failure if there is no earlier error that
        // it would hide. use the same error code as for the other
        // "could not connect" cases in this function
        errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) +
                   ": " + _client->getErrorMessage();

        res = TRI_ERROR_REPLICATION_NO_RESPONSE;
      }
    }
  }

  return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief compare the local keys of a collection with the key chunks of the master
////////////////////////////////////////////////////////////////////////////////
#include <iostream>
int InitialSyncer::handleSyncKeys (std::string const& keysId,
std::string const& cid,
TRI_document_collection_t* document,
TRI_transaction_collection_t* trxCollection,
std::string const& collectionName,
TRI_voc_tick_t maxTick,
std::string& errorMsg) {
// fetch all local keys from primary index
std::vector<TRI_df_marker_t const*> markers;
auto idx = document->primaryIndex();
markers.reserve(idx->size());
triagens::basics::BucketPosition position;
uint64_t total = 0;
while (true) {
auto ptr = idx->lookupSequential(position, total);
if (ptr == nullptr) {
// done
break;
}
void const* marker = ptr->getDataPtr();
auto df = static_cast<TRI_df_marker_t const*>(marker);
if (df->_tick >= maxTick) {
continue;
}
markers.emplace_back(df);
}
// sort all our local keys
std::sort(markers.begin(), markers.end(), [] (TRI_df_marker_t const* lhs, TRI_df_marker_t const* rhs) -> bool {
int res = strcmp(TRI_EXTRACT_MARKER_KEY(lhs), TRI_EXTRACT_MARKER_KEY(rhs));
return res < 0;
});
TRI_voc_tick_t const chunkSize = 5000;
string const baseUrl = BaseUrl + "/keys";
std::unique_ptr<SimpleHttpResult> response(_client->request(HttpRequest::HTTP_REQUEST_PUT,
BaseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize),
nullptr,
0));
if (response == nullptr || ! response->isComplete()) {
errorMsg = "could not connect to master at " + string(_masterInfo._endpoint) +
": " + _client->getErrorMessage();
return TRI_ERROR_REPLICATION_NO_RESPONSE;
}
TRI_ASSERT(response != nullptr);
if (response->wasHttpError()) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": HTTP " + StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage();
return TRI_ERROR_REPLICATION_MASTER_ERROR;
}
StringBuffer& data = response->getBody();
// parse chunks
std::unique_ptr<TRI_json_t> json(TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data.c_str()));
if (! TRI_IsArrayJson(json.get())) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": response is no array";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
size_t const n = TRI_LengthArrayJson(json.get());
// now process each chunk
for (size_t i = 0; i < n; ++i) {
// read remote chunk
auto chunk = static_cast<TRI_json_t const*>(TRI_AtVector(&(json.get()->_value._objects), i));
if (! TRI_IsObjectJson(chunk)) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": chunk is no object";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
auto lowJson = TRI_LookupObjectJson(chunk, "low");
auto highJson = TRI_LookupObjectJson(chunk, "high");
auto hashJson = TRI_LookupObjectJson(chunk, "hash");
std::cout << "i: " << i << ", RANGE LOW: " << std::string(lowJson->_value._string.data) << ", HIGH: " << std::string(highJson->_value._string.data) << ", HASH: " << std::string(hashJson->_value._string.data) << "\n";
if (! TRI_IsStringJson(lowJson) || ! TRI_IsStringJson(highJson) || ! TRI_IsStringJson(hashJson)) {
errorMsg = "got invalid response from master at " + string(_masterInfo._endpoint) +
": chunks in response have an invalid format";
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
size_t localFrom;
size_t localTo;
bool match = FindRange(markers,
std::string(lowJson->_value._string.data, lowJson->_value._string.length - 1),
std::string(highJson->_value._string.data, highJson->_value._string.length - 1),
localFrom,
localTo);
if (match) {
// now must hash the range
uint64_t hash = 0x012345678;
for (size_t i = localFrom; i < localTo; ++i) {
auto marker = markers.at(i);
char const* key = TRI_EXTRACT_MARKER_KEY(marker);
hash ^= TRI_FnvHashString(key);
hash ^= TRI_EXTRACT_MARKER_RID(marker);
}
if (std::to_string(hash) != std::string(hashJson->_value._string.data, hashJson->_value._string.length - 1)) {
match = false;
}
}
std::cout << "RANGE DOES MATCH: " << (int) match << "\n";
if (! match) {
// must transfer keys for non-matching range
}
}
// TODO: remove all keys that are below first remote key or beyond last remote key
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the information about a collection
////////////////////////////////////////////////////////////////////////////////
@ -705,7 +984,7 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters,
// -------------------------------------------------------------------------------------
else if (phase == PHASE_DUMP) {
string const progress = "syncing data for " + collectionMsg;
string const progress = "dumping data for " + collectionMsg;
setProgress(progress.c_str());
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
@ -811,6 +1090,55 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters,
return res;
}
// sync collection data incrementally
// -------------------------------------------------------------------------------------
else if (phase == PHASE_SYNC) {
string const progress = "syncing data for " + collectionMsg;
setProgress(progress.c_str());
TRI_vocbase_col_t* col = TRI_LookupCollectionByIdVocBase(_vocbase, cid);
if (col == nullptr && ! masterName.empty()) {
// not found, try name next
col = TRI_LookupCollectionByNameVocBase(_vocbase, masterName.c_str());
}
if (col == nullptr) {
errorMsg = "cannot sync: " + collectionMsg + " not found";
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
int res = TRI_ERROR_INTERNAL;
{
SingleCollectionWriteTransaction<UINT64_MAX> trx(new StandaloneTransactionContext(), _vocbase, col->_cid);
res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "unable to start transaction: " + string(TRI_errno_string(res));
return res;
}
TRI_transaction_collection_t* trxCollection = trx.trxCollection();
if (trxCollection == nullptr) {
res = TRI_ERROR_INTERNAL;
errorMsg = "unable to start transaction: " + string(TRI_errno_string(res));
}
else {
res = handleCollectionSync(StringUtils::itoa(cid), trx.documentCollection(), trxCollection, masterName, _masterInfo._lastLogTick, errorMsg);
}
res = trx.finish(res);
}
return res;
}
// we won't get here
@ -823,7 +1151,8 @@ int InitialSyncer::handleCollection (TRI_json_t const* parameters,
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::handleInventoryResponse (TRI_json_t const* json,
string& errorMsg) {
bool incremental,
std::string& errorMsg) {
TRI_json_t const* data = JsonHelper::getObjectElement(json, "collections");
if (! JsonHelper::isArray(data)) {
@ -892,7 +1221,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json,
}
}
collections.emplace_back(std::make_pair(parameters, indexes));
collections.emplace_back(parameters, indexes);
}
int res;
@ -906,10 +1235,18 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json,
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
if (incremental) {
// STEP 2: sync collection data from master and create initial indexes
// ----------------------------------------------------------------------------------
return iterateCollections(collections, errorMsg, PHASE_SYNC);
}
TRI_ASSERT(! incremental);
// STEP 2: drop collections locally if they are also present on the master (clean up)
// ----------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------
res = iterateCollections(collections, errorMsg, PHASE_DROP);
@ -917,7 +1254,6 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json,
return res;
}
// STEP 3: re-create empty collections locally
// ----------------------------------------------------------------------------------
@ -926,8 +1262,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json,
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// STEP 4: sync collection data from master and create initial indexes
// ----------------------------------------------------------------------------------
@ -939,7 +1274,7 @@ int InitialSyncer::handleInventoryResponse (TRI_json_t const* json,
////////////////////////////////////////////////////////////////////////////////
int InitialSyncer::iterateCollections (std::vector<std::pair<TRI_json_t const*, TRI_json_t const*>> const& collections,
string& errorMsg,
std::string& errorMsg,
sync_phase_e phase) {
std::string phaseMsg("starting phase " + translatePhase(phase) + " with " + std::to_string(collections.size()) + " collections");
setProgress(phaseMsg);

View File

@ -72,7 +72,8 @@ namespace triagens {
PHASE_VALIDATE,
PHASE_DROP,
PHASE_CREATE,
PHASE_DUMP
PHASE_DUMP,
PHASE_SYNC
}
sync_phase_e;
@ -108,7 +109,7 @@ namespace triagens {
/// @brief run method, performs a full synchronization
////////////////////////////////////////////////////////////////////////////////
int run (std::string&);
int run (std::string&, bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief return the last log tick of the master at start
@ -134,6 +135,8 @@ namespace triagens {
return "create";
case PHASE_DUMP:
return "dump";
case PHASE_SYNC:
return "sync";
case PHASE_NONE:
break;
}
@ -203,6 +206,29 @@ namespace triagens {
TRI_voc_tick_t,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
////////////////////////////////////////////////////////////////////////////////
int handleCollectionSync (std::string const&,
struct TRI_document_collection_t*,
struct TRI_transaction_collection_s*,
std::string const&,
TRI_voc_tick_t,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
////////////////////////////////////////////////////////////////////////////////
int handleSyncKeys (std::string const&,
std::string const&,
struct TRI_document_collection_t*,
struct TRI_transaction_collection_s*,
std::string const&,
TRI_voc_tick_t,
std::string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief handle the information about a collection
////////////////////////////////////////////////////////////////////////////////
@ -217,6 +243,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
int handleInventoryResponse (struct TRI_json_t const*,
bool,
std::string&);
////////////////////////////////////////////////////////////////////////////////

View File

@ -3158,7 +3158,7 @@ void RestReplicationHandler::handleCommandCreateKeys () {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a key range
/// @brief returns all key ranges
////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandGetKeys () {
@ -3170,32 +3170,23 @@ void RestReplicationHandler::handleCommandGetKeys () {
"expecting GET /_api/replication/keys/<keys-id>");
return;
}
static TRI_voc_tick_t const DefaultChunkSize = 5000;
TRI_voc_tick_t chunkSize = DefaultChunkSize;
// determine chunk size
bool found;
char const* value = _request->value("chunkSize", found);
if (found) {
chunkSize = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value));
if (chunkSize < 100) {
chunkSize = DefaultChunkSize;
}
}
std::string const& id = suffix[1];
TRI_voc_tick_t tickStart = 0;
TRI_voc_tick_t tickEnd = 0;
// determine start and end tick for keys
bool found;
char const* value = _request->value("from", found);
if (found) {
tickStart = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value));
}
value = _request->value("to", found);
if (found) {
tickEnd = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value));
}
if (tickStart > tickEnd || tickEnd == 0) {
generateError(HttpResponse::BAD,
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid from/to values");
return;
}
int res = TRI_ERROR_NO_ERROR;
try {
@ -3218,14 +3209,27 @@ void RestReplicationHandler::handleCommandGetKeys () {
}
try {
auto result = collectionKeys->hashChunk(tickStart, tickEnd);
triagens::basics::Json json(triagens::basics::Json::Array, 200);
TRI_voc_tick_t max = static_cast<TRI_voc_tick_t>(collectionKeys->count());
for (TRI_voc_tick_t from = 0; from < max; from += chunkSize) {
TRI_voc_tick_t to = from + chunkSize;
if (to > max) {
to = max;
}
auto result = collectionKeys->hashChunk(from, to);
triagens::basics::Json chunk(triagens::basics::Json::Object, 3);
chunk.set("low", triagens::basics::Json(std::get<0>(result)));
chunk.set("high", triagens::basics::Json(std::get<1>(result)));
chunk.set("hash", triagens::basics::Json(std::to_string(std::get<2>(result))));
json.add(chunk);
}
collectionKeys->release();
triagens::basics::Json json(triagens::basics::Json::Object, 3);
json.set("low", triagens::basics::Json(std::get<0>(result)));
json.set("high", triagens::basics::Json(std::get<1>(result)));
json.set("hash", triagens::basics::Json(std::to_string(std::get<2>(result))));
generateResult(HttpResponse::OK, json.json());
}
@ -3816,7 +3820,7 @@ void RestReplicationHandler::handleCommandMakeSlave () {
res = TRI_ERROR_NO_ERROR;
try {
res = syncer.run(errorMsg);
res = syncer.run(errorMsg, false);
}
catch (...) {
errorMsg = "caught an exception";
@ -4003,7 +4007,7 @@ void RestReplicationHandler::handleCommandSync () {
string errorMsg = "";
try {
res = syncer.run(errorMsg);
res = syncer.run(errorMsg, false);
}
catch (...) {
errorMsg = "caught an exception";

View File

@ -278,6 +278,13 @@ static void JS_SynchronizeReplication (const v8::FunctionCallbackInfo<v8::Value>
config._requireFromPresent = TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("requireFromPresent")));
}
}
bool incremental = false;
if (object->Has(TRI_V8_ASCII_STRING("incremental"))) {
if (object->Get(TRI_V8_ASCII_STRING("incremental"))->IsBoolean()) {
incremental = TRI_ObjectToBoolean(object->Get(TRI_V8_ASCII_STRING("incremental")));
}
}
string errorMsg = "";
InitialSyncer syncer(vocbase, &config, restrictCollections, restrictType, verbose);
@ -287,7 +294,7 @@ static void JS_SynchronizeReplication (const v8::FunctionCallbackInfo<v8::Value>
v8::Handle<v8::Object> result = v8::Object::New(isolate);
try {
res = syncer.run(errorMsg);
res = syncer.run(errorMsg, incremental);
result->Set(TRI_V8_ASCII_STRING("lastLogTick"), V8TickId(isolate, syncer.getLastLogTick()));