//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGODB_PREGEL_COMPUTATION_H #define ARANGODB_PREGEL_COMPUTATION_H 1 #include #include #include "Basics/Common.h" #include "Pregel/Graph.h" #include "Pregel/GraphStore.h" #include "Pregel/OutgoingCache.h" #include "Pregel/WorkerConfig.h" #include "Pregel/WorkerContext.h" namespace arangodb { namespace pregel { template class Worker; class IAggregator; template class VertexContext { friend class Worker; uint64_t _gss = 0; uint64_t _lss = 0; WorkerContext* _context = nullptr; GraphStore* _graphStore = nullptr; AggregatorHandler* _readAggregators = nullptr; AggregatorHandler* _writeAggregators = nullptr; Vertex* _vertexEntry = nullptr; public: virtual ~VertexContext() = default; template inline void aggregate(std::string const& name, T const& value) { T const* ptr = &value; _writeAggregators->aggregate(name, ptr); } template inline const T* getAggregatedValue(std::string const& name) { return (const T*)_readAggregators->getAggregatedValue(name); } IAggregator const* getReadAggregator(std::string const& name) { return _readAggregators->getAggregator(name); } IAggregator* getWriteAggregator(std::string const& name) { return _writeAggregators->getAggregator(name); } inline WorkerContext const* context() const { return _context; } V* mutableVertexData() { return &(_vertexEntry->data()); } V vertexData() const { return _vertexEntry->data(); } size_t getEdgeCount() const { return _vertexEntry->getEdgeCount(); } RangeIterator> getEdges() const { return _graphStore->edgeIterator(_vertexEntry); } void setVertexData(V const& val) { _graphStore->replaceVertexData(_vertexEntry, (void*)(&val), sizeof(V)); } /// store data, will potentially move the data around void setVertexData(void const* ptr, size_t size) { _graphStore->replaceVertexData(_vertexEntry, (void*)ptr, size); } void voteHalt() { _vertexEntry->setActive(false); } void voteActive() { _vertexEntry->setActive(true); } bool isActive() { return _vertexEntry->active(); } inline uint64_t globalSuperstep() const { return _gss; } inline uint64_t localSuperstep() const { return _lss; } PregelShard shard() const { return _vertexEntry->shard(); } velocypack::StringRef key() const { return _vertexEntry->key(); } PregelID pregelId() const { return _vertexEntry->pregelId(); } }; template class VertexComputation : public VertexContext { friend class Worker; OutCache* _cache = nullptr; bool _enterNextGSS = false; public: virtual ~VertexComputation() = default; void sendMessage(Edge const* edge, M const& data) { _cache->appendMessage(edge->targetShard(), edge->toKey(), data); } void sendMessage(PregelID const& pid, M const& data) { _cache->appendMessage(pid.shard, velocypack::StringRef(pid.key), data); } /// Send message along outgoing edges to all reachable neighbours /// TODO Multi-receiver messages void sendMessageToAllNeighbours(M const& data) { RangeIterator> edges = this->getEdges(); for (; edges.hasMore(); ++edges) { Edge const* edge = *edges; _cache->appendMessage(edge->targetShard(), edge->toKey(), data); } } /// Causes messages to be available in GSS+1. /// Only valid in async mode, a no-op otherwise void enterNextGlobalSuperstep() { // _enterNextGSS is true when we are not in async mode // making this a no-op if (!_enterNextGSS) { _enterNextGSS = true; _cache->sendToNextGSS(true); } } virtual void compute(MessageIterator const& messages) = 0; }; template class VertexCompensation : public VertexContext { friend class Worker; public: virtual ~VertexCompensation() = default; virtual void compensate(bool inLostPartition) = 0; }; } // namespace pregel } // namespace arangodb #endif