mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'engine-api' of github.com:arangodb/arangodb into engine-api
This commit is contained in:
commit
e33c8b76a2
|
@ -179,11 +179,14 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
|
|||
try {
|
||||
setProgress("fetching master state");
|
||||
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: getting master state";
|
||||
res = getMasterState(errorMsg);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: got master state: " << res << " " << errorMsg;
|
||||
return res;
|
||||
}
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: got master state: " << res << " " << errorMsg;
|
||||
|
||||
if (incremental) {
|
||||
if (_masterInfo._majorVersion == 1 ||
|
||||
|
@ -210,7 +213,8 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
|
|||
return res;
|
||||
}
|
||||
|
||||
std::string url = BaseUrl + "/inventory?serverId=" + _localServerIdString;
|
||||
std::string url = BaseUrl + "/inventory?serverId=" + _localServerIdString
|
||||
+ "&batchId=" + std::to_string(_batchId);
|
||||
if (_includeSystem) {
|
||||
url += "&includeSystem=true";
|
||||
}
|
||||
|
@ -255,6 +259,7 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
|
|||
|
||||
VPackSlice const slice = builder->slice();
|
||||
if (!slice.isObject()) {
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: InitialSyncer::run - inventoryResponse is not an object";
|
||||
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
|
||||
|
||||
errorMsg = "got invalid response from master at " +
|
||||
|
@ -615,7 +620,10 @@ int InitialSyncer::handleCollectionDump(
|
|||
|
||||
uint64_t chunkSize = _chunkSize;
|
||||
|
||||
std::string const baseUrl = BaseUrl + "/dump?collection=" + cid + appendix;
|
||||
TRI_ASSERT(_batchId); //should not be equal to 0
|
||||
std::string const baseUrl = BaseUrl + "/dump?collection=" + cid
|
||||
+ "&batchId=" + std::to_string(_batchId)
|
||||
+ appendix;
|
||||
|
||||
TRI_voc_tick_t fromTick = 0;
|
||||
int batch = 1;
|
||||
|
@ -1691,7 +1699,7 @@ int InitialSyncer::handleCollection(VPackSlice const& parameters,
|
|||
if (checkAborted()) {
|
||||
return TRI_ERROR_REPLICATION_APPLIER_STOPPED;
|
||||
}
|
||||
|
||||
|
||||
if (!parameters.isObject() || !indexes.isArray()) {
|
||||
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
|
||||
}
|
||||
|
|
|
@ -662,7 +662,6 @@ int Syncer::getMasterState(std::string& errorMsg) {
|
|||
return TRI_ERROR_REPLICATION_MASTER_ERROR;
|
||||
}
|
||||
|
||||
|
||||
auto builder = std::make_shared<VPackBuilder>();
|
||||
int res = parseResponse(builder, response.get());
|
||||
|
||||
|
@ -670,6 +669,7 @@ int Syncer::getMasterState(std::string& errorMsg) {
|
|||
VPackSlice const slice = builder->slice();
|
||||
|
||||
if (!slice.isObject()) {
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "synger::getMasterState - state is not an object";
|
||||
res = TRI_ERROR_REPLICATION_INVALID_RESPONSE;
|
||||
errorMsg = "got invalid response from master at " +
|
||||
_masterInfo._endpoint + ": invalid JSON";
|
||||
|
@ -679,6 +679,9 @@ int Syncer::getMasterState(std::string& errorMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR){
|
||||
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "synger::getMasterState - handleStateResponse failed";
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -147,9 +147,10 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
|||
|
||||
// set data
|
||||
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
|
||||
|
||||
if (!ok) {
|
||||
// TODO: do something here?
|
||||
return;
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) << "could not get document with token: " << token._data;
|
||||
throw RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick);
|
||||
}
|
||||
|
||||
builder.add(VPackValue("data"));
|
||||
|
@ -170,6 +171,8 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
|
|||
_hasMore = _iter->next(cb, 10); // TODO: adjust limit?
|
||||
} catch (std::exception const& ex) {
|
||||
return RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick);
|
||||
} catch (RocksDBReplicationResult const& ex) {
|
||||
return ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -369,7 +372,7 @@ void RocksDBReplicationContext::releaseDumpingResources() {
|
|||
std::unique_ptr<transaction::Methods>
|
||||
RocksDBReplicationContext::createTransaction(TRI_vocbase_t* vocbase) {
|
||||
double lockTimeout = transaction::Methods::DefaultLockTimeout;
|
||||
auto ctx = transaction::StandaloneContext::Create(vocbase);
|
||||
std::shared_ptr<transaction::StandaloneContext> ctx = transaction::StandaloneContext::Create(vocbase);
|
||||
std::unique_ptr<transaction::Methods> trx(new transaction::UserTransaction(
|
||||
ctx, {}, {}, {}, lockTimeout, false, true));
|
||||
Result res = trx->begin();
|
||||
|
|
Loading…
Reference in New Issue