diff --git a/.gitignore b/.gitignore index 11402f2bfa..e5981f0c49 100644 --- a/.gitignore +++ b/.gitignore @@ -105,3 +105,4 @@ npm-debug.log log-* data-* +cluster-init diff --git a/Documentation/Books/Manual/Graphs/Pregel/README.mdpp b/Documentation/Books/Manual/Graphs/Pregel/README.mdpp index 9973dc85ce..d28df95f66 100644 --- a/Documentation/Books/Manual/Graphs/Pregel/README.mdpp +++ b/Documentation/Books/Manual/Graphs/Pregel/README.mdpp @@ -15,12 +15,12 @@ This concept enables us to perform distributed graph processing, without the nee If you are running a single ArangoDB instance in single-server mode, there are no requirements regarding the modeling of your data. All you need is at least one vertex collection and one edge collection. Note that the performance may be -better if the number of your shards / collections matches the number of CPU cores. +better, if the number of your shards / collections matches the number of CPU cores. When you use ArangoDB Community edition in cluster mode, you might need to model your collections in a certain way to ensure correct results. For more information see the next section. -## Requirements for Collections (Non Smart Graph) +## Requirements for Collections in a Cluster (Non Smart Graph) To enable iterative graph processing for your data, you will need to ensure that your vertex and edge collections are sharded in a specific way. @@ -232,14 +232,15 @@ There are various definitions for centrality, the simplest one being the vertex ![Illustration of an execution of different centrality measures (Freeman 1977)](centrality_visual.png) -A common definitions of centrality is the \textbf{closeness centrality} (or closeness). + +##### Effective Closeness + +A common definition of centrality is the **closeness centrality** (or closeness). The closeness of a vertex in a graph is the inverse average length of the shortest path between the vertex and all other vertices. 
For vertices *x*, *y* and shortest distance *d(y,x)* it is defined as ![Vertex Closeness](closeness.png) -##### Effective Closeness - Effective Closeness approximates the closeness measure. The algorithm works by iteratively estimating the number of shortest paths passing through each vertex. The score will approximates the the real closeness score, since it is not possible to actually count all shortest paths due to the horrendous O(n^2 * d) memory requirements. @@ -255,19 +256,21 @@ algorithm. The algorithm can be used like this var handle = pregel.start("effectivecloseness", "yourgraph", resultField: "closeness"); ``` -Another common measure is the *betweenness* centrality: It measures the number of times a vertex is part -of shortest paths between any two vertices. For a vertex *v* betweenness is defined as +##### LineRank + +Another common measure is the [*betweenness* centrality](https://en.wikipedia.org/wiki/Betweenness_centrality): +It measures the number of times a vertex is part of shortest paths between any pair of vertices. +For a vertex *v* betweenness is defined as ![Vertex Betweeness](betweeness.png) -Where $\sigma_{xy}$ is the number of shortest paths between $x$ and $y$, and $\sigma_{xy}(v)$ is the -number of paths also passing through $v$. The intuitive visualization of these metrics can be seen -below. +Where the σ represents the number of shortest paths between *x* and *y*, and σ(v) represents the +number of paths also passing through a vertex *v*. By intuition a vertex with higher betweenness centrality will have more information +passing through it. Unfortunately these definitions were not designed with scalability in mind. It is probably impossible to compute them efficiently and accurately. Fortunately there are scalable substitutions proposed by **U Kang et.al. 2011**. -##### LineRank **LineRank** approximates the random walk betweenness of every vertex in a graph. 
This is the probability that someone starting on an arbitary vertex, will visit this node when he randomly chooses edges to visit. diff --git a/arangod/Pregel/CommonFormats.h b/arangod/Pregel/CommonFormats.h index 2e78f65ed4..d116f859f5 100644 --- a/arangod/Pregel/CommonFormats.h +++ b/arangod/Pregel/CommonFormats.h @@ -60,15 +60,14 @@ struct DMIDValue { struct DMIDMessage { DMIDMessage() {} - DMIDMessage(PregelID const& pid, float const& val) - : senderId(pid), weight(val) {} + DMIDMessage(PregelID const& pid, float val) : senderId(pid), weight(val) {} DMIDMessage(PregelID const& sender, PregelID const& leader) : senderId(sender), leaderId(leader) {} PregelID senderId; PregelID leaderId; - float weight; + float weight = 0; }; /// A counter for counting unique vertex IDs using a HyperLogLog sketch. diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 4e2144ded9..ca5aedc5ed 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -89,7 +89,8 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t* vocbase, if (_lazyLoading) { LOG_TOPIC(INFO, Logger::PREGEL) << "Enabled lazy loading"; } - _storeResults = VelocyPackHelper::getBooleanValue(config, "store", true); + VPackSlice storeSlice = config.get("store"); + _storeResults = !storeSlice.isBool() || storeSlice.getBool(); if (!_storeResults) { LOG_TOPIC(INFO, Logger::PREGEL) << "Will keep results in-memory"; } @@ -398,27 +399,27 @@ void Conductor::startRecovery() { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); boost::asio::io_service* ioService = SchedulerFeature::SCHEDULER->ioService(); TRI_ASSERT(ioService != nullptr); - + // let's wait for a final state in the cluster - _boost_timer.reset(new boost::asio::deadline_timer(*ioService, - boost::posix_time::seconds(2))); - _boost_timer->async_wait([this] (const boost::system::error_code& error) { + _boost_timer.reset(new boost::asio::deadline_timer( + *ioService, boost::posix_time::seconds(2))); + 
_boost_timer->async_wait([this](const boost::system::error_code& error) { _boost_timer.reset(); - - if (error == boost::asio::error::operation_aborted - || _state != ExecutionState::RECOVERING) { + + if (error == boost::asio::error::operation_aborted || + _state != ExecutionState::RECOVERING) { return; // seems like we are canceled } std::vector goodServers; int res = PregelFeature::instance()->recoveryManager()->filterGoodServers( - _dbServers, goodServers); + _dbServers, goodServers); if (res != TRI_ERROR_NO_ERROR) { LOG_TOPIC(ERR, Logger::PREGEL) << "Recovery proceedings failed"; cancel(); return; } _dbServers = goodServers; - + VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); @@ -428,7 +429,7 @@ void Conductor::startRecovery() { if (_state != ExecutionState::RECOVERING) { return; // seems like we are canceled } - + // Let's try recovery if (_masterContext) { bool proceed = _masterContext->preCompensation(); @@ -436,14 +437,14 @@ void Conductor::startRecovery() { cancel(); } } - + VPackBuilder additionalKeys; additionalKeys.openObject(); additionalKeys.add(Utils::recoveryMethodKey, VPackValue(Utils::compensate)); _aggregators->serializeValues(b); additionalKeys.close(); _aggregators->resetValues(); - + // initialize workers will reconfigure the workers and set the // _dbServers list to the new primary DBServers res = _initializeWorkers(Utils::startRecoveryPath, additionalKeys.slice()); diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index 2611b566e1..d24e2e6f9c 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -25,13 +25,13 @@ #include +#include #include "Basics/Common.h" #include "Basics/Mutex.h" #include "Cluster/ClusterComm.h" #include "Cluster/ClusterInfo.h" #include "Pregel/Statistics.h" #include "VocBase/vocbase.h" -#include namespace arangodb { namespace pregel { @@ -87,7 +87,9 @@ class Conductor { uint64_t _totalVerticesCount = 0; uint64_t _totalEdgesCount = 0; 
/// some tracking info - double _startTimeSecs = 0, _computationStartTimeSecs, _endTimeSecs = 0; + double _startTimeSecs = 0; + double _computationStartTimeSecs = 0; + double _endTimeSecs = 0; std::unique_ptr _boost_timer; bool _startGlobalStep(); diff --git a/arangod/Pregel/Graph.h b/arangod/Pregel/Graph.h index 15e4191641..118c9a5a7c 100644 --- a/arangod/Pregel/Graph.h +++ b/arangod/Pregel/Graph.h @@ -33,13 +33,13 @@ namespace pregel { typedef std::string PregelKey; // typedef uint64_t PregelKey; typedef uint16_t PregelShard; -const PregelShard invalid_prgl_shard = -1; +const PregelShard InvalidPregelShard = -1; struct PregelID { PregelShard shard; PregelKey key; - PregelID() : shard(invalid_prgl_shard), key("") {} + PregelID() : shard(InvalidPregelShard), key("") {} PregelID(PregelShard s, PregelKey const& k) : shard(s), key(k) {} // PregelID(PregelShard s, std::string const& k) : shard(s), // key(std::stoull(k)) {} @@ -53,7 +53,7 @@ struct PregelID { } bool inline isValid() const { - return shard != invalid_prgl_shard && !key.empty(); + return shard != InvalidPregelShard && !key.empty(); } }; @@ -73,7 +73,7 @@ class Edge { public: // EdgeEntry() : _nextEntryOffset(0), _dataSize(0), _vertexIDSize(0) {} - Edge() {} + Edge() : _targetShard(InvalidPregelShard) {} Edge(PregelShard target, PregelKey const& key) : _targetShard(target), _toKey(key) {} @@ -99,14 +99,9 @@ class VertexEntry { bool _active = true; public: - VertexEntry() {} + VertexEntry() : _shard(InvalidPregelShard) {} VertexEntry(PregelShard shard, PregelKey const& key) - : _shard(shard), - _key(key), - _vertexDataOffset(0), - _edgeDataOffset(0), - _edgeCount(0), - _active(true) {} //_vertexIDSize(0) + : _shard(shard), _key(key) {} inline size_t getVertexDataOffset() const { return _vertexDataOffset; } inline size_t getEdgeDataOffset() const { return _edgeDataOffset; } diff --git a/arangod/Pregel/GraphStore.h b/arangod/Pregel/GraphStore.h index 224dd740e0..3bad4a2478 100644 --- 
a/arangod/Pregel/GraphStore.h +++ b/arangod/Pregel/GraphStore.h @@ -56,8 +56,8 @@ template class GraphStore { VocbaseGuard _vocbaseGuard; const std::unique_ptr> _graphFormat; - WorkerConfig* _config; - + WorkerConfig* _config = nullptr; + std::vector _index; TypedBuffer* _vertexData = nullptr; TypedBuffer>* _edges = nullptr; diff --git a/arangod/Pregel/MasterContext.h b/arangod/Pregel/MasterContext.h index f163944342..47585a028e 100644 --- a/arangod/Pregel/MasterContext.h +++ b/arangod/Pregel/MasterContext.h @@ -39,7 +39,7 @@ class MasterContext { uint64_t _edgeCount = 0; // Should cause the master to tell everyone to enter the next phase bool _enterNextGSS = false; - AggregatorHandler* _aggregators; + AggregatorHandler* _aggregators = nullptr; public: MasterContext(){}; diff --git a/arangod/Pregel/OutgoingCache.h b/arangod/Pregel/OutgoingCache.h index f61a6cb155..f37bb3fa9c 100644 --- a/arangod/Pregel/OutgoingCache.h +++ b/arangod/Pregel/OutgoingCache.h @@ -55,7 +55,7 @@ class OutCache { protected: WorkerConfig const* _config; MessageFormat const* _format; - InCache* _localCache; + InCache* _localCache = nullptr; InCache* _localCacheNextGSS = nullptr; std::string _baseUrl; uint32_t _batchSize = 1000; diff --git a/arangod/Pregel/TypedBuffer.h b/arangod/Pregel/TypedBuffer.h index 763c0d7518..6f8b847e21 100644 --- a/arangod/Pregel/TypedBuffer.h +++ b/arangod/Pregel/TypedBuffer.h @@ -41,7 +41,7 @@ template struct TypedBuffer { /// close file (see close() ) virtual ~TypedBuffer(){}; - TypedBuffer() {} + TypedBuffer() : _ptr(nullptr) {} /// @brief return whether the datafile is a physical file (true) or an /// anonymous mapped region (false) diff --git a/arangod/Pregel/VertexComputation.h b/arangod/Pregel/VertexComputation.h index dd5f1cbdd6..12e1ae7f56 100644 --- a/arangod/Pregel/VertexComputation.h +++ b/arangod/Pregel/VertexComputation.h @@ -45,11 +45,11 @@ class VertexContext { uint64_t _gss = 0; uint64_t _lss = 0; - WorkerContext* _context; - GraphStore* 
_graphStore; - AggregatorHandler* _readAggregators; - AggregatorHandler* _writeAggregators; - VertexEntry* _vertexEntry; + WorkerContext* _context = nullptr; + GraphStore* _graphStore = nullptr; + AggregatorHandler* _readAggregators = nullptr; + AggregatorHandler* _writeAggregators = nullptr; + VertexEntry* _vertexEntry = nullptr; public: virtual ~VertexContext() {} @@ -101,7 +101,7 @@ class VertexContext { template class VertexComputation : public VertexContext { friend class Worker; - OutCache* _cache; + OutCache* _cache = nullptr; bool _enterNextGSS = false; public: diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 7aaa757031..85a5c94e37 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -59,18 +59,19 @@ using namespace arangodb::pregel; template Worker::Worker(TRI_vocbase_t* vocbase, Algorithm* algo, VPackSlice initConfig) - : _config(vocbase, initConfig), _algorithm(algo) { + : _state(WorkerState::IDLE), + _config(vocbase, initConfig), + _algorithm(algo), + _nextGSSSendMessageCount(0) { MUTEX_LOCKER(guard, _commandMutex); VPackSlice userParams = initConfig.get(Utils::userParametersKey); - _state = WorkerState::IDLE; _workerContext.reset(algo->workerContext(userParams)); _messageFormat.reset(algo->messageFormat()); _messageCombiner.reset(algo->messageCombiner()); _conductorAggregators.reset(new AggregatorHandler(algo)); _workerAggregators.reset(new AggregatorHandler(algo)); _graphStore.reset(new GraphStore(vocbase, _algorithm->inputFormat())); - _nextGSSSendMessageCount = 0; if (_config.asynchronousMode()) { _messageBatchSize = _algorithm->messageBatchSize(_config, _messageStats); } else { @@ -112,9 +113,6 @@ Worker::Worker(TRI_vocbase_t* vocbase, Algorithm* algo, } } -/*template -GSSContext::~GSSContext() {}*/ - template Worker::~Worker() { LOG_TOPIC(DEBUG, Logger::PREGEL) << "Called ~Worker()"; @@ -572,13 +570,14 @@ void Worker::_continueAsync() { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); 
boost::asio::io_service* ioService = SchedulerFeature::SCHEDULER->ioService(); TRI_ASSERT(ioService != nullptr); - + // wait for new messages before beginning to process - int64_t milli = _writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5; + int64_t milli = + _writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5; // start next iteration in $milli mseconds. - _boost_timer.reset(new boost::asio::deadline_timer(*ioService, - boost::posix_time::millisec(milli))); - _boost_timer->async_wait([this] (const boost::system::error_code& error) { + _boost_timer.reset(new boost::asio::deadline_timer( + *ioService, boost::posix_time::millisec(milli))); + _boost_timer->async_wait([this](const boost::system::error_code& error) { if (error != boost::asio::error::operation_aborted) { { // swap these pointers atomically MY_WRITE_LOCKER(guard, _cacheRWLock); diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index 9fdfd1e6db..86e49af33b 100644 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -24,6 +24,7 @@ #define ARANGODB_PREGEL_WORKER_H 1 #include +#include #include "Basics/Common.h" #include "Basics/Mutex.h" #include "Basics/ReadWriteLock.h" @@ -32,7 +33,6 @@ #include "Pregel/Statistics.h" #include "Pregel/WorkerConfig.h" #include "Pregel/WorkerContext.h" -#include struct TRI_vocbase_t; namespace arangodb { @@ -100,7 +100,7 @@ class Worker : public IWorker { // only valid while recovering to determine the offset // where new vertices were inserted - size_t _preRecoveryTotal; + size_t _preRecoveryTotal = 0; std::unique_ptr _conductorAggregators; std::unique_ptr _workerAggregators; @@ -130,7 +130,7 @@ class Worker : public IWorker { /// if the worker has started sendng messages to the next GSS std::atomic _requestedNextGSS; std::unique_ptr _boost_timer; - + void _initializeMessageCaches(); void _initializeVertexContext(VertexContext* ctx); void _startProcessing(); diff --git a/arangod/Pregel/WorkerContext.h 
b/arangod/Pregel/WorkerContext.h index b73fadf781..a9a994df06 100644 --- a/arangod/Pregel/WorkerContext.h +++ b/arangod/Pregel/WorkerContext.h @@ -58,7 +58,11 @@ class WorkerContext { virtual void postApplication(){}; public: - WorkerContext() {} + WorkerContext() + : _vertexCount(0), + _edgeCount(0), + _readAggregators(nullptr), + _writeAggregators(nullptr) {} virtual ~WorkerContext() {} inline uint64_t vertexCount() const { return _vertexCount; } diff --git a/js/server/tests/shell/shell-pregel.js b/js/server/tests/shell/shell-pregel.js index 69a9b75360..d693ea7200 100644 --- a/js/server/tests/shell/shell-pregel.js +++ b/js/server/tests/shell/shell-pregel.js @@ -44,11 +44,9 @@ function testAlgo(a, p) { var key = db._pregelStart(a, vColl, eColl, p); var i = 1000; do { - console.log("Waiting..."); internal.wait(1); var doc = db._pregelStatus(key); if (doc.state !== "running") { - console.log("Finished algorithm " + a); db[vColl].all().toArray() .forEach(function(d) { @@ -60,7 +58,6 @@ function testAlgo(a, p) { } } }); - console.log("Done executing " + a + " : " + key); break; } } while(i-- >= 0);