//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGODB_PREGEL_GRAPH_STORE_H #define ARANGODB_PREGEL_GRAPH_STORE_H 1 #include "Basics/StringHeap.h" #include "Cluster/ClusterInfo.h" #include "Pregel/Graph.h" #include "Pregel/GraphFormat.h" #include "Pregel/Iterators.h" #include "Pregel/TypedBuffer.h" #include "Utils/DatabaseGuard.h" #include #include #include #include struct TRI_vocbase_t; namespace arangodb { class LogicalCollection; namespace transaction { class Methods; } namespace pregel { template struct TypedBuffer; class WorkerConfig; template struct GraphFormat; //////////////////////////////////////////////////////////////////////////////// /// @brief carry graph data for a worker job. NOT THREAD SAFE ON DOCUMENT LOADS //////////////////////////////////////////////////////////////////////////////// template class GraphStore final { public: GraphStore(TRI_vocbase_t& vocbase, GraphFormat* graphFormat); ~GraphStore(); uint64_t numberVertexSegments() const { return _vertices.size(); } uint64_t localVertexCount() const { return _localVertexCount; } uint64_t localEdgeCount() const { return _localEdgeCount; } GraphFormat const* graphFormat() { return _graphFormat.get(); } // ====================== NOT THREAD SAFE =========================== void loadShards(WorkerConfig* state, std::function const&); void loadDocument(WorkerConfig* config, std::string const& documentID); void loadDocument(WorkerConfig* config, PregelShard sourceShard, velocypack::StringRef const& _key); // ====================================================================== // only thread safe if your threads coordinate access to memory locations RangeIterator> vertexIterator(); /// j and j are the first and last index of vertex segments RangeIterator> vertexIterator(size_t i, size_t j); RangeIterator> edgeIterator(Vertex const* entry); /// Write results to database void storeResults(WorkerConfig* config, std::function); private: void _loadVertices(ShardID const& vertexShard, std::vector const& edgeShards); void _loadEdges(transaction::Methods& trx, Vertex& vertexEntry, ShardID const& edgeShard, std::string const& documentID, std::vector>>>&, std::vector>>&); void _storeVertices(std::vector const& globalShards, RangeIterator>& it); size_t vertexSegmentSize () const { return std::ceil( 64 * 1024 * 1024 / sizeof(Vertex)); } size_t edgeSegmentSize() const { return std::ceil( 64 * 1024 * 1024 / sizeof(Edge)); } private: DatabaseGuard _vocbaseGuard; const std::unique_ptr> _graphFormat; WorkerConfig* _config = nullptr; /// Holds vertex keys, data and pointers to edges std::mutex _bufferMutex; std::vector>>> _vertices; std::vector>> _vertexKeys; std::vector>>> _edges; std::vector>*> _nextEdgeBuffer; std::vector>> _edgeKeys; // cache the amount of vertices std::set _loadedShards; // actual count of loaded vertices / edges std::atomic _localVertexCount; std::atomic _localEdgeCount; std::atomic _runningThreads; bool _destroyed = false; }; } // namespace pregel } // namespace arangodb #endif