1
0
Fork 0
arangodb/arangod/Pregel/Algos/AsyncSCC.cpp

270 lines
8.9 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "AsyncSCC.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "Pregel/Aggregator.h"
#include "Pregel/Algorithm.h"
#include "Pregel/GraphStore.h"
#include "Pregel/IncomingCache.h"
#include "Pregel/MasterContext.h"
#include "Pregel/VertexComputation.h"
using namespace arangodb;
using namespace arangodb::pregel;
using namespace arangodb::pregel::algos;
static std::string const kPhase = "phase";
static std::string const kFoundNewMax = "max";
static std::string const kConverged = "converged";
enum SCCPhase {
TRANSPOSE = 0,
TRIMMING = 1,
FORWARD_TRAVERSAL = 2,
BACKWARD_TRAVERSAL_START = 3,
BACKWARD_TRAVERSAL_REST = 4
};
struct ASCCComputation final
: public VertexComputation<SCCValue, int8_t, SenderMessage<uint64_t>> {
ASCCComputation() {}
void compute(MessageIterator<SenderMessage<uint64_t>> const& messages) override {
if (isActive() == false) {
// color was already determinded or vertex was trimmed
return;
}
SCCValue* vertexState = mutableVertexData();
uint32_t const* phase = getAggregatedValue<uint32_t>(kPhase);
switch (*phase) {
// let all our connected nodes know we are there
case SCCPhase::TRANSPOSE: {
// only one step in this phase
enterNextGlobalSuperstep();
vertexState->parents.clear();
SenderMessage<uint64_t> message(pregelId(), 0);
sendMessageToAllNeighbours(message);
break;
}
// Creates list of parents based on the received ids and halts the
// vertices
// that don't have any parent or outgoing edge, hence, they can't be
// part of an SCC.
case SCCPhase::TRIMMING: {
// only one step in this phase
enterNextGlobalSuperstep();
for (SenderMessage<uint64_t> const* msg : messages) {
vertexState->parents.push_back(msg->senderId);
}
// reset the color for vertices which are not active
vertexState->color = vertexState->vertexID;
// If this node doesn't have any parents or outgoing edges,
// it can't be part of an SCC
if (vertexState->parents.size() == 0 || getEdgeCount() == 0) {
voteHalt();
} else {
SenderMessage<uint64_t> message(pregelId(), vertexState->color);
sendMessageToAllNeighbours(message);
}
break;
}
// converging phase
case SCCPhase::FORWARD_TRAVERSAL: {
uint64_t old = vertexState->color;
for (SenderMessage<uint64_t> const* msg : messages) {
if (vertexState->color < msg->value) {
vertexState->color = msg->value;
}
}
if (old != vertexState->color) {
SenderMessage<uint64_t> message(pregelId(), vertexState->color);
sendMessageToAllNeighbours(message);
aggregate(kFoundNewMax, true);
}
break;
}
case SCCPhase::BACKWARD_TRAVERSAL_START: {
// only one step in this phase
enterNextGlobalSuperstep();
// if I am the 'root' of a SCC start traversak
if (vertexState->vertexID == vertexState->color) {
SenderMessage<uint64_t> message(pregelId(), vertexState->color);
// sendMessageToAllParents
for (PregelID const& pid : vertexState->parents) {
sendMessage(pid, message);
}
}
break;
}
// converging phase
case SCCPhase::BACKWARD_TRAVERSAL_REST: {
for (SenderMessage<uint64_t> const* msg : messages) {
if (vertexState->color == msg->value) {
for (PregelID const& pid : vertexState->parents) {
sendMessage(pid, *msg);
}
aggregate(kConverged, true);
voteHalt();
break;
}
}
break;
}
}
}
};
VertexComputation<SCCValue, int8_t, SenderMessage<uint64_t>>* AsyncSCC::createComputation(
WorkerConfig const* config) const {
return new ASCCComputation();
}
struct SCCGraphFormat : public GraphFormat<SCCValue, int8_t> {
const std::string _resultField;
uint64_t vertexIdRange = 0;
explicit SCCGraphFormat(std::string const& result) : _resultField(result) {}
void willLoadVertices(uint64_t count) override {
// if we aren't running in a cluster it doesn't matter
if (arangodb::ServerState::instance()->isRunningInCluster()) {
arangodb::ClusterInfo* ci = arangodb::ClusterInfo::instance();
if (ci) {
vertexIdRange = ci->uniqid(count);
}
}
}
size_t estimatedEdgeSize() const override { return 0; };
void copyVertexData(std::string const& documentId, arangodb::velocypack::Slice document,
SCCValue& targetPtr) override {
targetPtr.vertexID = vertexIdRange++;
}
void copyEdgeData(arangodb::velocypack::Slice document, int8_t& targetPtr) override {}
bool buildVertexDocument(arangodb::velocypack::Builder& b,
const SCCValue* ptr, size_t size) const override {
SCCValue* senders = (SCCValue*)ptr;
b.add(_resultField, VPackValue(senders->color));
return true;
}
bool buildEdgeDocument(arangodb::velocypack::Builder& b, const int8_t* ptr,
size_t size) const override {
return false;
}
};
GraphFormat<SCCValue, int8_t>* AsyncSCC::inputFormat() const {
return new SCCGraphFormat(_resultField);
}
struct ASCCMasterContext : public MasterContext {
ASCCMasterContext() {} // TODO use _threashold
void preGlobalSuperstep() override {
if (globalSuperstep() == 0) {
enterNextGlobalSuperstep();
return;
}
uint32_t const* phase = getAggregatedValue<uint32_t>(kPhase);
switch (*phase) {
case SCCPhase::TRANSPOSE:
LOG_TOPIC("b0431", DEBUG, Logger::PREGEL) << "Phase: TRIMMING";
enterNextGlobalSuperstep();
aggregate<uint32_t>(kPhase, SCCPhase::TRIMMING);
break;
case SCCPhase::TRIMMING:
LOG_TOPIC("44a2f", DEBUG, Logger::PREGEL) << "Phase: FORWARD_TRAVERSAL";
enterNextGlobalSuperstep();
aggregate<uint32_t>(kPhase, SCCPhase::FORWARD_TRAVERSAL);
break;
case SCCPhase::FORWARD_TRAVERSAL: {
bool const* newMaxFound = getAggregatedValue<bool>(kFoundNewMax);
if (*newMaxFound == false) {
LOG_TOPIC("14832", DEBUG, Logger::PREGEL) << "Phase: BACKWARD_TRAVERSAL_START";
aggregate<uint32_t>(kPhase, SCCPhase::BACKWARD_TRAVERSAL_START);
}
} break;
case SCCPhase::BACKWARD_TRAVERSAL_START:
LOG_TOPIC("8d480", DEBUG, Logger::PREGEL) << "Phase: BACKWARD_TRAVERSAL_REST";
aggregate<uint32_t>(kPhase, SCCPhase::BACKWARD_TRAVERSAL_REST);
break;
case SCCPhase::BACKWARD_TRAVERSAL_REST:
bool const* converged = getAggregatedValue<bool>(kConverged);
// continue until no more vertices are updated
if (*converged == false) {
LOG_TOPIC("a9542", DEBUG, Logger::PREGEL) << "Phase: TRANSPOSE";
aggregate<uint32_t>(kPhase, SCCPhase::TRANSPOSE);
}
break;
}
};
void postLocalSuperstep() override {
uint32_t const* phase = getAggregatedValue<uint32_t>(kPhase);
if (*phase == SCCPhase::FORWARD_TRAVERSAL) {
bool const* newMaxFound = getAggregatedValue<bool>(kFoundNewMax);
if (*newMaxFound == false) {
enterNextGlobalSuperstep();
}
} else if (*phase == SCCPhase::BACKWARD_TRAVERSAL_REST) {
bool const* converged = getAggregatedValue<bool>(kConverged);
// continue until no more vertices are updated
if (*converged == false) {
enterNextGlobalSuperstep();
}
}
};
};
MasterContext* AsyncSCC::masterContext(VPackSlice userParams) const {
return new ASCCMasterContext();
}
IAggregator* AsyncSCC::aggregator(std::string const& name) const {
if (name == kPhase) { // permanent value
return new OverwriteAggregator<uint32_t>(SCCPhase::TRANSPOSE, true);
} else if (name == kFoundNewMax) {
return new BoolOrAggregator(false); // non perm
} else if (name == kConverged) {
return new BoolOrAggregator(false); // non perm
}
return nullptr;
}