1
0
Fork 0

data synchronisation

This commit is contained in:
Jan Steemann 2013-07-04 17:56:54 +02:00
parent a4ad46a646
commit a33dcede42
3 changed files with 73 additions and 19 deletions

View File

@ -182,6 +182,49 @@ int ReplicationFetcher::run () {
/// @{
////////////////////////////////////////////////////////////////////////////////
int ReplicationFetcher::applyCollectionDump (TRI_voc_cid_t cid,
SimpleHttpResult* response,
string& errorMsg) {
std::stringstream& data = response->getBody();
while (true) {
string line;
std::getline(data, line, '\n');
if (line.size() < 2) {
return TRI_ERROR_NO_ERROR;
}
TRI_json_t* json = TRI_JsonString(TRI_CORE_MEM_ZONE, line.c_str());
if (! JsonHelper::isArray(json)) {
if (json != 0) {
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
}
errorMsg = "received invalid JSON data for collection " + StringUtils::itoa(cid);
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_json_t* type = JsonHelper::getArrayElement(json, "type");
if (! JsonHelper::isString(type)) {
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
errorMsg = "received invalid JSON data for collection " + StringUtils::itoa(cid);
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
// std::cout << "type: " << type->_value._string.data << "\n";
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get local replication apply state
////////////////////////////////////////////////////////////////////////////////
@ -333,17 +376,25 @@ int ReplicationFetcher::getMasterInventory (string& errorMsg) {
int ReplicationFetcher::handleCollectionDump (TRI_voc_cid_t cid,
TRI_voc_tick_t maxTick,
string& errorMsg) {
static const uint64_t chunkSize = 1024 * 1024;
if (_client == 0) {
return TRI_ERROR_INTERNAL;
}
const string baseUrl = "/_api/replication/dump?collection=" + StringUtils::itoa(cid);
const string baseUrl = "/_api/replication/dump"
"?collection=" + StringUtils::itoa(cid) +
"&chunkSize=" + StringUtils::itoa(chunkSize);
map<string, string> headers;
TRI_voc_tick_t fromTick = 0;
while (true) {
string url = baseUrl + "&from=" + StringUtils::itoa(fromTick) + "&to=" + StringUtils::itoa(maxTick);
string url = baseUrl +
"&from=" + StringUtils::itoa(fromTick) +
"&to=" + StringUtils::itoa(maxTick);
// send request
SimpleHttpResult* response = _client->request(HttpRequest::HTTP_REQUEST_GET,
@ -405,13 +456,15 @@ int ReplicationFetcher::handleCollectionDump (TRI_voc_cid_t cid,
}
}
std::cout << "GOT: " << response->getBody().str().c_str() << "\n\n\n";
int res = applyCollectionDump(cid, response, errorMsg);
delete response;
if (! hasMore || fromTick == 0) {
if (res != TRI_ERROR_NO_ERROR ||
! hasMore ||
fromTick == 0) {
// done
return TRI_ERROR_NO_ERROR;
return res;
}
}
@ -508,6 +561,8 @@ int ReplicationFetcher::handleCollectionInitial (TRI_json_t const* json,
}
else if (phase == PHASE_DATA) {
int res;
LOGGER_INFO("syncing data for collection '" << masterName->_value._string.data << "', id " << id);
res = handleCollectionDump(id, _masterInfo._state._lastTick, errorMsg);

View File

@ -46,6 +46,7 @@ namespace triagens {
namespace httpclient {
class GeneralClientConnection;
class SimpleHttpClient;
class SimpleHttpResult;
}
namespace rest {
@ -145,6 +146,16 @@ namespace triagens {
/// @{
////////////////////////////////////////////////////////////////////////////////
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief apply the data from a collection dump
////////////////////////////////////////////////////////////////////////////////
int applyCollectionDump (TRI_voc_cid_t,
httpclient::SimpleHttpResult*,
string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief get local replication apply state
////////////////////////////////////////////////////////////////////////////////

View File

@ -1781,13 +1781,7 @@ static bool StringifyJsonShapeDataArray (TRI_shaper_t* shaper,
return false;
}
res = TRI_AppendCharStringBuffer(buffer, '"');
if (res != TRI_ERROR_NO_ERROR) {
return false;
}
res = TRI_AppendCharStringBuffer(buffer, ':');
res = TRI_AppendString2StringBuffer(buffer, "\":", 2);
if (res != TRI_ERROR_NO_ERROR) {
return false;
@ -1866,13 +1860,7 @@ static bool StringifyJsonShapeDataArray (TRI_shaper_t* shaper,
return false;
}
res = TRI_AppendCharStringBuffer(buffer, '"');
if (res != TRI_ERROR_NO_ERROR) {
return false;
}
res = TRI_AppendCharStringBuffer(buffer, ':');
res = TRI_AppendString2StringBuffer(buffer, "\":", 2);
if (res != TRI_ERROR_NO_ERROR) {
return false;