mirror of https://gitee.com/bigwinds/arangodb
use proper log topic (#6915)
This commit is contained in:
parent
c83d74e384
commit
97a6bc58ac
|
@ -115,9 +115,15 @@ static bool FindRange(std::vector<uint8_t const*> const& markers,
|
|||
Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
||||
arangodb::LogicalCollection* coll,
|
||||
std::string const& keysId) {
|
||||
std::string progress =
|
||||
"collecting local keys for collection '" + coll->name() + "'";
|
||||
syncer.setProgress(progress);
|
||||
double const startTime = TRI_microtime();
|
||||
|
||||
syncer.setProgress(std::string("collecting local keys for collection '") + coll->name() + "'");
|
||||
|
||||
if (syncer.isAborted()) {
|
||||
return Result(TRI_ERROR_REPLICATION_APPLIER_STOPPED);
|
||||
}
|
||||
|
||||
InitialSyncerIncrementalSyncStats stats;
|
||||
|
||||
// fetch all local keys from primary index
|
||||
std::vector<uint8_t const*> markers;
|
||||
|
@ -205,10 +211,8 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
syncer._state.barrier.extend(syncer._state.connection);
|
||||
}
|
||||
|
||||
std::string progress = "sorting " + std::to_string(markers.size()) +
|
||||
" local key(s) for collection '" + coll->name() +
|
||||
"'";
|
||||
syncer.setProgress(progress);
|
||||
syncer.setProgress(std::string("sorting ") + std::to_string(markers.size()) +
|
||||
" local key(s) for collection '" + coll->name() + "'");
|
||||
|
||||
// sort all our local keys
|
||||
std::sort(
|
||||
|
@ -254,15 +258,17 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
|
||||
std::string url =
|
||||
baseUrl + "/" + keysId + "?chunkSize=" + std::to_string(chunkSize);
|
||||
progress = "fetching remote keys chunks for collection '" + coll->name() +
|
||||
"' from " + url;
|
||||
syncer.setProgress(progress);
|
||||
syncer.setProgress(std::string("fetching remote keys chunks for collection '") + coll->name() + "' from " + url);
|
||||
|
||||
double t = TRI_microtime();
|
||||
|
||||
std::unique_ptr<httpclient::SimpleHttpResult> response;
|
||||
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
|
||||
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
|
||||
});
|
||||
|
||||
stats.waitedForInitial += TRI_microtime() - t;
|
||||
|
||||
if (replutils::hasFailed(response.get())) {
|
||||
return buildHttpError(response.get(), url, syncer._state.connection);
|
||||
}
|
||||
|
@ -342,6 +348,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
keyBuilder.close();
|
||||
|
||||
trx.remove(coll->name(), keyBuilder.slice(), options);
|
||||
++stats.numDocsRemoved;
|
||||
}
|
||||
|
||||
// last high
|
||||
|
@ -368,6 +375,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
keyBuilder.close();
|
||||
|
||||
trx.remove(coll->name(), keyBuilder.slice(), options);
|
||||
++stats.numDocsRemoved;
|
||||
}
|
||||
|
||||
trx.commit();
|
||||
|
@ -411,9 +419,8 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
auto idx = physical->primaryIndex();
|
||||
|
||||
size_t const currentChunkId = i;
|
||||
progress = "processing keys chunk " + std::to_string(currentChunkId) +
|
||||
" for collection '" + coll->name() + "'";
|
||||
syncer.setProgress(progress);
|
||||
syncer.setProgress(std::string("processing keys chunk ") + std::to_string(currentChunkId) +
|
||||
" for collection '" + coll->name() + "'");
|
||||
|
||||
if (!syncer._state.isChildSyncer) {
|
||||
syncer._batch.extend(syncer._state.connection, syncer._progress);
|
||||
|
@ -473,15 +480,19 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
std::string url = baseUrl + "/" + keysId +
|
||||
"?type=keys&chunk=" + std::to_string(i) +
|
||||
"&chunkSize=" + std::to_string(chunkSize);
|
||||
progress = "fetching keys chunk " + std::to_string(currentChunkId) +
|
||||
" for collection '" + coll->name() + "' from " + url;
|
||||
syncer.setProgress(progress);
|
||||
syncer.setProgress(std::string("fetching keys chunk ") + std::to_string(currentChunkId) +
|
||||
" for collection '" + coll->name() + "' from " + url);
|
||||
|
||||
double t = TRI_microtime();
|
||||
|
||||
std::unique_ptr<httpclient::SimpleHttpResult> response;
|
||||
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
|
||||
response.reset(client->retryRequest(rest::RequestType::PUT, url, nullptr, 0));
|
||||
});
|
||||
|
||||
stats.waitedForKeys += TRI_microtime() - t;
|
||||
++stats.numKeysRequests;
|
||||
|
||||
if (replutils::hasFailed(response.get())) {
|
||||
return buildHttpError(response.get(), url, syncer._state.connection);
|
||||
}
|
||||
|
@ -520,6 +531,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
keyBuilder.close();
|
||||
|
||||
trx.remove(coll->name(), keyBuilder.slice(), options);
|
||||
++stats.numDocsRemoved;
|
||||
++nextStart;
|
||||
} else {
|
||||
break;
|
||||
|
@ -578,6 +590,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
keyBuilder.close();
|
||||
|
||||
trx.remove(coll->name(), keyBuilder.slice(), options);
|
||||
++stats.numDocsRemoved;
|
||||
++nextStart;
|
||||
} else if (res == 0) {
|
||||
// key match
|
||||
|
@ -653,11 +666,11 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
std::to_string(currentChunkId) +
|
||||
"&chunkSize=" + std::to_string(chunkSize) +
|
||||
"&offset=" + std::to_string(offsetInChunk);
|
||||
progress = "fetching documents chunk " +
|
||||
syncer.setProgress(std::string("fetching documents chunk ") +
|
||||
std::to_string(currentChunkId) + " for collection '" +
|
||||
coll->name() + "' from " + url;
|
||||
coll->name() + "' from " + url);
|
||||
|
||||
syncer.setProgress(progress);
|
||||
double t = TRI_microtime();
|
||||
|
||||
std::unique_ptr<httpclient::SimpleHttpResult> response;
|
||||
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
|
||||
|
@ -665,6 +678,10 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
keyJsonString.size()));
|
||||
});
|
||||
|
||||
stats.waitedForDocs += TRI_microtime() - t;
|
||||
stats.numDocsRequested += toFetch.size();
|
||||
++stats.numDocsRequests;
|
||||
|
||||
if (replutils::hasFailed(response.get())) {
|
||||
return buildHttpError(response.get(), url, syncer._state.connection);
|
||||
}
|
||||
|
@ -782,6 +799,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
}
|
||||
}
|
||||
}
|
||||
++stats.numDocsInserted;
|
||||
}
|
||||
|
||||
if (foundLength >= toFetch.size()) {
|
||||
|
@ -801,6 +819,18 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
|
|||
}
|
||||
}
|
||||
|
||||
syncer.setProgress(
|
||||
std::string("incremental sync statistics for collection '") + coll->name() + "': " +
|
||||
"keys requests: " + std::to_string(stats.numKeysRequests) + ", " +
|
||||
"docs requests: " + std::to_string(stats.numDocsRequests) + ", " +
|
||||
"number of documents requested: " + std::to_string(stats.numDocsRequested) + ", " +
|
||||
"number of documents inserted: " + std::to_string(stats.numDocsInserted) + ", " +
|
||||
"number of documents removed: " + std::to_string(stats.numDocsRemoved) + ", " +
|
||||
"waited for initial: " + std::to_string(stats.waitedForInitial) + " s, " +
|
||||
"waited for keys: " + std::to_string(stats.waitedForKeys) + " s, " +
|
||||
"waited for docs: " + std::to_string(stats.waitedForDocs) + " s, " +
|
||||
"total time: " + std::to_string(TRI_microtime() - startTime) + " s");
|
||||
|
||||
return Result();
|
||||
}
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -122,24 +122,23 @@ const compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFin
|
|||
var slaveState = replication.applier.state();
|
||||
|
||||
if (slaveState.state.lastError.errorNum > 0) {
|
||||
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
break;
|
||||
}
|
||||
|
||||
if (!slaveState.state.running) {
|
||||
console.log("slave is not running");
|
||||
console.topic("replication=error", "slave is not running");
|
||||
break;
|
||||
}
|
||||
|
||||
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
|
||||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
|
||||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
|
||||
console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!printed) {
|
||||
console.log("waiting for slave to catch up");
|
||||
console.topic("replication=debug", "waiting for slave to catch up");
|
||||
printed = true;
|
||||
}
|
||||
internal.wait(0.5, false);
|
||||
|
|
|
@ -153,24 +153,23 @@ var compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFinal
|
|||
var slaveState = replication.applier.state();
|
||||
|
||||
if (slaveState.state.lastError.errorNum > 0) {
|
||||
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
break;
|
||||
}
|
||||
|
||||
if (!slaveState.state.running) {
|
||||
console.log("slave is not running");
|
||||
console.topic("replication=error", "slave is not running");
|
||||
break;
|
||||
}
|
||||
|
||||
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
|
||||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
|
||||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
|
||||
console.log("slave has caught up. syncResult.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
console.topic("replication=debug", "slave has caught up. syncResult.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!printed) {
|
||||
console.log("waiting for slave to catch up");
|
||||
console.topic("replication=debug", "waiting for slave to catch up");
|
||||
printed = true;
|
||||
}
|
||||
internal.wait(0.5, false);
|
||||
|
|
|
@ -153,24 +153,23 @@ var compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFinal
|
|||
var slaveState = replication.applier.state();
|
||||
|
||||
if (slaveState.state.lastError.errorNum > 0) {
|
||||
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
break;
|
||||
}
|
||||
|
||||
if (!slaveState.state.running) {
|
||||
console.log("slave is not running");
|
||||
console.topic("replication=error", "slave is not running");
|
||||
break;
|
||||
}
|
||||
|
||||
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
|
||||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
|
||||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
|
||||
console.log("slave has caught up. syncResult.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
console.topic("replication=debug", "slave has caught up. syncResult.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!printed) {
|
||||
console.log("waiting for slave to catch up");
|
||||
console.topic("replication=debug", "waiting for slave to catch up");
|
||||
printed = true;
|
||||
}
|
||||
internal.wait(0.5, false);
|
||||
|
|
|
@ -129,23 +129,22 @@ const compare = function(masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFin
|
|||
var slaveState = replication.globalApplier.state();
|
||||
|
||||
if (slaveState.state.lastError.errorNum > 0) {
|
||||
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
break;
|
||||
}
|
||||
|
||||
if (!slaveState.state.running) {
|
||||
console.log("slave is not running");
|
||||
console.topic("replication=error", "slave is not running");
|
||||
break;
|
||||
}
|
||||
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
|
||||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
|
||||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
|
||||
console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!printed) {
|
||||
console.log("waiting for slave to catch up");
|
||||
console.topic("replication=debug", "waiting for slave to catch up");
|
||||
printed = true;
|
||||
}
|
||||
internal.wait(0.5, false);
|
||||
|
@ -1204,17 +1203,17 @@ function ReplicationOtherDBSuite() {
|
|||
while (true) {
|
||||
let slaveState = replication.globalApplier.state();
|
||||
if (slaveState.state.lastError.errorNum > 0) {
|
||||
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
break;
|
||||
}
|
||||
|
||||
if (!slaveState.state.running) {
|
||||
console.log("slave is not running");
|
||||
console.topic("replication=error", "slave is not running");
|
||||
break;
|
||||
}
|
||||
if (compareTicks(slaveState.state.lastAppliedContinuousTick, lastLogTick) >= 0 ||
|
||||
compareTicks(slaveState.state.lastProcessedContinuousTick, lastLogTick) >= 0) {
|
||||
console.log("slave has caught up. state.lastLogTick:",
|
||||
console.topic("replication=debug", "slave has caught up. state.lastLogTick:",
|
||||
slaveState.state.lastLogTick, "slaveState.lastAppliedContinuousTick:",
|
||||
slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:",
|
||||
slaveState.state.lastProcessedContinuousTick);
|
||||
|
@ -1222,7 +1221,7 @@ function ReplicationOtherDBSuite() {
|
|||
}
|
||||
|
||||
if (!printed) {
|
||||
console.log("waiting for slave to catch up");
|
||||
console.topic("replication=debug", "waiting for slave to catch up");
|
||||
printed = true;
|
||||
}
|
||||
internal.wait(0.5, false);
|
||||
|
|
|
@ -125,24 +125,23 @@ const compare = function (masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFi
|
|||
var slaveState = replication.applier.state();
|
||||
|
||||
if (slaveState.state.lastError.errorNum > 0) {
|
||||
console.log("slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
console.topic("replication=error", "slave has errored:", JSON.stringify(slaveState.state.lastError));
|
||||
break;
|
||||
}
|
||||
|
||||
if (!slaveState.state.running) {
|
||||
console.log("slave is not running");
|
||||
console.topic("replication=error", "slave is not running");
|
||||
break;
|
||||
}
|
||||
|
||||
if (compareTicks(slaveState.state.lastAppliedContinuousTick, state.lastLogTick) >= 0 ||
|
||||
compareTicks(slaveState.state.lastProcessedContinuousTick, state.lastLogTick) >= 0) { // ||
|
||||
// compareTicks(slaveState.state.lastAvailableContinuousTick, syncResult.lastLogTick) > 0) {
|
||||
console.log("slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
console.topic("replication=debug", "slave has caught up. state.lastLogTick:", state.lastLogTick, "slaveState.lastAppliedContinuousTick:", slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", slaveState.state.lastProcessedContinuousTick);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!printed) {
|
||||
console.log("waiting for slave to catch up");
|
||||
console.topic("replication=debug", "waiting for slave to catch up");
|
||||
printed = true;
|
||||
}
|
||||
internal.wait(0.5, false);
|
||||
|
@ -1238,12 +1237,12 @@ function ReplicationOtherDBSuite() {
|
|||
while (i-- > 0) {
|
||||
let state = replication.applier.state();
|
||||
if (!state.running) {
|
||||
console.log("slave is not running");
|
||||
console.topic("replication=error", "slave is not running");
|
||||
break;
|
||||
}
|
||||
if (compareTicks(state.lastAppliedContinuousTick, lastLogTick) >= 0 ||
|
||||
compareTicks(state.lastProcessedContinuousTick, lastLogTick) >= 0) {
|
||||
console.log("slave has caught up");
|
||||
console.topic("replication=error", "slave has caught up");
|
||||
break;
|
||||
}
|
||||
internal.sleep(0.5);
|
||||
|
@ -1285,12 +1284,12 @@ function ReplicationOtherDBSuite() {
|
|||
while (i-- > 0) {
|
||||
let state = replication.applier.state();
|
||||
if (!state.running) {
|
||||
console.log("slave is not running");
|
||||
console.topic("replication=error", "slave is not running");
|
||||
break;
|
||||
}
|
||||
if (compareTicks(state.lastAppliedContinuousTick, lastLogTick) >= 0 ||
|
||||
compareTicks(state.lastProcessedContinuousTick, lastLogTick) >= 0) {
|
||||
console.log("slave has caught up");
|
||||
console.topic("replication=error", "slave has caught up");
|
||||
break;
|
||||
}
|
||||
internal.sleep(0.5);
|
||||
|
|
Loading…
Reference in New Issue