//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGODB_IN_MESSAGE_CACHE_H #define ARANGODB_IN_MESSAGE_CACHE_H 1 #include #include #include #include #include "Basics/Common.h" #include "Basics/StringHeap.h" #include "Pregel/GraphStore.h" #include "Pregel/Iterators.h" #include "Pregel/MessageCombiner.h" #include "Pregel/MessageFormat.h" namespace arangodb { namespace pregel { class WorkerConfig; /* In the longer run, maybe write optimized implementations for certain use cases. For example threaded processing */ template class InCache { protected: mutable std::map _bucketLocker; std::atomic _containedMessageCount; MessageFormat const* _format; /// Initialize format and mutex map. /// @param config can be null if you don't want locks explicit InCache(MessageFormat const* format); virtual void _set(PregelShard shard, velocypack::StringRef const& vertexId, M const& data) = 0; public: virtual ~InCache() = default; MessageFormat const* format() const { return _format; } uint64_t containedMessageCount() const { return _containedMessageCount; } void parseMessages(VPackSlice const& messages); /// @brief Store a single message. /// Only ever call when you are sure this is a thread local store void storeMessageNoLock(PregelShard shard, velocypack::StringRef const& vertexId, M const& data); /// @brief Store a single message void storeMessage(PregelShard shard, velocypack::StringRef const& vertexId, M const& data); virtual void mergeCache(WorkerConfig const& config, InCache const* otherCache) = 0; /// @brief get messages for vertex id. (Don't use keys from _from or _to /// directly, they contain the collection name) virtual MessageIterator getMessages(PregelShard shard, velocypack::StringRef const& key) = 0; /// clear cache virtual void clear() = 0; /// Deletes one entry. DOES NOT LOCK virtual void erase(PregelShard shard, velocypack::StringRef const& key) = 0; /// Calls function for each entry. DOES NOT LOCK virtual void forEach(std::function func) = 0; }; /// Cache version which stores a std::vector for each pregel id /// containing all messages for this vertex template class ArrayInCache : public InCache { typedef std::unordered_map> HMap; std::map _shardMap; protected: void _set(PregelShard shard, velocypack::StringRef const& vertexId, M const& data) override; public: ArrayInCache(WorkerConfig const* config, MessageFormat const* format); void mergeCache(WorkerConfig const& config, InCache const* otherCache) override; MessageIterator getMessages(PregelShard shard, velocypack::StringRef const& key) override; void clear() override; void erase(PregelShard shard, velocypack::StringRef const& key) override; void forEach(std::function func) override; }; /// Cache which stores one value per vertex id template class CombiningInCache : public InCache { typedef std::unordered_map HMap; MessageCombiner const* _combiner; std::map _shardMap; protected: void _set(PregelShard shard, velocypack::StringRef const& vertexId, M const& data) override; public: CombiningInCache(WorkerConfig const* config, MessageFormat const* format, MessageCombiner const* combiner); MessageCombiner const* combiner() const { return _combiner; } void mergeCache(WorkerConfig const& config, InCache const* otherCache) override; MessageIterator getMessages(PregelShard shard, velocypack::StringRef const& key) override; void clear() override; void erase(PregelShard shard, velocypack::StringRef const& key) override; void forEach(std::function func) override; }; } // namespace pregel } // namespace arangodb #endif