mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'engine-api' of https://github.com/arangodb/arangodb into engine-api
This commit is contained in:
commit
06fea9dbf4
|
@ -179,11 +179,14 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
|
||||||
try {
|
try {
|
||||||
setProgress("fetching master state");
|
setProgress("fetching master state");
|
||||||
|
|
||||||
|
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: getting master state";
|
||||||
res = getMasterState(errorMsg);
|
res = getMasterState(errorMsg);
|
||||||
|
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: got master state: " << res << " " << errorMsg;
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: got master state: " << res << " " << errorMsg;
|
||||||
|
|
||||||
if (incremental) {
|
if (incremental) {
|
||||||
if (_masterInfo._majorVersion == 1 ||
|
if (_masterInfo._majorVersion == 1 ||
|
||||||
|
@ -210,7 +213,8 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string url = BaseUrl + "/inventory?serverId=" + _localServerIdString;
|
std::string url = BaseUrl + "/inventory?serverId=" + _localServerIdString
|
||||||
|
+ "&batchId=" + std::to_string(_batchId);
|
||||||
if (_includeSystem) {
|
if (_includeSystem) {
|
||||||
url += "&includeSystem=true";
|
url += "&includeSystem=true";
|
||||||
}
|
}
|
||||||
|
@ -255,6 +259,7 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
|
||||||
|
|
||||||
VPackSlice const slice = builder->slice();
|
VPackSlice const slice = builder->slice();
|
||||||
if (!slice.isObject()) {
|
if (!slice.isObject()) {
|
||||||
|
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: InitialSyncer::run - inventoryResponse is not an object";
|
||||||
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
|
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
|
||||||
|
|
||||||
errorMsg = "got invalid response from master at " +
|
errorMsg = "got invalid response from master at " +
|
||||||
|
@ -615,7 +620,10 @@ int InitialSyncer::handleCollectionDump(
|
||||||
|
|
||||||
uint64_t chunkSize = _chunkSize;
|
uint64_t chunkSize = _chunkSize;
|
||||||
|
|
||||||
std::string const baseUrl = BaseUrl + "/dump?collection=" + cid + appendix;
|
TRI_ASSERT(_batchId); //should not be equal to 0
|
||||||
|
std::string const baseUrl = BaseUrl + "/dump?collection=" + cid
|
||||||
|
+ "&batchId=" + std::to_string(_batchId)
|
||||||
|
+ appendix;
|
||||||
|
|
||||||
TRI_voc_tick_t fromTick = 0;
|
TRI_voc_tick_t fromTick = 0;
|
||||||
int batch = 1;
|
int batch = 1;
|
||||||
|
|
|
@ -662,7 +662,6 @@ int Syncer::getMasterState(std::string& errorMsg) {
|
||||||
return TRI_ERROR_REPLICATION_MASTER_ERROR;
|
return TRI_ERROR_REPLICATION_MASTER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
auto builder = std::make_shared<VPackBuilder>();
|
auto builder = std::make_shared<VPackBuilder>();
|
||||||
int res = parseResponse(builder, response.get());
|
int res = parseResponse(builder, response.get());
|
||||||
|
|
||||||
|
@ -670,6 +669,7 @@ int Syncer::getMasterState(std::string& errorMsg) {
|
||||||
VPackSlice const slice = builder->slice();
|
VPackSlice const slice = builder->slice();
|
||||||
|
|
||||||
if (!slice.isObject()) {
|
if (!slice.isObject()) {
|
||||||
|
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "synger::getMasterState - state is not an object";
|
||||||
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
|
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
|
||||||
errorMsg = "got invalid response from master at " +
|
errorMsg = "got invalid response from master at " +
|
||||||
_masterInfo._endpoint + ": invalid JSON";
|
_masterInfo._endpoint + ": invalid JSON";
|
||||||
|
@ -679,6 +679,9 @@ int Syncer::getMasterState(std::string& errorMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR){
|
||||||
|
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "synger::getMasterState - handleStateResponse failed";
|
||||||
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -147,9 +147,10 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
||||||
|
|
||||||
// set data
|
// set data
|
||||||
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
|
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
|
||||||
|
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
// TODO: do something here?
|
LOG_TOPIC(ERR, Logger::REPLICATION) << "could not get document with token: " << token._data;
|
||||||
return;
|
throw RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick);
|
||||||
}
|
}
|
||||||
|
|
||||||
builder.add(VPackValue("data"));
|
builder.add(VPackValue("data"));
|
||||||
|
@ -170,6 +171,8 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
||||||
_hasMore = _iter->next(cb, 10); // TODO: adjust limit?
|
_hasMore = _iter->next(cb, 10); // TODO: adjust limit?
|
||||||
} catch (std::exception const& ex) {
|
} catch (std::exception const& ex) {
|
||||||
return RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick);
|
return RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick);
|
||||||
|
} catch (RocksDBReplicationResult const& ex) {
|
||||||
|
return ex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,7 +372,7 @@ void RocksDBReplicationContext::releaseDumpingResources() {
|
||||||
std::unique_ptr<transaction::Methods>
|
std::unique_ptr<transaction::Methods>
|
||||||
RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
|
RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
|
||||||
double lockTimeout = transaction::Methods::DefaultLockTimeout;
|
double lockTimeout = transaction::Methods::DefaultLockTimeout;
|
||||||
auto ctx = transaction::StandaloneContext::Create(vocbase);
|
std::shared_ptr<transaction::StandaloneContext> ctx = transaction::StandaloneContext::Create(vocbase);
|
||||||
std::unique_ptr<transaction::Methods> trx(new transaction::UserTransaction(
|
std::unique_ptr<transaction::Methods> trx(new transaction::UserTransaction(
|
||||||
ctx, {}, {}, {}, lockTimeout, false, true));
|
ctx, {}, {}, {}, lockTimeout, false, true));
|
||||||
Result res = trx->begin();
|
Result res = trx->begin();
|
||||||
|
|
Loading…
Reference in New Issue