1
0
Fork 0

added pregel vertex / edge count checks

This commit is contained in:
Simon Grätzer 2017-06-07 17:18:59 +02:00
parent 8dcf045a7b
commit 571356a6ae
4 changed files with 26 additions and 21 deletions

View File

@ -690,6 +690,22 @@ VPackBuilder Conductor::collectAQLResults() {
return messages;
}
VPackBuilder Conductor::toVelocyPack() const {
VPackBuilder result;
result.openObject();
result.add("state", VPackValue(pregel::ExecutionStateNames[_state]));
result.add("gss", VPackValue(_globalSuperstep));
result.add("totalRuntime", VPackValue(totalRuntimeSecs()));
_aggregators->serializeValues(result);
_statistics.serializeValues(result);
if (_state != ExecutionState::RUNNING) {
result.add("vertexCount", VPackValue(_totalVerticesCount));
result.add("edgeCount", VPackValue(_totalEdgesCount));
}
result.close();
return result;
}
int Conductor::_sendToAllDBServers(std::string const& path,
VPackBuilder const& message) {
return _sendToAllDBServers(path, message, std::function<void(VPackSlice)>());

View File

@ -116,14 +116,10 @@ class Conductor {
void start();
void cancel();
void startRecovery();
AggregatorHandler const* aggregators() const { return _aggregators.get(); }
ExecutionState getState() const { return _state; }
StatsManager workerStats() const { return _statistics; }
uint64_t globalSuperstep() const { return _globalSuperstep; }
VPackBuilder collectAQLResults();
double totalRuntimeSecs() {
VPackBuilder toVelocyPack() const;
double totalRuntimeSecs() const {
return _endTimeSecs == 0 ? TRI_microtime() - _startTimeSecs
: _endTimeSecs - _startTimeSecs;
}

View File

@ -93,7 +93,7 @@ struct StatsManager {
}
}
void serializeValues(VPackBuilder& b) {
void serializeValues(VPackBuilder& b) const {
MessageStats stats;
for (auto const& pair : _serverStats) {
stats.accumulate(pair.second);
@ -102,7 +102,7 @@ struct StatsManager {
}
/// Test if all messages were processed
bool allMessagesProcessed() {
bool allMessagesProcessed() const {
uint64_t send = 0, received = 0;
for (auto const& pair : _serverStats) {
send += pair.second.sendCount;
@ -111,7 +111,7 @@ struct StatsManager {
return send == received;
}
void debugOutput() {
void debugOutput() const {
uint64_t send = 0, received = 0;
for (auto const& pair : _serverStats) {
send += pair.second.sendCount;
@ -122,7 +122,7 @@ struct StatsManager {
}
/// tests if active count is greater 0
bool noActiveVertices() {
bool noActiveVertices() const {
for (auto const& pair : _activeStats) {
if (pair.second > 0) {
return false;
@ -139,7 +139,7 @@ struct StatsManager {
void reset() { _serverStats.clear(); }
size_t clientCount() { return _serverStats.size(); }
size_t clientCount() const { return _serverStats.size(); }
private:
std::map<std::string, uint64_t> _activeStats;

View File

@ -2225,15 +2225,8 @@ static void JS_PregelStatus(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_USAGE("Execution number is invalid");
}
VPackBuilder result;
result.openObject();
result.add("state", VPackValue(pregel::ExecutionStateNames[c->getState()]));
result.add("gss", VPackValue(c->globalSuperstep()));
result.add("totalRuntime", VPackValue(c->totalRuntimeSecs()));
c->aggregators()->serializeValues(result);
c->workerStats().serializeValues(result);
result.close();
TRI_V8_RETURN(TRI_VPackToV8(isolate, result.slice()));
VPackBuilder builder = c->toVelocyPack();
TRI_V8_RETURN(TRI_VPackToV8(isolate, builder.slice()));
TRI_V8_TRY_CATCH_END
}