1
0
Fork 0

Pregel: Coverty Scan fixes

This commit is contained in:
Simon Grätzer 2017-03-24 11:04:15 +01:00
parent 1ab8c44330
commit 39b2e349dd
15 changed files with 70 additions and 69 deletions

1
.gitignore vendored
View File

@ -105,3 +105,4 @@ npm-debug.log
log-*
data-*
cluster-init

View File

@ -15,12 +15,12 @@ This concept enables us to perform distributed graph processing, without the nee
If you are running a single ArangoDB instance in single-server mode, there are no requirements regarding the modeling
of your data. All you need is at least one vertex collection and one edge collection. Note that the performance may be
better if the number of your shards / collections matches the number of CPU cores.
better, if the number of your shards / collections matches the number of CPU cores.
When you use ArangoDB Community edition in cluster mode, you might need to model your collections in a certain way to
ensure correct results. For more information see the next section.
## Requirements for Collections (Non Smart Graph)
## Requirements for Collections in a Cluster (Non Smart Graph)
To enable iterative graph processing for your data, you will need to ensure
that your vertex and edge collections are sharded in a specific way.
@ -232,14 +232,15 @@ There are various definitions for centrality, the simplest one being the vertex
![Illustration of an execution of different centrality measures (Freeman 1977)](centrality_visual.png)
A common definitions of centrality is the \textbf{closeness centrality} (or closeness).
##### Effective Closeness
A common definitions of centrality is the **closeness centrality** (or closeness).
The closeness of a vertex in a graph is the inverse average length of the shortest path between the vertex
and all other vertices. For vertices *x*, *y* and shortest distance *d(y,x)* it is defined as
![Vertex Closeness](closeness.png)
##### Effective Closeness
Effective Closeness approximates the closeness measure. The algorithm works by iteratively estimating the number
of shortest paths passing through each vertex. The score will approximates the the real closeness score, since
it is not possible to actually count all shortest paths due to the horrendous O(n^2 * d) memory requirements.
@ -255,19 +256,21 @@ algorithm. The algorithm can be used like this
var handle = pregel.start("effectivecloseness", "yourgraph", resultField: "closeness");
```
Another common measure is the *betweenness* centrality: It measures the number of times a vertex is part
of shortest paths between any two vertices. For a vertex *v* betweenness is defined as
##### LineRank
Another common measure is the [betweenness* centrality](https://en.wikipedia.org/wiki/Betweenness_centrality):
It measures the number of times a vertex is part of shortest paths between any pairs of vertices.
For a vertex *v* betweenness is defined as
![Vertex Betweeness](betweeness.png)
Where $\sigma_{xy}$ is the number of shortest paths between $x$ and $y$, and $\sigma_{xy}(v)$ is the
number of paths also passing through $v$. The intuitive visualization of these metrics can be seen
below.
Where the σ represents the number of shortest paths between *x* and *y*, and σ(v) represents the
number of paths also passing through a vertex *v*. By intuition a vertex with higher betweeness centrality will have more information
passing through it.
Unfortunately these definitions were not designed with scalability in mind. It is probably impossible to compute them
efficiently and accurately. Fortunately there are scalable substitutions proposed by **U Kang et.al. 2011**.
##### LineRank
**LineRank** approximates the random walk betweenness of every vertex in a graph. This is the probability that someone starting on
an arbitary vertex, will visit this node when he randomly chooses edges to visit.

View File

@ -60,15 +60,14 @@ struct DMIDValue {
struct DMIDMessage {
DMIDMessage() {}
DMIDMessage(PregelID const& pid, float const& val)
: senderId(pid), weight(val) {}
DMIDMessage(PregelID const& pid, float val) : senderId(pid), weight(val) {}
DMIDMessage(PregelID const& sender, PregelID const& leader)
: senderId(sender), leaderId(leader) {}
PregelID senderId;
PregelID leaderId;
float weight;
float weight = 0;
};
/// A counter for counting unique vertex IDs using a HyperLogLog sketch.

View File

@ -89,7 +89,8 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t* vocbase,
if (_lazyLoading) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Enabled lazy loading";
}
_storeResults = VelocyPackHelper::getBooleanValue(config, "store", true);
VPackSlice storeSlice = config.get("store");
_storeResults = !storeSlice.isBool() || storeSlice.getBool();
if (!_storeResults) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Will keep results in-memory";
}
@ -398,27 +399,27 @@ void Conductor::startRecovery() {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
boost::asio::io_service* ioService = SchedulerFeature::SCHEDULER->ioService();
TRI_ASSERT(ioService != nullptr);
// let's wait for a final state in the cluster
_boost_timer.reset(new boost::asio::deadline_timer(*ioService,
boost::posix_time::seconds(2)));
_boost_timer->async_wait([this] (const boost::system::error_code& error) {
_boost_timer.reset(new boost::asio::deadline_timer(
*ioService, boost::posix_time::seconds(2)));
_boost_timer->async_wait([this](const boost::system::error_code& error) {
_boost_timer.reset();
if (error == boost::asio::error::operation_aborted
|| _state != ExecutionState::RECOVERING) {
if (error == boost::asio::error::operation_aborted ||
_state != ExecutionState::RECOVERING) {
return; // seems like we are canceled
}
std::vector<ServerID> goodServers;
int res = PregelFeature::instance()->recoveryManager()->filterGoodServers(
_dbServers, goodServers);
_dbServers, goodServers);
if (res != TRI_ERROR_NO_ERROR) {
LOG_TOPIC(ERR, Logger::PREGEL) << "Recovery proceedings failed";
cancel();
return;
}
_dbServers = goodServers;
VPackBuilder b;
b.openObject();
b.add(Utils::executionNumberKey, VPackValue(_executionNumber));
@ -428,7 +429,7 @@ void Conductor::startRecovery() {
if (_state != ExecutionState::RECOVERING) {
return; // seems like we are canceled
}
// Let's try recovery
if (_masterContext) {
bool proceed = _masterContext->preCompensation();
@ -436,14 +437,14 @@ void Conductor::startRecovery() {
cancel();
}
}
VPackBuilder additionalKeys;
additionalKeys.openObject();
additionalKeys.add(Utils::recoveryMethodKey, VPackValue(Utils::compensate));
_aggregators->serializeValues(b);
additionalKeys.close();
_aggregators->resetValues();
// initialize workers will reconfigure the workers and set the
// _dbServers list to the new primary DBServers
res = _initializeWorkers(Utils::startRecoveryPath, additionalKeys.slice());

View File

@ -25,13 +25,13 @@
#include <string>
#include <boost/date_time/posix_time/posix_time.hpp>
#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterInfo.h"
#include "Pregel/Statistics.h"
#include "VocBase/vocbase.h"
#include <boost/date_time/posix_time/posix_time.hpp>
namespace arangodb {
namespace pregel {
@ -87,7 +87,9 @@ class Conductor {
uint64_t _totalVerticesCount = 0;
uint64_t _totalEdgesCount = 0;
/// some tracking info
double _startTimeSecs = 0, _computationStartTimeSecs, _endTimeSecs = 0;
double _startTimeSecs = 0;
double _computationStartTimeSecs = 0;
double _endTimeSecs = 0;
std::unique_ptr<boost::asio::deadline_timer> _boost_timer;
bool _startGlobalStep();

View File

@ -33,13 +33,13 @@ namespace pregel {
typedef std::string PregelKey;
// typedef uint64_t PregelKey;
typedef uint16_t PregelShard;
const PregelShard invalid_prgl_shard = -1;
const PregelShard InvalidPregelShard = -1;
struct PregelID {
PregelShard shard;
PregelKey key;
PregelID() : shard(invalid_prgl_shard), key("") {}
PregelID() : shard(InvalidPregelShard), key("") {}
PregelID(PregelShard s, PregelKey const& k) : shard(s), key(k) {}
// PregelID(PregelShard s, std::string const& k) : shard(s),
// key(std::stoull(k)) {}
@ -53,7 +53,7 @@ struct PregelID {
}
bool inline isValid() const {
return shard != invalid_prgl_shard && !key.empty();
return shard != InvalidPregelShard && !key.empty();
}
};
@ -73,7 +73,7 @@ class Edge {
public:
// EdgeEntry() : _nextEntryOffset(0), _dataSize(0), _vertexIDSize(0) {}
Edge() {}
Edge() : _targetShard(InvalidPregelShard) {}
Edge(PregelShard target, PregelKey const& key)
: _targetShard(target), _toKey(key) {}
@ -99,14 +99,9 @@ class VertexEntry {
bool _active = true;
public:
VertexEntry() {}
VertexEntry() : _shard(InvalidPregelShard) {}
VertexEntry(PregelShard shard, PregelKey const& key)
: _shard(shard),
_key(key),
_vertexDataOffset(0),
_edgeDataOffset(0),
_edgeCount(0),
_active(true) {} //_vertexIDSize(0)
: _shard(shard), _key(key) {}
inline size_t getVertexDataOffset() const { return _vertexDataOffset; }
inline size_t getEdgeDataOffset() const { return _edgeDataOffset; }

View File

@ -56,8 +56,8 @@ template <typename V, typename E>
class GraphStore {
VocbaseGuard _vocbaseGuard;
const std::unique_ptr<GraphFormat<V, E>> _graphFormat;
WorkerConfig* _config;
WorkerConfig* _config = nullptr;
std::vector<VertexEntry> _index;
TypedBuffer<V>* _vertexData = nullptr;
TypedBuffer<Edge<E>>* _edges = nullptr;

View File

@ -39,7 +39,7 @@ class MasterContext {
uint64_t _edgeCount = 0;
// Should cause the master to tell everyone to enter the next phase
bool _enterNextGSS = false;
AggregatorHandler* _aggregators;
AggregatorHandler* _aggregators = nullptr;
public:
MasterContext(){};

View File

@ -55,7 +55,7 @@ class OutCache {
protected:
WorkerConfig const* _config;
MessageFormat<M> const* _format;
InCache<M>* _localCache;
InCache<M>* _localCache = nullptr;
InCache<M>* _localCacheNextGSS = nullptr;
std::string _baseUrl;
uint32_t _batchSize = 1000;

View File

@ -41,7 +41,7 @@ template <typename T>
struct TypedBuffer {
/// close file (see close() )
virtual ~TypedBuffer(){};
TypedBuffer() {}
TypedBuffer() : _ptr(nullptr) {}
/// @brief return whether the datafile is a physical file (true) or an
/// anonymous mapped region (false)

View File

@ -45,11 +45,11 @@ class VertexContext {
uint64_t _gss = 0;
uint64_t _lss = 0;
WorkerContext* _context;
GraphStore<V, E>* _graphStore;
AggregatorHandler* _readAggregators;
AggregatorHandler* _writeAggregators;
VertexEntry* _vertexEntry;
WorkerContext* _context = nullptr;
GraphStore<V, E>* _graphStore = nullptr;
AggregatorHandler* _readAggregators = nullptr;
AggregatorHandler* _writeAggregators = nullptr;
VertexEntry* _vertexEntry = nullptr;
public:
virtual ~VertexContext() {}
@ -101,7 +101,7 @@ class VertexContext {
template <typename V, typename E, typename M>
class VertexComputation : public VertexContext<V, E, M> {
friend class Worker<V, E, M>;
OutCache<M>* _cache;
OutCache<M>* _cache = nullptr;
bool _enterNextGSS = false;
public:

View File

@ -59,18 +59,19 @@ using namespace arangodb::pregel;
template <typename V, typename E, typename M>
Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
VPackSlice initConfig)
: _config(vocbase, initConfig), _algorithm(algo) {
: _state(WorkerState::IDLE),
_config(vocbase, initConfig),
_algorithm(algo),
_nextGSSSendMessageCount(0) {
MUTEX_LOCKER(guard, _commandMutex);
VPackSlice userParams = initConfig.get(Utils::userParametersKey);
_state = WorkerState::IDLE;
_workerContext.reset(algo->workerContext(userParams));
_messageFormat.reset(algo->messageFormat());
_messageCombiner.reset(algo->messageCombiner());
_conductorAggregators.reset(new AggregatorHandler(algo));
_workerAggregators.reset(new AggregatorHandler(algo));
_graphStore.reset(new GraphStore<V, E>(vocbase, _algorithm->inputFormat()));
_nextGSSSendMessageCount = 0;
if (_config.asynchronousMode()) {
_messageBatchSize = _algorithm->messageBatchSize(_config, _messageStats);
} else {
@ -112,9 +113,6 @@ Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
}
}
/*template <typename M>
GSSContext::~GSSContext() {}*/
template <typename V, typename E, typename M>
Worker<V, E, M>::~Worker() {
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Called ~Worker()";
@ -572,13 +570,14 @@ void Worker<V, E, M>::_continueAsync() {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
boost::asio::io_service* ioService = SchedulerFeature::SCHEDULER->ioService();
TRI_ASSERT(ioService != nullptr);
// wait for new messages before beginning to process
int64_t milli = _writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5;
int64_t milli =
_writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5;
// start next iteration in $milli mseconds.
_boost_timer.reset(new boost::asio::deadline_timer(*ioService,
boost::posix_time::millisec(milli)));
_boost_timer->async_wait([this] (const boost::system::error_code& error) {
_boost_timer.reset(new boost::asio::deadline_timer(
*ioService, boost::posix_time::millisec(milli)));
_boost_timer->async_wait([this](const boost::system::error_code& error) {
if (error != boost::asio::error::operation_aborted) {
{ // swap these pointers atomically
MY_WRITE_LOCKER(guard, _cacheRWLock);

View File

@ -24,6 +24,7 @@
#define ARANGODB_PREGEL_WORKER_H 1
#include <atomic>
#include <boost/date_time/posix_time/posix_time.hpp>
#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "Basics/ReadWriteLock.h"
@ -32,7 +33,6 @@
#include "Pregel/Statistics.h"
#include "Pregel/WorkerConfig.h"
#include "Pregel/WorkerContext.h"
#include <boost/date_time/posix_time/posix_time.hpp>
struct TRI_vocbase_t;
namespace arangodb {
@ -100,7 +100,7 @@ class Worker : public IWorker {
// only valid while recovering to determine the offset
// where new vertices were inserted
size_t _preRecoveryTotal;
size_t _preRecoveryTotal = 0;
std::unique_ptr<AggregatorHandler> _conductorAggregators;
std::unique_ptr<AggregatorHandler> _workerAggregators;
@ -130,7 +130,7 @@ class Worker : public IWorker {
/// if the worker has started sendng messages to the next GSS
std::atomic<bool> _requestedNextGSS;
std::unique_ptr<boost::asio::deadline_timer> _boost_timer;
void _initializeMessageCaches();
void _initializeVertexContext(VertexContext<V, E, M>* ctx);
void _startProcessing();

View File

@ -58,7 +58,11 @@ class WorkerContext {
virtual void postApplication(){};
public:
WorkerContext() {}
WorkerContext()
: _vertexCount(0),
_edgeCount(0),
_readAggregators(nullptr),
_writeAggregators(nullptr) {}
virtual ~WorkerContext() {}
inline uint64_t vertexCount() const { return _vertexCount; }

View File

@ -44,11 +44,9 @@ function testAlgo(a, p) {
var key = db._pregelStart(a, vColl, eColl, p);
var i = 1000;
do {
console.log("Waiting...");
internal.wait(1);
var doc = db._pregelStatus(key);
if (doc.state !== "running") {
console.log("Finished algorithm " + a);
db[vColl].all().toArray()
.forEach(function(d) {
@ -60,7 +58,6 @@ function testAlgo(a, p) {
}
}
});
console.log("Done executing " + a + " : " + key);
break;
}
} while(i-- >= 0);