//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "ConnectedComponents.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ServerState.h" #include "Pregel/Algorithm.h" #include "Pregel/GraphStore.h" #include "Pregel/IncomingCache.h" #include "Pregel/VertexComputation.h" using namespace arangodb; using namespace arangodb::pregel; using namespace arangodb::pregel::algos; struct MyComputation : public VertexComputation { MyComputation() {} void compute(MessageIterator const& messages) override { if (localSuperstep() == 0) { sendMessageToAllNeighbours(vertexData()); } else { int64_t currentComponent = vertexData(); for (const int64_t* msg : messages) { if (*msg < currentComponent) { currentComponent = *msg; }; } if (currentComponent != vertexData()) { *mutableVertexData() = currentComponent; sendMessageToAllNeighbours(currentComponent); } voteHalt(); } } }; VertexComputation* ConnectedComponents::createComputation( WorkerConfig const* config) const { return new MyComputation(); } struct MyGraphFormat final : public VertexGraphFormat { explicit MyGraphFormat(application_features::ApplicationServer& server, std::string const& result) : VertexGraphFormat(server, result, 0) {} void copyVertexData(std::string const& documentId, arangodb::velocypack::Slice document, int64_t& targetPtr) override { targetPtr = vertexIdRange++; } }; GraphFormat* ConnectedComponents::inputFormat() const { return new MyGraphFormat(_server, _resultField); } struct MyCompensation : public VertexCompensation { MyCompensation() {} void compensate(bool inLostPartition) override { // actually don't do anything, graph format will reinitialize lost vertices /*if (inLostPartition) { int64_t* data = mutableVertexData(); *data = INT64_MAX; }*/ } }; VertexCompensation* ConnectedComponents::createCompensation( WorkerConfig const* config) const { return new MyCompensation(); }